Posted to issues@flink.apache.org by "Martijn Visser (Jira)" <ji...@apache.org> on 2022/12/16 08:09:00 UTC
[jira] [Updated] (FLINK-30431) JDBC Connector fails to reestablish the lost DB connection.
[ https://issues.apache.org/jira/browse/FLINK-30431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Martijn Visser updated FLINK-30431:
-----------------------------------
Priority: Major (was: Blocker)
> JDBC Connector fails to reestablish the lost DB connection.
> ------------------------------------------------------------
>
> Key: FLINK-30431
> URL: https://issues.apache.org/jira/browse/FLINK-30431
> Project: Flink
> Issue Type: Bug
> Components: Connectors / JDBC
> Affects Versions: 1.15.0
> Environment: Flink 1.15.2
> flink-connector-jdbc: 1.15.0
> Reporter: Sai Nadendla
> Priority: Major
>
> Our use case for the JDBC connector is to sink records to an Amazon Redshift table.
> At some point the connection to Redshift gets closed, and Flink's JDBC connector tries to detect this and re-establish the connection in JdbcOutputFormat.flush() as follows:
> {code:java}
> 1. public synchronized void flush() throws IOException {
> 2. ..
> 3.
> 4. for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
> 5. try {
> 6. attemptFlush();
> 7. ...
> 8. } catch (SQLException e) {
> 9. ...
> 10. try {
> 11. if (!connectionProvider.isConnectionValid()) { <-- TRUE!
> 12. updateExecutor(true);
> 13. }
> 14. } catch (Exception exception) {
> 15. ....
> 16. throw new IOException("Reestablish JDBC connection failed", exception);
> 17. }
> 18. ....
> 19. }
> 20. }
> 21. ....
> 22. }{code}
> updateExecutor() is called (from line 12 of the snippet above) to close the statements and re-establish the DB connection.
>
> {code:java}
> public void updateExecutor(boolean reconnect) throws SQLException, ...{
>     jdbcStatementExecutor.closeStatements();
>     jdbcStatementExecutor.prepareStatements(
>             reconnect
>                     ? connectionProvider.reestablishConnection()
>                     : connectionProvider.getConnection());
> }{code}
> ----
> h3. Results:
> h4. Expected:
> The connection should be re-established and the updates should be reflected in the DB.
> h4. Actual:
> {color:#de350b}*The connection re-establishment code is never reached.*{color} closeStatements() throws first because the connection is already closed, so the exception propagates out of updateExecutor() before reestablishConnection() is called:
> {noformat}
> Caused by: com.amazon.redshift.util.RedshiftException: This connection has been closed.
> at com.amazon.redshift.jdbc.RedshiftConnectionImpl.checkClosed(RedshiftConnectionImpl.java:1095) ~[?:?]
> at com.amazon.redshift.jdbc.RedshiftConnectionImpl.cancelQuery(RedshiftConnectionImpl.java:1299) ~[?:?]
> at com.amazon.redshift.jdbc.RedshiftStatementImpl.cancel(RedshiftStatementImpl.java:1042) ~[?:?]
> at com.amazon.redshift.jdbc.RedshiftStatementImpl.close(RedshiftStatementImpl.java:748) ~[?:?]
> at org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.closeStatements(SimpleBatchStatementExecutor.java:81) ~[?:?]
> at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.updateExecutor(JdbcOutputFormat.java:402) ~[?:?]
> at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:226) ~[?:?]
> at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.lambda$open$0(JdbcOutputFormat.java:155) ~[?:?]
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_292]
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[?:1.8.0_292]
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_292]
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[?:1.8.0_292]
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_292]
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_292]
> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
> 2022-11-09 03:00:07,510 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Stopping checkpoint coordinator for job 0f70cbc56798e19978b509bf0da0107b.
> 2022-11-09 03:00:07,517 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job 0f70cbc56798e19978b509bf0da0107b reached terminal state FAILED.
> {noformat}
>
>
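The failure mode described in the report can be reproduced without Flink or Redshift. The following is a minimal sketch, not the connector's code and not necessarily how the issue was (or will be) resolved upstream. StatementExecutor and ConnectionProvider are hypothetical stand-ins for the connector's internal types, a String plays the role of java.sql.Connection, and the log list exists only to make the call order visible. The sketch assumes one possible remedy: treat a failure in closeStatements() as non-fatal so that the reconnect step still runs.

```java
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

public class ReconnectSketch {

    /** Hypothetical stand-in for the connector's statement executor. */
    interface StatementExecutor {
        void closeStatements() throws SQLException;
        void prepareStatements(String connection) throws SQLException;
    }

    /** Hypothetical stand-in for the connection provider (String replaces java.sql.Connection). */
    interface ConnectionProvider {
        String getConnection() throws SQLException;
        String reestablishConnection() throws SQLException;
    }

    /**
     * Same shape as updateExecutor() in the report, except that a failure to
     * close the old statements is recorded and swallowed, so the reconnect
     * branch is still reached when the old connection is already dead.
     */
    static void updateExecutor(
            boolean reconnect,
            StatementExecutor executor,
            ConnectionProvider provider,
            List<String> log) throws SQLException {
        try {
            executor.closeStatements();
        } catch (SQLException e) {
            // Assumption: statements bound to a closed connection can be abandoned.
            log.add("close failed: " + e.getMessage());
        }
        executor.prepareStatements(
                reconnect ? provider.reestablishConnection() : provider.getConnection());
    }

    /** Simulates a dead connection whose statements throw on close, and returns the call log. */
    static List<String> simulate() {
        List<String> log = new ArrayList<>();
        StatementExecutor executor = new StatementExecutor() {
            @Override
            public void closeStatements() throws SQLException {
                throw new SQLException("This connection has been closed.");
            }

            @Override
            public void prepareStatements(String connection) {
                log.add("prepared on " + connection);
            }
        };
        ConnectionProvider provider = new ConnectionProvider() {
            @Override
            public String getConnection() {
                return "old-connection";
            }

            @Override
            public String reestablishConnection() {
                log.add("reconnected");
                return "new-connection";
            }
        };
        try {
            updateExecutor(true, executor, provider, log);
        } catch (SQLException e) {
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        simulate().forEach(System.out::println);
    }
}
```

If the try/catch around closeStatements() is removed, the SQLException leaves updateExecutor() immediately and neither "reconnected" nor "prepared on ..." is logged, which matches the stack trace in the report. Whether swallowing the close failure is acceptable in the real connector depends on what else closeStatements() is responsible for, so this is an illustration of the ordering problem rather than a proposed patch.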
--
This message was sent by Atlassian Jira
(v8.20.10#820010)