You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/05/03 14:28:18 UTC
[12/13] flink git commit: [FLINK-5969] Also snapshot legacy state in operator test harness
[FLINK-5969] Also snapshot legacy state in operator test harness
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/84eea722
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/84eea722
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/84eea722
Branch: refs/heads/master
Commit: 84eea72295eda5e7289deb5221c7b990b7b65883
Parents: e40f2e1
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Apr 24 17:13:49 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed May 3 16:25:57 2017 +0200
----------------------------------------------------------------------
.../util/AbstractStreamOperatorTestHarness.java | 15 +++++++++++++--
1 file changed, 13 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/84eea722/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 4a9463a..7a8488f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -509,9 +509,21 @@ public class AbstractStreamOperatorTestHarness<OUT> {
OperatorStateHandle opManaged = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateManagedFuture());
OperatorStateHandle opRaw = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateRawFuture());
+ // also snapshot legacy state, if any
+ StreamStateHandle legacyStateHandle = null;
+
+ if (operator instanceof StreamCheckpointedOperator) {
+
+ final CheckpointStreamFactory.CheckpointStateOutputStream outStream =
+ streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
+
+ ((StreamCheckpointedOperator) operator).snapshotState(outStream, checkpointId, timestamp);
+ legacyStateHandle = outStream.closeAndGetHandle();
+ }
+
return new OperatorStateHandles(
0,
- null,
+ legacyStateHandle,
keyedManaged != null ? Collections.singletonList(keyedManaged) : null,
keyedRaw != null ? Collections.singletonList(keyedRaw) : null,
opManaged != null ? Collections.singletonList(opManaged) : null,
@@ -523,7 +535,6 @@ public class AbstractStreamOperatorTestHarness<OUT> {
* the operator implements this interface.
*/
@Deprecated
- @SuppressWarnings("deprecation")
public StreamStateHandle snapshotLegacy(long checkpointId, long timestamp) throws Exception {
CheckpointStreamFactory.CheckpointStateOutputStream outStream = stateBackend.createStreamFactory(