You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/15 06:33:40 UTC

[flink] branch master updated: [FLINK-13253][jdbc] Deadlock may occur in JDBCUpsertOutputFormat (#9107)

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 30587cb  [FLINK-13253][jdbc] Deadlock may occur in JDBCUpsertOutputFormat (#9107)
30587cb is described below

commit 30587cbc68f881cc7cc43c83b3551902fd05c9c9
Author: Jingsong Lee <lz...@aliyun.com>
AuthorDate: Mon Jul 15 14:33:11 2019 +0800

    [FLINK-13253][jdbc] Deadlock may occur in JDBCUpsertOutputFormat (#9107)
---
 .../api/java/io/jdbc/JDBCUpsertOutputFormat.java   | 25 ++++++++--------------
 1 file changed, 9 insertions(+), 16 deletions(-)

diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertOutputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertOutputFormat.java
index f377995..b3b4e03 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertOutputFormat.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertOutputFormat.java
@@ -116,13 +116,15 @@ public class JDBCUpsertOutputFormat extends AbstractJDBCOutputFormat<Tuple2<Bool
 			this.scheduler = Executors.newScheduledThreadPool(
 					1, new ExecutorThreadFactory("jdbc-upsert-output-format"));
 			this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
-				if (closed) {
-					return;
-				}
-				try {
-					flush();
-				} catch (Exception e) {
-					flushException = e;
+				synchronized (JDBCUpsertOutputFormat.this) {
+					if (closed) {
+						return;
+					}
+					try {
+						flush();
+					} catch (Exception e) {
+						flushException = e;
+					}
 				}
 			}, flushIntervalMills, flushIntervalMills, TimeUnit.MILLISECONDS);
 		}
@@ -184,15 +186,6 @@ public class JDBCUpsertOutputFormat extends AbstractJDBCOutputFormat<Tuple2<Bool
 		if (this.scheduledFuture != null) {
 			scheduledFuture.cancel(false);
 			this.scheduler.shutdown();
-
-			try {
-				if (!scheduler.awaitTermination(10, TimeUnit.MINUTES)) {
-					throw new RuntimeException(
-							"The scheduled executor service can not properly terminate.");
-				}
-			} catch (InterruptedException e) {
-				throw new RuntimeException(e);
-			}
 		}
 
 		if (batchCount > 0) {