You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/01/12 16:50:27 UTC
[2/2] flink git commit: [FLINK-5407] Handle snapshotting null-operator
in chain
[FLINK-5407] Handle snapshotting null-operator in chain
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9c6eb579
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9c6eb579
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9c6eb579
Branch: refs/heads/master
Commit: 9c6eb5793258de15a83f4cf7b13180d370062531
Parents: fc343e0
Author: Stefan Richter <s....@data-artisans.com>
Authored: Thu Jan 5 14:28:50 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Jan 12 17:40:32 2017 +0100
----------------------------------------------------------------------
.../streaming/runtime/tasks/StreamTask.java | 45 ++++++++++++++------
1 file changed, 33 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9c6eb579/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 1c20393..265cb5c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -902,8 +902,15 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
List<OperatorStateHandle> operatorStatesStream = new ArrayList<>(snapshotInProgressList.size());
for (OperatorSnapshotResult snapshotInProgress : snapshotInProgressList) {
- operatorStatesBackend.add(FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateManagedFuture()));
- operatorStatesStream.add(FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateRawFuture()));
+ if (null != snapshotInProgress) {
+ operatorStatesBackend.add(
+ FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateManagedFuture()));
+ operatorStatesStream.add(
+ FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateRawFuture()));
+ } else {
+ operatorStatesBackend.add(null);
+ operatorStatesStream.add(null);
+ }
}
final long asyncEndNanos = System.nanoTime();
@@ -950,7 +957,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
public void close() {
// cleanup/release ongoing snapshot operations
for (OperatorSnapshotResult snapshotResult : snapshotInProgressList) {
- snapshotResult.cancel();
+ if (null != snapshotResult) {
+ snapshotResult.cancel();
+ }
}
}
}
@@ -995,14 +1004,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
try {
for (StreamOperator<?> op : allOperators) {
-
- createStreamFactory(op);
- snapshotNonPartitionableState(op);
-
- OperatorSnapshotResult snapshotInProgress =
- op.snapshotState(checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp(), streamFactory);
-
- snapshotInProgressList.add(snapshotInProgress);
+ checkpointStreamOperator(op);
}
if (LOG.isDebugEnabled()) {
@@ -1029,7 +1031,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
if (failed) {
// Cleanup to release resources
for (OperatorSnapshotResult operatorSnapshotResult : snapshotInProgressList) {
- operatorSnapshotResult.cancel();
+ if (null != operatorSnapshotResult) {
+ operatorSnapshotResult.cancel();
+ }
}
if (LOG.isDebugEnabled()) {
@@ -1039,7 +1043,24 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
}
}
}
+ }
+
+ private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
+ if (null != op) {
+ createStreamFactory(op);
+ snapshotNonPartitionableState(op);
+
+ OperatorSnapshotResult snapshotInProgress = op.snapshotState(
+ checkpointMetaData.getCheckpointId(),
+ checkpointMetaData.getTimestamp(),
+ streamFactory);
+ snapshotInProgressList.add(snapshotInProgress);
+ } else {
+ nonPartitionedStates.add(null);
+ OperatorSnapshotResult emptySnapshotInProgress = new OperatorSnapshotResult();
+ snapshotInProgressList.add(emptySnapshotInProgress);
+ }
}
private void createStreamFactory(StreamOperator<?> operator) throws IOException {