You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/15 06:33:40 UTC
[flink] branch master updated: [FLINK-13253][jdbc] Deadlock may
occur in JDBCUpsertOutputFormat (#9107)
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 30587cb [FLINK-13253][jdbc] Deadlock may occur in JDBCUpsertOutputFormat (#9107)
30587cb is described below
commit 30587cbc68f881cc7cc43c83b3551902fd05c9c9
Author: Jingsong Lee <lz...@aliyun.com>
AuthorDate: Mon Jul 15 14:33:11 2019 +0800
[FLINK-13253][jdbc] Deadlock may occur in JDBCUpsertOutputFormat (#9107)
---
.../api/java/io/jdbc/JDBCUpsertOutputFormat.java | 25 ++++++++--------------
1 file changed, 9 insertions(+), 16 deletions(-)
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertOutputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertOutputFormat.java
index f377995..b3b4e03 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertOutputFormat.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertOutputFormat.java
@@ -116,13 +116,15 @@ public class JDBCUpsertOutputFormat extends AbstractJDBCOutputFormat<Tuple2<Bool
this.scheduler = Executors.newScheduledThreadPool(
1, new ExecutorThreadFactory("jdbc-upsert-output-format"));
this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
- if (closed) {
- return;
- }
- try {
- flush();
- } catch (Exception e) {
- flushException = e;
+ synchronized (JDBCUpsertOutputFormat.this) {
+ if (closed) {
+ return;
+ }
+ try {
+ flush();
+ } catch (Exception e) {
+ flushException = e;
+ }
}
}, flushIntervalMills, flushIntervalMills, TimeUnit.MILLISECONDS);
}
@@ -184,15 +186,6 @@ public class JDBCUpsertOutputFormat extends AbstractJDBCOutputFormat<Tuple2<Bool
if (this.scheduledFuture != null) {
scheduledFuture.cancel(false);
this.scheduler.shutdown();
-
- try {
- if (!scheduler.awaitTermination(10, TimeUnit.MINUTES)) {
- throw new RuntimeException(
- "The scheduled executor service can not properly terminate.");
- }
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
}
if (batchCount > 0) {