You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/25 14:26:27 UTC

[2/3] flink git commit: [FLINK-7949] Make AsyncWaitOperator recoverable also when queue is full

[FLINK-7949] Make AsyncWaitOperator recoverable also when queue is full

Start the emitter thread BEFORE inserting the recovered elements into the queue.
This guarantees that inserting the recovered elements cannot deadlock when the
queue is full, because the emitter is already running and can process elements.

This closes #4924.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a2198b04
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a2198b04
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a2198b04

Branch: refs/heads/release-1.4
Commit: a2198b04712b8ec6105999414f33781c6efcf4a9
Parents: 38278eb
Author: Bartłomiej Tartanus <ba...@gmail.com>
Authored: Mon Oct 30 15:39:43 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jan 25 15:26:09 2018 +0100

----------------------------------------------------------------------
 .../api/operators/async/AsyncWaitOperator.java      | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a2198b04/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
index aec20c0..a7b9438 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@@ -163,6 +163,14 @@ public class AsyncWaitOperator<IN, OUT>
 	public void open() throws Exception {
 		super.open();
 
+		// create the emitter
+		this.emitter = new Emitter<>(checkpointingLock, output, queue, this);
+
+		// start the emitter thread
+		this.emitterThread = new Thread(emitter, "AsyncIO-Emitter-Thread (" + getOperatorName() + ')');
+		emitterThread.setDaemon(true);
+		emitterThread.start();
+
 		// process stream elements from state, since the Emit thread will start as soon as all
 		// elements from previous state are in the StreamElementQueue, we have to make sure that the
 		// order to open all operators in the operator chain proceeds from the tail operator to the
@@ -186,14 +194,6 @@ public class AsyncWaitOperator<IN, OUT>
 			recoveredStreamElements = null;
 		}
 
-		// create the emitter
-		this.emitter = new Emitter<>(checkpointingLock, output, queue, this);
-
-		// start the emitter thread
-		this.emitterThread = new Thread(emitter, "AsyncIO-Emitter-Thread (" + getOperatorName() + ')');
-		emitterThread.setDaemon(true);
-		emitterThread.start();
-
 	}
 
 	@Override