Posted to user-zh@flink.apache.org by xiexinyuan341 <xi...@163.com> on 2020/11/12 02:52:43 UTC

flink 1.11.1: occasional timeout exceptions when using MySQL as a dimension table in a temporal join cause data updates to fail, asking for a solution

When using MySQL as a dimension table in a temporal join on Flink 1.11.1, the join works fine most of the time, but occasionally the MySQL connection times out. The job then throws an exception and that record is never correctly written to the sink table. Is there a solution for this?
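
(For context, a minimal sketch of the kind of setup being described; all topic, table, and column names are made up, and the WITH options follow the Flink 1.11 Kafka and JDBC SQL connector documentation.)

-- Kafka source with a processing-time attribute for the lookup join
CREATE TABLE orders (
  order_id  BIGINT,
  user_id   BIGINT,
  amount    DECIMAL(10, 2),
  proc_time AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'kafka:9092',
  'properties.group.id' = 'order-job',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

-- MySQL dimension table queried per record by the lookup (temporal) join
CREATE TABLE user_dim (
  user_id   BIGINT,
  user_name STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysql-host:3306/db',
  'table-name' = 'user_dim',
  'username' = 'flink',
  'password' = 'secret',
  'lookup.cache.max-rows' = '10000',
  'lookup.cache.ttl' = '10min',
  'lookup.max-retries' = '3'
);

-- Processing-time temporal join against the MySQL dimension table
SELECT o.order_id, o.amount, d.user_name
FROM orders AS o
JOIN user_dim FOR SYSTEM_TIME AS OF o.proc_time AS d
  ON o.user_id = d.user_id;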

Re: Re: Re: flink 1.11.1: occasional timeout exceptions when using MySQL as a dimension table in a temporal join cause data updates to fail, asking for a solution

Posted by hailongwang <18...@163.com>.

This looks like the connection becoming invalid after no data has gone over it for a long time. Possible causes:
1. The Kafka data is very sparse, so the gap between arriving records exceeds the MySQL "wait_timeout".
2. The incoming records never actually join any MySQL row, so the connection sits idle.
Try checking and adjusting the database's wait_timeout (see the sketch below).
PS: in this scenario automatic recovery should be fine, but it is worth pinning down the root cause, deciding whether this is expected or abnormal, and how to avoid it.
It is best to enable checkpointing: the Kafka offsets are only acknowledged when a checkpoint completes, so the record will not be auto-acked and then dropped.
With checkpointing enabled, if the downstream supports upsert or exactly-once semantics you get end-to-end exactly-once; otherwise it is at-least-once, i.e. there can be duplicates.
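
A minimal sketch of checking and, if needed, raising the MySQL idle timeouts mentioned above; the values are only examples and need to be chosen for your environment:

-- Check the current idle-connection timeouts on the MySQL server
SHOW VARIABLES LIKE 'wait_timeout';
SHOW VARIABLES LIKE 'interactive_timeout';

-- Raise them (in seconds) so idle lookup connections are not closed between sparse records.
-- Requires sufficient privileges; sessions that are already open keep their old value.
SET GLOBAL wait_timeout = 28800;
SET GLOBAL interactive_timeout = 28800;

If the "roughly every ten minutes" failures line up with the server's previous wait_timeout value, that would confirm the idle-connection explanation.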

At 2020-11-12 13:36:11, "xiexinyuan341" <xi...@163.com> wrote:
>The source is Kafka and JdbcRowDataLookupFunction is used for the dimension table. The exception looks like this; from the logs it shows up roughly every ten-odd minutes.
>2020-11-12 01:00:09.028 ERROR JdbcRowDataLookupFunction.java:170 JDBC
>executeBatch error, retry times = 1
>com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link
>failure
>
>The last packet successfully received from the server was 815,816
>milliseconds ago.  The last packet sent successfully to the server was 1
>milliseconds ago.
>	at sun.reflect.GeneratedConstructorAccessor16.newInstance(Unknown Source)
>	at
>sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>	at com.mysql.jdbc.Util.handleNewInstance(Util.java:389)
>	at
>com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:1038)
>	at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3422)
>	at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3322)
>	at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3762)
>	at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2435)
>	at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2582)
>	at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2535)
>	at
>com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1911)
>	at
>com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:2034)
>	at
>org.apache.flink.connector.jdbc.table.JdbcRowDataLookupFunction.eval(JdbcRowDataLookupFunction.java:152)
>	at LookupFunction$10.flatMap(Unknown Source)
>	at
>org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:82)
>	at
>org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:36)
>	at
>org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>	at
>org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>	at
>org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>	at
>org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>	at
>org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>	at
>org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>	at StreamExecCalc$7.processElement(Unknown Source)
>	at
>org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>	at
>org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>	at
>org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>	at
>org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>	at
>org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>	at
>org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
>	at
>org.apache.flink.table.runtime.collector.TableFunctionCollector.outputResult(TableFunctionCollector.java:75)
>	at JoinTableFuncCollector$6.collect(Unknown Source)
>	at
>org.apache.flink.table.functions.TableFunction.collect(TableFunction.java:203)
>	at
>org.apache.flink.connector.jdbc.table.JdbcRowDataLookupFunction.eval(JdbcRowDataLookupFunction.java:162)
>	at LookupFunction$2.flatMap(Unknown Source)
>	at
>org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:82)
>	at
>org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:36)
>	at
>org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>	at
>org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>	at
>org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>	at
>org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>	at
>org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>	at
>org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>	at
>org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>	at
>org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>	at
>org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
>	at
>org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
>	at
>org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>	at
>org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>	at
>org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>	at
>org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>	at
>org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
>Caused by: java.io.EOFException: Can not read response from server. Expected
>to read 4 bytes, read 0 bytes before connection was unexpectedly lost.
>	at com.mysql.jdbc.MysqlIO.readFully(MysqlIO.java:2914)
>	at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3332)
>	... 46 common frames omitted
>2020-11-12 01:00:10.503 ERROR JdbcBatchingOutputFormat.java:175 JDBC
>executeBatch error, retry times = 1
>java.sql.SQLException: Could not retrieve transation read-only status server
>	at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:998)
>	at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:937)
>	at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:926)
>	at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:872)
>	at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:904)
>	at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:894)
>	at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3613)
>	at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3582)
>	at
>com.mysql.jdbc.PreparedStatement.executeBatch(PreparedStatement.java:1249)
>	at
>org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:71)
>	at
>org.apache.flink.connector.jdbc.internal.executor.BufferReduceStatementExecutor.executeBatch(BufferReduceStatementExecutor.java:98)
>	at
>org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:200)
>	at
>org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:171)
>	at
>org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.lambda$open$0(JdbcBatchingOutputFormat.java:120)
>	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>	at
>java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>	at
>java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>	at
>java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>	at
>java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>	at java.lang.Thread.run(Thread.java:748)
>Caused by: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException:
>Communications link failure
>
>The last packet successfully received from the server was 817,119
>milliseconds ago.  The last packet sent successfully to the server was 0
>milliseconds ago.
>	at sun.reflect.GeneratedConstructorAccessor16.newInstance(Unknown Source)
>	at
>sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>	at com.mysql.jdbc.Util.handleNewInstance(Util.java:389)
>	at
>com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:1038)
>	at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3422)
>	at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3322)
>	at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3762)
>	at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2435)
>	at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2582)
>	at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2531)
>	at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2489)
>	at com.mysql.jdbc.StatementImpl.executeQuery(StatementImpl.java:1446)
>	at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3607)
>	... 14 common frames omitted
>Caused by: java.io.EOFException: Can not read response from server. Expected
>to read 4 bytes, read 0 bytes before connection was unexpectedly lost.
>	at com.mysql.jdbc.MysqlIO.readFully(MysqlIO.java:2914)
>	at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3332)
>	... 22 common frames omitted
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re: flink 1.11.1: occasional timeout exceptions when using MySQL as a dimension table in a temporal join cause data updates to fail, asking for a solution

Posted by suisuimu <72...@qq.com>.
Hi, I am running into the same problem on 1.12. How did you solve it?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re: flink 1.11.1: occasional timeout exceptions when using MySQL as a dimension table in a temporal join cause data updates to fail, asking for a solution

Posted by xiexinyuan341 <xi...@163.com>.
The source is Kafka and JdbcRowDataLookupFunction is used for the dimension table. The exception looks like this; from the logs it shows up roughly every ten-odd minutes.
2020-11-12 01:00:09.028 ERROR JdbcRowDataLookupFunction.java:170 JDBC
executeBatch error, retry times = 1
com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link
failure

The last packet successfully received from the server was 815,816
milliseconds ago.  The last packet sent successfully to the server was 1
milliseconds ago.
	at sun.reflect.GeneratedConstructorAccessor16.newInstance(Unknown Source)
	at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at com.mysql.jdbc.Util.handleNewInstance(Util.java:389)
	at
com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:1038)
	at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3422)
	at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3322)
	at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3762)
	at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2435)
	at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2582)
	at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2535)
	at
com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1911)
	at
com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:2034)
	at
org.apache.flink.connector.jdbc.table.JdbcRowDataLookupFunction.eval(JdbcRowDataLookupFunction.java:152)
	at LookupFunction$10.flatMap(Unknown Source)
	at
org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:82)
	at
org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:36)
	at
org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
	at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
	at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
	at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
	at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
	at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
	at StreamExecCalc$7.processElement(Unknown Source)
	at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
	at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
	at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
	at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
	at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
	at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
	at
org.apache.flink.table.runtime.collector.TableFunctionCollector.outputResult(TableFunctionCollector.java:75)
	at JoinTableFuncCollector$6.collect(Unknown Source)
	at
org.apache.flink.table.functions.TableFunction.collect(TableFunction.java:203)
	at
org.apache.flink.connector.jdbc.table.JdbcRowDataLookupFunction.eval(JdbcRowDataLookupFunction.java:162)
	at LookupFunction$2.flatMap(Unknown Source)
	at
org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:82)
	at
org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:36)
	at
org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
	at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
	at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
	at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
	at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
	at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
	at
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
	at
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
	at
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
	at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
	at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
	at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
	at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
	at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
	at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
Caused by: java.io.EOFException: Can not read response from server. Expected
to read 4 bytes, read 0 bytes before connection was unexpectedly lost.
	at com.mysql.jdbc.MysqlIO.readFully(MysqlIO.java:2914)
	at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3332)
	... 46 common frames omitted
2020-11-12 01:00:10.503 ERROR JdbcBatchingOutputFormat.java:175 JDBC
executeBatch error, retry times = 1
java.sql.SQLException: Could not retrieve transation read-only status server
	at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:998)
	at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:937)
	at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:926)
	at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:872)
	at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:904)
	at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:894)
	at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3613)
	at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3582)
	at
com.mysql.jdbc.PreparedStatement.executeBatch(PreparedStatement.java:1249)
	at
org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:71)
	at
org.apache.flink.connector.jdbc.internal.executor.BufferReduceStatementExecutor.executeBatch(BufferReduceStatementExecutor.java:98)
	at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:200)
	at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:171)
	at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.lambda$open$0(JdbcBatchingOutputFormat.java:120)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException:
Communications link failure

The last packet successfully received from the server was 817,119
milliseconds ago.  The last packet sent successfully to the server was 0
milliseconds ago.
	at sun.reflect.GeneratedConstructorAccessor16.newInstance(Unknown Source)
	at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at com.mysql.jdbc.Util.handleNewInstance(Util.java:389)
	at
com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:1038)
	at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3422)
	at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3322)
	at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3762)
	at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2435)
	at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2582)
	at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2531)
	at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2489)
	at com.mysql.jdbc.StatementImpl.executeQuery(StatementImpl.java:1446)
	at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3607)
	... 14 common frames omitted
Caused by: java.io.EOFException: Can not read response from server. Expected
to read 4 bytes, read 0 bytes before connection was unexpectedly lost.
	at com.mysql.jdbc.MysqlIO.readFully(MysqlIO.java:2914)
	at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3332)
	... 22 common frames omitted



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re: flink 1.11.1: occasional timeout exceptions when using MySQL as a dimension table in a temporal join cause data updates to fail, asking for a solution

Posted by xiexinyuan341 <xi...@163.com>.
The source uses the Kafka connector, and JdbcRowDataLookupFunction is used for the dimension table lookups.
The error looks like this:
2020-11-12 00:10:00.153 ERROR JdbcRowDataLookupFunction.java:170 JDBC
executeBatch error, retry times = 1
com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link
failure

The last packet successfully received from the server was 277,705
milliseconds ago.  The last packet sent successfully to the server was 0
milliseconds ago.
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at com.mysql.jdbc.Util.handleNewInstance(Util.java:389)
	at
com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:1038)
	at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3422)
	at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3322)
	at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3762)
	at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2435)
	at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2582)
	at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2535)
	at
com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1911)
	at
com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:2034)
	at
org.apache.flink.connector.jdbc.table.JdbcRowDataLookupFunction.eval(JdbcRowDataLookupFunction.java:152)
	at LookupFunction$2.flatMap(Unknown Source)
	at
org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:82)
	at
org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:36)
	at
org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
	at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
	at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
	at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
	at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
	at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
	at
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
	at
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
	at
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
	at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
	at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
	at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
	at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
	at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
	at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
Caused by: java.io.EOFException: Can not read response from server. Expected
to read 4 bytes, read 0 bytes before connection was unexpectedly lost.
	at com.mysql.jdbc.MysqlIO.readFully(MysqlIO.java:2914)
	at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3332)
	... 26 common frames omitted
2020-11-12 00:10:01.261 ERROR JdbcRowDataLookupFunction.java:170 JDBC
executeBatch error, retry times = 1
com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link
failure




--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink 1.11.1: occasional timeout exceptions when using MySQL as a dimension table in a temporal join cause data updates to fail, asking for a solution

Posted by hailongwang <18...@163.com>.
Hi xiexinyuan341,


As I understand it, there are two issues here:
1. "Occasional connection timeouts": do you have the concrete stack trace? If the connection becomes invalid because no lookup query has run on it for a long time, that should be fixed in 1.12 and 1.11.3 [1].
2. What component is your source? If the job throws an exception and restarts (automatically or manually), then under "at-least-once" semantics the record should still be joined and written to the downstream sink; alternatively, enable checkpointing to guarantee "exactly-once" inside Flink (see the sketch further down).


[1] https://issues.apache.org/jira/browse/FLINK-16681
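
A hedged sketch of the checkpointing plus upsert-sink combination from point 2. The table and column names are made up; the WITH options follow the Flink 1.11 JDBC connector documentation, where declaring a primary key on the sink switches it to upsert mode so records replayed after a restart overwrite rows instead of duplicating them:

-- JDBC sink in upsert mode (primary key declared, NOT ENFORCED)
CREATE TABLE enriched_orders (
  order_id  BIGINT,
  user_name STRING,
  amount    DECIMAL(10, 2),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysql-host:3306/db',
  'table-name' = 'enriched_orders',
  'username' = 'flink',
  'password' = 'secret',
  'sink.buffer-flush.max-rows' = '100',
  'sink.buffer-flush.interval' = '1s',
  'sink.max-retries' = '3'
);

-- Checkpointing itself is enabled on the job or cluster side, for example
--   execution.checkpointing.interval: 60s     (flink-conf.yaml)
-- or env.enableCheckpointing(60000) in a DataStream/Table program; the Kafka
-- source then commits offsets only when a checkpoint completes.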


Best,
hailong
At 2020-11-12 09:52:43, "xiexinyuan341" <xi...@163.com> wrote:
>When using MySQL as a dimension table in a temporal join on Flink 1.11.1, the join works fine most of the time, but occasionally the MySQL connection times out. The job then throws an exception and that record is never correctly written to the sink table. Is there a solution for this?