You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/14 14:48:53 UTC

[3/3] flink git commit: [FLINK-5762] [runtime] Protect initializeState() and open() by the same lock

[FLINK-5762] [runtime] Protect initializeState() and open() by the same lock

This closes #3291


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a91b6ff0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a91b6ff0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a91b6ff0

Branch: refs/heads/master
Commit: a91b6ff05d8af870ad076f9bf0fc17886787bc46
Parents: 663c1e3
Author: kl0u <kk...@gmail.com>
Authored: Thu Feb 9 16:02:27 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Feb 14 15:32:43 2017 +0100

----------------------------------------------------------------------
 .../apache/flink/streaming/runtime/tasks/StreamTask.java    | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a91b6ff0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 2676b64..3781cb6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -244,12 +244,15 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			// -------- Invoke --------
 			LOG.debug("Invoking {}", getName());
 
-			// first order of business is to give operators their state
-			initializeState();
-
 			// we need to make sure that any triggers scheduled in open() cannot be
 			// executed before all operators are opened
 			synchronized (lock) {
+
+				// both of the following operations are protected by the lock,
+				// so that we avoid a race condition in the case that initializeState()
+				// registers a timer that fires before open() is called.
+
+				initializeState();
 				openAllOperators();
 			}