You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2017/01/13 10:26:16 UTC
flink git commit: [FLINK-5467] Avoid legacy state for CheckpointedRestoring operators
Repository: flink
Updated Branches:
refs/heads/master 46423b9c7 -> 51a357351
[FLINK-5467] Avoid legacy state for CheckpointedRestoring operators
This closes #3102.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/51a35735
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/51a35735
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/51a35735
Branch: refs/heads/master
Commit: 51a357351b955844941edd9a9b1406cdc787b18a
Parents: 46423b9
Author: Stefan Richter <s....@data-artisans.com>
Authored: Thu Jan 12 12:24:34 2017 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Fri Jan 13 11:20:53 2017 +0100
----------------------------------------------------------------------
.../streaming/api/operators/AbstractUdfStreamOperator.java | 6 +++---
.../org/apache/flink/test/checkpointing/RescalingITCase.java | 8 +++++++-
2 files changed, 10 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/51a35735/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index 81f709b..15e26c9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -195,14 +195,13 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
} catch (Exception e) {
throw new Exception("Failed to draw state snapshot from function: " + e.getMessage(), e);
}
- } else if (userFunction instanceof CheckpointedRestoring) {
- out.write(0);
}
}
@Override
public void restoreState(FSDataInputStream in) throws Exception {
- if (userFunction instanceof CheckpointedRestoring) {
+ if (userFunction instanceof Checkpointed ||
+ (userFunction instanceof CheckpointedRestoring && in instanceof Migration)) {
@SuppressWarnings("unchecked")
CheckpointedRestoring<Serializable> chkFunction = (CheckpointedRestoring<Serializable>) userFunction;
@@ -219,6 +218,7 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
}
}
} else if (in instanceof Migration) {
+ // absorb the introduced byte from the migration stream without too much further consequences
int hasUdfState = in.read();
if (hasUdfState == 1) {
throw new Exception("Found UDF state but operator is not instance of CheckpointedRestoring");
http://git-wip-us.apache.org/repos/asf/flink/blob/51a35735/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index bc65abf..da4a01b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -903,7 +904,7 @@ public class RescalingITCase extends TestLogger {
}
}
- private static class PartitionedStateSource extends StateSourceBase implements CheckpointedFunction {
+ private static class PartitionedStateSource extends StateSourceBase implements CheckpointedFunction, CheckpointedRestoring<Integer> {
private static final long serialVersionUID = -359715965103593462L;
private static final int NUM_PARTITIONS = 7;
@@ -945,5 +946,10 @@ public class RescalingITCase extends TestLogger {
CHECK_CORRECT_RESTORE[getRuntimeContext().getIndexOfThisSubtask()] = counter;
}
}
+
+ @Override
+ public void restoreState(Integer state) throws Exception {
+ counterPartitions.add(state);
+ }
}
}