You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/14 14:48:53 UTC
[3/3] flink git commit: [FLINK-5762] [runtime] Protect
initializeState() and open() by the same lock
[FLINK-5762] [runtime] Protect initializeState() and open() by the same lock
This closes #3291
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a91b6ff0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a91b6ff0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a91b6ff0
Branch: refs/heads/master
Commit: a91b6ff05d8af870ad076f9bf0fc17886787bc46
Parents: 663c1e3
Author: kl0u <kk...@gmail.com>
Authored: Thu Feb 9 16:02:27 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Feb 14 15:32:43 2017 +0100
----------------------------------------------------------------------
.../apache/flink/streaming/runtime/tasks/StreamTask.java | 9 ++++++---
1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a91b6ff0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 2676b64..3781cb6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -244,12 +244,15 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
// -------- Invoke --------
LOG.debug("Invoking {}", getName());
- // first order of business is to give operators their state
- initializeState();
-
// we need to make sure that any triggers scheduled in open() cannot be
// executed before all operators are opened
synchronized (lock) {
+
+ // both the following operations are protected by the lock
+ // so that we avoid race conditions in the case that initializeState()
+ // registers a timer, that fires before the open() is called.
+
+ initializeState();
openAllOperators();
}