You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/10/26 21:36:53 UTC

[7/9] flink git commit: [FLINK-4892] Change TestHarness.snapshot() to return OperatorStateHandles

[FLINK-4892] Change TestHarness.snapshot() to return OperatorStateHandles

This makes snapshot() symmetric with initializeState(), which already takes an OperatorStateHandles.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d0c6842a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d0c6842a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d0c6842a

Branch: refs/heads/master
Commit: d0c6842ae6fe31fb60b00314de3bafdcfd92957e
Parents: fe1654c
Author: kl0u <kk...@gmail.com>
Authored: Thu Oct 20 16:32:51 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Oct 26 23:26:28 2016 +0200

----------------------------------------------------------------------
 .../StreamOperatorSnapshotRestoreTest.java      | 22 +++-----------------
 .../util/AbstractStreamOperatorTestHarness.java | 22 ++++++++++++++++++--
 2 files changed, 23 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d0c6842a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
index cc29172..997ceb7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
@@ -76,17 +76,7 @@ public class StreamOperatorSnapshotRestoreTest {
 			testHarness.processElement(new StreamRecord<>(i));
 		}
 
-		OperatorSnapshotResult snapshotInProgress = testHarness.snapshot(1L, 1L);
-
-		KeyGroupsStateHandle keyedManaged =
-				FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getKeyedStateManagedFuture());
-		KeyGroupsStateHandle keyedRaw =
-				FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getKeyedStateRawFuture());
-
-		OperatorStateHandle opManaged =
-				FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateManagedFuture());
-		OperatorStateHandle opRaw =
-				FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateRawFuture());
+		OperatorStateHandles handles = testHarness.snapshot(1L, 1L);
 
 		testHarness.close();
 
@@ -104,13 +94,7 @@ public class StreamOperatorSnapshotRestoreTest {
 				TypeInformation.of(Integer.class),
 				MAX_PARALLELISM);
 
-		testHarness.initializeState(new OperatorStateHandles(
-				0,
-				null,
-				Collections.singletonList(keyedManaged),
-				Collections.singletonList(keyedRaw),
-				Collections.singletonList(opManaged),
-				Collections.singletonList(opRaw)));
+		testHarness.initializeState(handles);
 
 		testHarness.open();
 
@@ -221,4 +205,4 @@ public class StreamOperatorSnapshotRestoreTest {
 		}
 	}
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0c6842a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 1124fa9..7189ce0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.ClosableRegistry;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
@@ -47,10 +48,12 @@ import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.util.FutureUtil;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -252,13 +255,28 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 	/**
 	 * Calls {@link StreamOperator#snapshotState(long, long, CheckpointStreamFactory)}.
 	 */
-	public OperatorSnapshotResult snapshot(long checkpointId, long timestamp) throws Exception {
+	public OperatorStateHandles snapshot(long checkpointId, long timestamp) throws Exception {
 
 		CheckpointStreamFactory streamFactory = stateBackend.createStreamFactory(
 				new JobID(),
 				"test_op");
 
-		return operator.snapshotState(checkpointId, timestamp, streamFactory);
+		OperatorSnapshotResult operatorStateResult = operator.snapshotState(checkpointId, timestamp, streamFactory);
+
+		KeyGroupsStateHandle keyedManaged = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateManagedFuture());
+		KeyGroupsStateHandle keyedRaw = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateRawFuture());
+
+		OperatorStateHandle opManaged = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateManagedFuture());
+		OperatorStateHandle opRaw = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateRawFuture());
+
+		OperatorStateHandles handles = new OperatorStateHandles(
+			0,
+			null,
+			Collections.singletonList(keyedManaged),
+			Collections.singletonList(keyedRaw),
+			Collections.singletonList(opManaged),
+			Collections.singletonList(opRaw));
+		return handles;
 	}
 
 	/**