You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/02 17:14:34 UTC

[6/8] flink git commit: [hotfix] [streaming] Fix race in stream tasks when canceling tasks early.

[hotfix] [streaming] Fix race in stream tasks when canceling tasks early.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/40eef52e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/40eef52e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/40eef52e

Branch: refs/heads/master
Commit: 40eef52e9f68c3c7727e9b9493959d5fd36d7f70
Parents: af88aa0
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Jul 30 21:09:57 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Aug 2 15:58:28 2015 +0200

----------------------------------------------------------------------
 .../flink/streaming/runtime/tasks/OneInputStreamTask.java     | 2 --
 .../flink/streaming/runtime/tasks/SourceStreamTask.java       | 2 --
 .../flink/streaming/runtime/tasks/StreamIterationHead.java    | 1 -
 .../org/apache/flink/streaming/runtime/tasks/StreamTask.java  | 7 +++++--
 .../flink/streaming/runtime/tasks/TwoInputStreamTask.java     | 2 --
 5 files changed, 5 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/40eef52e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 605b8f5..6136f24 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -61,8 +61,6 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
 
 	@Override
 	public void invoke() throws Exception {
-		this.isRunning = true;
-
 		boolean operatorOpen = false;
 
 		if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/40eef52e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 1940c11..4b25577 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -45,8 +45,6 @@ public class SourceStreamTask<OUT> extends StreamTask<OUT, StreamSource<OUT>> {
 	public void invoke() throws Exception {
 		final SourceOutput<StreamRecord<OUT>> output = new SourceOutput<StreamRecord<OUT>>(outputHandler.getOutput(), checkpointLock);
 
-		this.isRunning = true;
-
 		boolean operatorOpen = false;
 
 		if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/40eef52e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
index 1736e52..2911f44 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
@@ -72,7 +72,6 @@ public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> {
 	@SuppressWarnings("unchecked")
 	@Override
 	public void invoke() throws Exception {
-		isRunning = true;
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("Iteration source {} invoked", getName());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/40eef52e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index aabc95d..88813d0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -66,7 +66,8 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 
 	protected boolean hasChainedOperators;
 
-	protected volatile boolean isRunning = false;
+	// needs to be initialized to true, so that early cancel() before invoke() behaves correctly
+	protected volatile boolean isRunning = true;
 
 	protected List<StreamingRuntimeContext> contexts;
 
@@ -229,10 +230,12 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 	@Override
 	public void triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
 
+		LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());
+		
 		synchronized (checkpointLock) {
 			if (isRunning) {
 				try {
-					LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());
+					
 
 					// We wrap the states of the chained operators in a list, marking non-stateful operators with null
 					List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>> chainedStates = new ArrayList<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>>();

http://git-wip-us.apache.org/repos/asf/flink/blob/40eef52e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 99c053b..8cf5a40 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -85,8 +85,6 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
 
 	@Override
 	public void invoke() throws Exception {
-		this.isRunning = true;
-
 		boolean operatorOpen = false;
 
 		if (LOG.isDebugEnabled()) {