Posted to issues@flink.apache.org by "zhaoshijie (JIRA)" <ji...@apache.org> on 2019/05/13 03:54:00 UTC

[jira] [Updated] (FLINK-12494) JDBCOutputFormat support reconnect when link failure and flush by timeInterval

     [ https://issues.apache.org/jira/browse/FLINK-12494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhaoshijie updated FLINK-12494:
-------------------------------
    Description: 
When I use JDBCSink (flink-1.4.2) to write records to MySQL, I see the following exception:

 
{code:java}
java.util.concurrent.ExecutionException: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet successfully received from the server was 265,251 milliseconds ago. The last packet sent successfully to the server was 265,252 milliseconds ago.
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.get(StreamRecordQueueEntry.java:68)
at org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:129)
... 2 more
Caused by: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet successfully received from the server was 265,251 milliseconds ago. The last packet sent successfully to the server was 265,252 milliseconds ago.
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:1116)
at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3364)
at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:1983)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2163)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2624)
at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:2127)
at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:2293)
at org.apache.flink.table.plan.nodes.datastream.dimension.JDBCDimensionTableFunction.executeQuery(JDBCDimensionTableFunction.scala:199)
at org.apache.flink.table.plan.nodes.datastream.dimension.JDBCDimensionTableFunction.executeQueryAndCombine(JDBCDimensionTableFunction.scala:139)
at org.apache.flink.table.plan.nodes.datastream.dimension.JDBCDimensionTableFunction$$anon$1.apply(JDBCDimensionTableFunction.scala:83)
at org.apache.flink.table.plan.nodes.datastream.dimension.JDBCDimensionTableFunction$$anon$1.apply(JDBCDimensionTableFunction.scala:73)
at org.apache.flink.streaming.api.functions.async.DimensionTableJoinFunction.lambda$asyncInvoke$0(DimensionTableJoinFunction.java:105)
at akka.dispatch.Futures$$anonfun$future$1.apply(Future.scala:97)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
Caused by: java.net.SocketException: Connection reset
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3345)
... 16 more
{code}
I think the connection went unused for too long (an idle connection), so the server closed it proactively. Sparse data is actually quite common, so I think we should reconnect when the connection becomes invalid.
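A minimal sketch of what the reconnect could look like (class and method names here are illustrative, not the actual JDBCOutputFormat code): before each flush, validate the link with Connection.isValid() and recreate it through a factory when the server has dropped it. The demo uses a reflective stand-in Connection so it runs without a database.

```java
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.util.concurrent.Callable;

// Hypothetical sketch, not the actual JDBCOutputFormat code: before flushing,
// validate the JDBC connection and reopen it if the server dropped the idle link.
public class ReconnectSketch {

    /** Returns a live connection, recreating it via the factory when the old one is dead. */
    static Connection ensureOpen(Connection conn, Callable<Connection> factory,
                                 int validTimeoutSeconds) throws Exception {
        if (conn == null || conn.isClosed() || !conn.isValid(validTimeoutSeconds)) {
            conn = factory.call(); // e.g. DriverManager.getConnection(dbURL, username, password)
        }
        return conn;
    }

    // Database-free stand-in: a dynamic proxy that pretends to be a Connection
    // with the given validity, so the sketch can run anywhere.
    static Connection fake(boolean valid) {
        return (Connection) Proxy.newProxyInstance(
                Connection.class.getClassLoader(),
                new Class<?>[]{Connection.class},
                (proxy, method, args) -> {
                    if (method.getName().equals("isClosed")) return false;
                    if (method.getName().equals("isValid")) return valid;
                    return null;
                });
    }

    public static void main(String[] args) throws Exception {
        Connection dead = fake(false);       // server already closed this link
        Connection fresh = fake(true);
        Connection result = ensureOpen(dead, () -> fresh, 5);
        System.out.println(result == fresh); // the dead link was replaced
    }
}
```

A retry loop with backoff around factory.call() would harden this further, since the reconnect itself can also fail transiently.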

Besides, I find that JDBCOutputFormat.flush is only called from the snapshotState method and when batchCount >= batchInterval. If the sink's records are sparse, the actual write therefore happens with a very large delay, so should we add a flush condition: currentTime - lastFlushTime > timeInterval?
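The time-based condition could live in a small helper next to the existing batch-size check. This is a hypothetical sketch (FlushPolicy is not an existing Flink class), flushing when either batchCount >= batchInterval or currentTime - lastFlushTime > timeInterval:

```java
// Hypothetical helper, not part of Flink: decides when a JDBC sink should
// flush, combining the existing size trigger with a time trigger.
final class FlushPolicy {
    private final int batchInterval;   // flush after this many buffered records
    private final long timeIntervalMs; // ...or after this much time since the last flush
    private long lastFlushTime;
    private int batchCount;

    FlushPolicy(int batchInterval, long timeIntervalMs, long nowMs) {
        this.batchInterval = batchInterval;
        this.timeIntervalMs = timeIntervalMs;
        this.lastFlushTime = nowMs;
    }

    /** Called per record (nowMs = System.currentTimeMillis()); true means flush now. */
    boolean onRecord(long nowMs) {
        batchCount++;
        return batchCount >= batchInterval || nowMs - lastFlushTime > timeIntervalMs;
    }

    /** Reset the counters once the flush has actually happened. */
    void onFlushed(long nowMs) {
        batchCount = 0;
        lastFlushTime = nowMs;
    }
}
```

Note that when no records arrive at all, nothing calls onRecord, so a separate scheduled task checking the same time condition would still be needed to drain the last buffered batch.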

 


> JDBCOutputFormat support reconnect when link failure and flush by timeInterval
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-12494
>                 URL: https://issues.apache.org/jira/browse/FLINK-12494
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / JDBC
>    Affects Versions: 1.8.0
>            Reporter: zhaoshijie
>            Assignee: zhaoshijie
>            Priority: Major
>
> When I use JDBCSink (flink-1.4.2) to write records to MySQL, I see the following exception:
>  
> {code:java}
> java.util.concurrent.ExecutionException: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
> The last packet successfully received from the server was 265,251 milliseconds ago. The last packet sent successfully to the server was 265,252 milliseconds ago.
> at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.get(StreamRecordQueueEntry.java:68)
> at org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:129)
> ... 2 more
> Caused by: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
> The last packet successfully received from the server was 265,251 milliseconds ago. The last packet sent successfully to the server was 265,252 milliseconds ago.
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
> at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:1116)
> at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3364)
> at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:1983)
> at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2163)
> at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2624)
> at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:2127)
> at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:2293)
> at org.apache.flink.table.plan.nodes.datastream.dimension.JDBCDimensionTableFunction.executeQuery(JDBCDimensionTableFunction.scala:199)
> at org.apache.flink.table.plan.nodes.datastream.dimension.JDBCDimensionTableFunction.executeQueryAndCombine(JDBCDimensionTableFunction.scala:139)
> at org.apache.flink.table.plan.nodes.datastream.dimension.JDBCDimensionTableFunction$$anon$1.apply(JDBCDimensionTableFunction.scala:83)
> at org.apache.flink.table.plan.nodes.datastream.dimension.JDBCDimensionTableFunction$$anon$1.apply(JDBCDimensionTableFunction.scala:73)
> at org.apache.flink.streaming.api.functions.async.DimensionTableJoinFunction.lambda$asyncInvoke$0(DimensionTableJoinFunction.java:105)
> at akka.dispatch.Futures$$anonfun$future$1.apply(Future.scala:97)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> ... 1 more
> Caused by: java.net.SocketException: Connection reset
> at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
> at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
> at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
> at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3345)
> ... 16 more
> {code}
> I think the connection went unused for too long (an idle connection), so the server closed it proactively. Sparse data is actually quite common, so I think we should reconnect when the connection becomes invalid.
> Besides, I find that JDBCOutputFormat.flush is only called from the snapshotState method and when batchCount >= batchInterval. If the sink's records are sparse, the actual write therefore happens with a very large delay, so should we add a flush condition: currentTime - lastFlushTime > timeInterval?
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)