Posted to issues@flink.apache.org by "Martijn Visser (Jira)" <ji...@apache.org> on 2022/12/16 13:01:00 UTC

[jira] [Assigned] (FLINK-30431) JDBC Connector fails to reestablish the lost DB connection.

     [ https://issues.apache.org/jira/browse/FLINK-30431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Martijn Visser reassigned FLINK-30431:
--------------------------------------

    Assignee: Jiabao Sun

> JDBC Connector fails to reestablish the lost DB connection. 
> ------------------------------------------------------------
>
>                 Key: FLINK-30431
>                 URL: https://issues.apache.org/jira/browse/FLINK-30431
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / JDBC
>    Affects Versions: 1.15.0
>         Environment: Flink 1.15.2
> flink-connector-jdbc: 1.15.0
>            Reporter: Sai Nadendla
>            Assignee: Jiabao Sun
>            Priority: Major
>              Labels: pull-request-available
>
> Our use case for the JDBC connector is to sink records to an Amazon Redshift table.
> At some point the connection to Redshift gets closed, and Flink's JDBC connector tries to detect this and reestablish the connection in JdbcOutputFormat.flush() as follows:
> {code:java}
> 1. public synchronized void flush() throws IOException {
> 2.    ..
> 3.    
> 4.    for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
> 5.        try {
> 6.            attemptFlush();
> 7.            ...
> 8.        } catch (SQLException e) {
> 9.            ...
> 10.            try {
> 11.                if (!connectionProvider.isConnectionValid()) { // <-- evaluates to TRUE
> 12.                    updateExecutor(true);  
> 13.                }
> 14.            } catch (Exception exception) {
> 15.              ....
> 16.              throw new IOException("Reestablish JDBC connection failed", exception);
> 17.            }
> 18.            ....
> 19.        }
> 20.     }
> 21.     ....
> 22. }{code}
> updateExecutor() is called (from line 12 of the snippet above) to close the statements and re-establish the DB connection.
>  
> {code:java}
> public void updateExecutor(boolean reconnect) throws SQLException, ...{
>     jdbcStatementExecutor.closeStatements();
>     jdbcStatementExecutor.prepareStatements(
>             reconnect
>                     ? connectionProvider.reestablishConnection()
>                     : connectionProvider.getConnection());
> } {code}
> ----
> h3. Results:
> h4. Expected:
> The connection should be re-established and the updates should be reflected on DB.
> h4. Actual:
> {color:#de350b}*The connection re-establishment code is never reached.*{color} closeStatements() throws first, because the connection is already closed, so reestablishConnection() is never called:
> {noformat}
> Caused by: com.amazon.redshift.util.RedshiftException: This connection has been closed.
>     at com.amazon.redshift.jdbc.RedshiftConnectionImpl.checkClosed(RedshiftConnectionImpl.java:1095) ~[?:?]
>     at com.amazon.redshift.jdbc.RedshiftConnectionImpl.cancelQuery(RedshiftConnectionImpl.java:1299) ~[?:?]
>     at com.amazon.redshift.jdbc.RedshiftStatementImpl.cancel(RedshiftStatementImpl.java:1042) ~[?:?]
>     at com.amazon.redshift.jdbc.RedshiftStatementImpl.close(RedshiftStatementImpl.java:748) ~[?:?]
>     at org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.closeStatements(SimpleBatchStatementExecutor.java:81) ~[?:?]
>     at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.updateExecutor(JdbcOutputFormat.java:402) ~[?:?]
>     at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:226) ~[?:?]
>     at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.lambda$open$0(JdbcOutputFormat.java:155) ~[?:?]
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_292]
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[?:1.8.0_292]
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_292]
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[?:1.8.0_292]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_292]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_292]
>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
> 2022-11-09 03:00:07,510 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Stopping checkpoint coordinator for job 0f70cbc56798e19978b509bf0da0107b.
> 2022-11-09 03:00:07,517 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Job 0f70cbc56798e19978b509bf0da0107b reached terminal state FAILED.
> {noformat}
>  
>  
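
The ordering problem described above can be reproduced without Flink or Redshift. The following is a minimal sketch, not the connector's code and not necessarily the fix applied for this issue: FakeConnection, FakeExecutor, updateExecutorAsReported and updateExecutorTolerant are hypothetical names invented for illustration. It assumes only what the report states, namely that closing statements on an already closed connection throws, and shows one possible way to keep that failure from blocking the reconnect.

```java
import java.sql.SQLException;

public class ReconnectSketch {

    /** Hypothetical stand-in for a JDBC connection; not a Flink or Redshift class. */
    static final class FakeConnection {
        private final boolean closed;

        FakeConnection(boolean closed) {
            this.closed = closed;
        }

        boolean isClosed() {
            return closed;
        }
    }

    /** Hypothetical stand-in for a statement executor bound to one connection. */
    static final class FakeExecutor {
        private FakeConnection connection;

        FakeExecutor(FakeConnection connection) {
            this.connection = connection;
        }

        // Mirrors the reported behavior: closing on a closed connection throws.
        void closeStatements() throws SQLException {
            if (connection.isClosed()) {
                throw new SQLException("This connection has been closed.");
            }
        }

        // Rebinds the executor to the given (fresh) connection.
        void prepareStatements(FakeConnection fresh) {
            this.connection = fresh;
        }

        boolean usable() {
            return !connection.isClosed();
        }
    }

    // Ordering from the report: the close throws, so the reconnect never runs.
    static void updateExecutorAsReported(FakeExecutor executor) throws SQLException {
        executor.closeStatements();
        executor.prepareStatements(new FakeConnection(false));
    }

    // Assumed alternative: treat a failed close as non-fatal and reconnect anyway.
    static void updateExecutorTolerant(FakeExecutor executor) {
        try {
            executor.closeStatements();
        } catch (SQLException e) {
            System.out.println("Ignoring close failure: " + e.getMessage());
        }
        executor.prepareStatements(new FakeConnection(false));
    }

    public static void main(String[] args) {
        FakeExecutor reported = new FakeExecutor(new FakeConnection(true));
        try {
            updateExecutorAsReported(reported);
        } catch (SQLException e) {
            System.out.println("Reported path failed: " + e.getMessage());
        }
        System.out.println("Usable after reported path: " + reported.usable());

        FakeExecutor tolerant = new FakeExecutor(new FakeConnection(true));
        updateExecutorTolerant(tolerant);
        System.out.println("Usable after tolerant path: " + tolerant.usable());
    }
}
```

In this sketch the reported ordering leaves the executor bound to the dead connection, while the tolerant variant ends up on a fresh one. Whether swallowing the close failure is acceptable in the real connector is a design decision for the maintainers; the sketch only illustrates why the order of the two calls matters.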



--
This message was sent by Atlassian Jira
(v8.20.10#820010)