You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/16 18:08:45 UTC

[18/24] flink git commit: [FLINK-2846] [streaming] Emit downstream checkpoint barriers at beginning of the checkpoint scope

[FLINK-2846] [streaming] Emit downstream checkpoint barriers at beginning of the checkpoint scope


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/da159ef5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/da159ef5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/da159ef5

Branch: refs/heads/master
Commit: da159ef563e934cda6f802aa5535986356760f35
Parents: ca8c73d
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Oct 9 21:40:28 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 16 15:26:11 2015 +0200

----------------------------------------------------------------------
 .../flink/streaming/runtime/tasks/StreamTask.java     | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/da159ef5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index b53d9c4..b607433 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -422,6 +422,13 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 		
 		synchronized (lock) {
 			if (isRunning) {
+
+				// since both state checkpointing and downstream barrier emission occurs in this
+				// lock scope, they are an atomic operation regardless of the order in which they occur
+				// we immediately emit the checkpoint barriers, so the downstream operators can start
+				// their checkpoint work as soon as possible
+				operatorChain.broadcastCheckpointBarrier(checkpointId, timestamp);
+				
 				// now draw the state snapshot
 				try {
 					final StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
@@ -436,13 +443,6 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 					}
 
 					StreamTaskStateList allStates = new StreamTaskStateList(states);
-
-					// since both state checkpointing and downstream barrier emission occurs in this
-					// lock scope, they are an atomic operation regardless of the order in which they occur
-					// we immediately emit the checkpoint barriers, so the downstream operators can start
-					// their checkpoint work as soon as possible
-					operatorChain.broadcastCheckpointBarrier(checkpointId, timestamp);
-					
 					if (allStates.isEmpty()) {
 						getEnvironment().acknowledgeCheckpoint(checkpointId);
 					} else {