You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/02 17:14:34 UTC
[6/8] flink git commit: [hotfix] [streaming] Fix race in stream tasks
when canceling tasks early.
[hotfix] [streaming] Fix race in stream tasks when canceling tasks early.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/40eef52e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/40eef52e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/40eef52e
Branch: refs/heads/master
Commit: 40eef52e9f68c3c7727e9b9493959d5fd36d7f70
Parents: af88aa0
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Jul 30 21:09:57 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Aug 2 15:58:28 2015 +0200
----------------------------------------------------------------------
.../flink/streaming/runtime/tasks/OneInputStreamTask.java | 2 --
.../flink/streaming/runtime/tasks/SourceStreamTask.java | 2 --
.../flink/streaming/runtime/tasks/StreamIterationHead.java | 1 -
.../org/apache/flink/streaming/runtime/tasks/StreamTask.java | 7 +++++--
.../flink/streaming/runtime/tasks/TwoInputStreamTask.java | 2 --
5 files changed, 5 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/40eef52e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 605b8f5..6136f24 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -61,8 +61,6 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
@Override
public void invoke() throws Exception {
- this.isRunning = true;
-
boolean operatorOpen = false;
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/40eef52e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 1940c11..4b25577 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -45,8 +45,6 @@ public class SourceStreamTask<OUT> extends StreamTask<OUT, StreamSource<OUT>> {
public void invoke() throws Exception {
final SourceOutput<StreamRecord<OUT>> output = new SourceOutput<StreamRecord<OUT>>(outputHandler.getOutput(), checkpointLock);
- this.isRunning = true;
-
boolean operatorOpen = false;
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/40eef52e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
index 1736e52..2911f44 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
@@ -72,7 +72,6 @@ public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> {
@SuppressWarnings("unchecked")
@Override
public void invoke() throws Exception {
- isRunning = true;
if (LOG.isDebugEnabled()) {
LOG.debug("Iteration source {} invoked", getName());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/40eef52e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index aabc95d..88813d0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -66,7 +66,8 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
protected boolean hasChainedOperators;
- protected volatile boolean isRunning = false;
+ // needs to be initialized to true, so that early cancel() before invoke() behaves correctly
+ protected volatile boolean isRunning = true;
protected List<StreamingRuntimeContext> contexts;
@@ -229,10 +230,12 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
@Override
public void triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
+ LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());
+
synchronized (checkpointLock) {
if (isRunning) {
try {
- LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());
+
// We wrap the states of the chained operators in a list, marking non-stateful oeprators with null
List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>> chainedStates = new ArrayList<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>>();
http://git-wip-us.apache.org/repos/asf/flink/blob/40eef52e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 99c053b..8cf5a40 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -85,8 +85,6 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
@Override
public void invoke() throws Exception {
- this.isRunning = true;
-
boolean operatorOpen = false;
if (LOG.isDebugEnabled()) {