You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/05/03 14:28:18 UTC

[12/13] flink git commit: [FLINK-5969] Also snapshot legacy state in operator test harness

[FLINK-5969] Also snapshot legacy state in operator test harness


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/84eea722
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/84eea722
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/84eea722

Branch: refs/heads/master
Commit: 84eea72295eda5e7289deb5221c7b990b7b65883
Parents: e40f2e1
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Apr 24 17:13:49 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed May 3 16:25:57 2017 +0200

----------------------------------------------------------------------
 .../util/AbstractStreamOperatorTestHarness.java      | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/84eea722/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 4a9463a..7a8488f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -509,9 +509,21 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 		OperatorStateHandle opManaged = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateManagedFuture());
 		OperatorStateHandle opRaw = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateRawFuture());
 
+		// also snapshot legacy state, if any
+		StreamStateHandle legacyStateHandle = null;
+
+		if (operator instanceof StreamCheckpointedOperator) {
+
+			final CheckpointStreamFactory.CheckpointStateOutputStream outStream =
+					streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
+
+				((StreamCheckpointedOperator) operator).snapshotState(outStream, checkpointId, timestamp);
+				legacyStateHandle = outStream.closeAndGetHandle();
+		}
+
 		return new OperatorStateHandles(
 			0,
-			null,
+			legacyStateHandle,
 			keyedManaged != null ? Collections.singletonList(keyedManaged) : null,
 			keyedRaw != null ? Collections.singletonList(keyedRaw) : null,
 			opManaged != null ? Collections.singletonList(opManaged) : null,
@@ -523,7 +535,6 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 	 * the operator implements this interface.
 	 */
 	@Deprecated
-	@SuppressWarnings("deprecation")
 	public StreamStateHandle snapshotLegacy(long checkpointId, long timestamp) throws Exception {
 
 		CheckpointStreamFactory.CheckpointStateOutputStream outStream = stateBackend.createStreamFactory(