You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Sai Nadendla (Jira)" <ji...@apache.org> on 2022/12/15 22:05:00 UTC

[jira] [Created] (FLINK-30431) JDBC Connector fails to reestablish the lost DB connection.

Sai Nadendla created FLINK-30431:
------------------------------------

             Summary: JDBC Connector fails to reestablish the lost DB connection. 
                 Key: FLINK-30431
                 URL: https://issues.apache.org/jira/browse/FLINK-30431
             Project: Flink
          Issue Type: Bug
          Components: Connectors / JDBC
    Affects Versions: 1.15.0
         Environment: Flink 1.15.2

flink-connector-jdbc: 1.15.0
            Reporter: Sai Nadendla


Our use case with JDBC connector is to sink records to Amazon Redshift DB table.
At some point in time the connection with redshift gets closed and the Flink's JDBC connector tries to detect & reestablish the connection in the following manner in the @ JdbcOutputFormat.flush() :
{code:java}
1. public synchronized void flush() throws IOException {
2.    ..
3.    
4.    for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
5.        try {
6.            attemptFlush();
7.            ...
8.        } catch (SQLException e) {
9.            ...
10.            try {
11.                if (!connectionProvider.isConnectionValid()) { <-- TRUE!
12.                    updateExecutor(true);  
13.                }
14.            } catch (Exception exception) {
15.              ....
16.              throw new IOException("Reestablish JDBC connection failed", exception);
17.            }
18.            ....
19.        }
20.     }
21.     ....
22. }{code}
updateExecutor() is called (from line#12 of the above code snippet) to close statements and re-establish the DB connection. 

 
{code:java}
1. public void updateExecutor(boolean reconnect) throws SQLException, ...{
2.     jdbcStatementExecutor.closeStatements(); 
3.     jdbcStatementExecutor.prepareStatements(
4.             reconnect
5.                     ? connectionProvider.reestablishConnection()
6.                     : connectionProvider.getConnection());
7. } {code}
h4.  
----
h3. Results:
h4. Expected:

The connection should be re-established and the updates should be reflected on DB.
h4. Actual:

{color:#de350b}*The {{connection re-establish}} code is never reached/invoked !!.*{color} The closeStatements() fails/throws (as the connection is already closed).
{noformat}
Caused by: com.amazon.redshift.util.RedshiftException: This connection has been closed.
    at com.amazon.redshift.jdbc.RedshiftConnectionImpl.checkClosed(RedshiftConnectionImpl.java:1095) ~[?:?]
    at com.amazon.redshift.jdbc.RedshiftConnectionImpl.cancelQuery(RedshiftConnectionImpl.java:1299) ~[?:?]
    at com.amazon.redshift.jdbc.RedshiftStatementImpl.cancel(RedshiftStatementImpl.java:1042) ~[?:?]
    at com.amazon.redshift.jdbc.RedshiftStatementImpl.close(RedshiftStatementImpl.java:748) ~[?:?]
    at org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.closeStatements(SimpleBatchStatementExecutor.java:81) ~[?:?]
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.updateExecutor(JdbcOutputFormat.java:402) ~[?:?]
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:226) ~[?:?]
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.lambda$open$0(JdbcOutputFormat.java:155) ~[?:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_292]
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[?:1.8.0_292]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_292]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[?:1.8.0_292]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_292]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_292]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
2022-11-09 03:00:07,510 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Stopping checkpoint coordinator for job 0f70cbc56798e19978b509bf0da0107b.
2022-11-09 03:00:07,517 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Job 0f70cbc56798e19978b509bf0da0107b reached terminal state FAILED.
{noformat}




 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)