You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/01/12 18:24:34 UTC

[4/5] flink git commit: [FLINK-5407] Handle snapshoting null-operator in chain

[FLINK-5407] Handle snapshoting null-operator in chain


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/81eaafac
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/81eaafac
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/81eaafac

Branch: refs/heads/release-1.2
Commit: 81eaafac70a9ec543ae2e81b6dd006d80c137fa5
Parents: 68a2520
Author: Stefan Richter <s....@data-artisans.com>
Authored: Thu Jan 5 14:28:50 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Jan 12 17:51:02 2017 +0100

----------------------------------------------------------------------
 .../streaming/runtime/tasks/StreamTask.java     | 45 ++++++++++++++------
 1 file changed, 33 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/81eaafac/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 3bbc53b..530401b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -901,8 +901,15 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 				List<OperatorStateHandle> operatorStatesStream = new ArrayList<>(snapshotInProgressList.size());
 
 				for (OperatorSnapshotResult snapshotInProgress : snapshotInProgressList) {
-					operatorStatesBackend.add(FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateManagedFuture()));
-					operatorStatesStream.add(FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateRawFuture()));
+					if (null != snapshotInProgress) {
+						operatorStatesBackend.add(
+								FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateManagedFuture()));
+						operatorStatesStream.add(
+								FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateRawFuture()));
+					} else {
+						operatorStatesBackend.add(null);
+						operatorStatesStream.add(null);
+					}
 				}
 
 				final long asyncEndNanos = System.nanoTime();
@@ -949,7 +956,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		public void close() {
 			// cleanup/release ongoing snapshot operations
 			for (OperatorSnapshotResult snapshotResult : snapshotInProgressList) {
-				snapshotResult.cancel();
+				if (null != snapshotResult) {
+					snapshotResult.cancel();
+				}
 			}
 		}
 	}
@@ -994,14 +1003,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			try {
 
 				for (StreamOperator<?> op : allOperators) {
-
-					createStreamFactory(op);
-					snapshotNonPartitionableState(op);
-
-					OperatorSnapshotResult snapshotInProgress =
-							op.snapshotState(checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp(), streamFactory);
-
-					snapshotInProgressList.add(snapshotInProgress);
+					checkpointStreamOperator(op);
 				}
 
 				if (LOG.isDebugEnabled()) {
@@ -1028,7 +1030,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 				if (failed) {
 					// Cleanup to release resources
 					for (OperatorSnapshotResult operatorSnapshotResult : snapshotInProgressList) {
-						operatorSnapshotResult.cancel();
+						if (null != operatorSnapshotResult) {
+							operatorSnapshotResult.cancel();
+						}
 					}
 
 					if (LOG.isDebugEnabled()) {
@@ -1038,7 +1042,24 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 					}
 				}
 			}
+		}
+
+		private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
+			if (null != op) {
+				createStreamFactory(op);
+				snapshotNonPartitionableState(op);
+
+				OperatorSnapshotResult snapshotInProgress = op.snapshotState(
+						checkpointMetaData.getCheckpointId(),
+						checkpointMetaData.getTimestamp(),
+						streamFactory);
 
+				snapshotInProgressList.add(snapshotInProgress);
+			} else {
+				nonPartitionedStates.add(null);
+				OperatorSnapshotResult emptySnapshotInProgress = new OperatorSnapshotResult();
+				snapshotInProgressList.add(emptySnapshotInProgress);
+			}
 		}
 
 		private void createStreamFactory(StreamOperator<?> operator) throws IOException {