You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/16 18:08:45 UTC
[18/24] flink git commit: [FLINK-2846] [streaming] Emit downstream checkpoint barriers at beginning of the checkpoint scope
[FLINK-2846] [streaming] Emit downstream checkpoint barriers at beginning of the checkpoint scope
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/da159ef5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/da159ef5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/da159ef5
Branch: refs/heads/master
Commit: da159ef563e934cda6f802aa5535986356760f35
Parents: ca8c73d
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Oct 9 21:40:28 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 16 15:26:11 2015 +0200
----------------------------------------------------------------------
.../flink/streaming/runtime/tasks/StreamTask.java | 14 +++++++-------
1 file changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/da159ef5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index b53d9c4..b607433 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -422,6 +422,13 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
synchronized (lock) {
if (isRunning) {
+
+ // since both state checkpointing and downstream barrier emission occurs in this
+ // lock scope, they are an atomic operation regardless of the order in which they occur
+ // we immediately emit the checkpoint barriers, so the downstream operators can start
+ // their checkpoint work as soon as possible
+ operatorChain.broadcastCheckpointBarrier(checkpointId, timestamp);
+
// now draw the state snapshot
try {
final StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
@@ -436,13 +443,6 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
}
StreamTaskStateList allStates = new StreamTaskStateList(states);
-
- // since both state checkpointing and downstream barrier emission occurs in this
- // lock scope, they are an atomic operation regardless of the order in which they occur
- // we immediately emit the checkpoint barriers, so the downstream operators can start
- // their checkpoint work as soon as possible
- operatorChain.broadcastCheckpointBarrier(checkpointId, timestamp);
-
if (allStates.isEmpty()) {
getEnvironment().acknowledgeCheckpoint(checkpointId);
} else {