You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/10/26 21:36:53 UTC
[7/9] flink git commit: [FLINK-4892] Change TestHarness.snapshot() to return OperatorStateHandles
[FLINK-4892] Change TestHarness.snapshot() to return OperatorStateHandles
This makes snapshot() symmetric with initializeState(), which takes an OperatorStateHandles as its argument.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d0c6842a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d0c6842a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d0c6842a
Branch: refs/heads/master
Commit: d0c6842ae6fe31fb60b00314de3bafdcfd92957e
Parents: fe1654c
Author: kl0u <kk...@gmail.com>
Authored: Thu Oct 20 16:32:51 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Oct 26 23:26:28 2016 +0200
----------------------------------------------------------------------
.../StreamOperatorSnapshotRestoreTest.java | 22 +++-----------------
.../util/AbstractStreamOperatorTestHarness.java | 22 ++++++++++++++++++--
2 files changed, 23 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d0c6842a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
index cc29172..997ceb7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
@@ -76,17 +76,7 @@ public class StreamOperatorSnapshotRestoreTest {
testHarness.processElement(new StreamRecord<>(i));
}
- OperatorSnapshotResult snapshotInProgress = testHarness.snapshot(1L, 1L);
-
- KeyGroupsStateHandle keyedManaged =
- FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getKeyedStateManagedFuture());
- KeyGroupsStateHandle keyedRaw =
- FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getKeyedStateRawFuture());
-
- OperatorStateHandle opManaged =
- FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateManagedFuture());
- OperatorStateHandle opRaw =
- FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateRawFuture());
+ OperatorStateHandles handles = testHarness.snapshot(1L, 1L);
testHarness.close();
@@ -104,13 +94,7 @@ public class StreamOperatorSnapshotRestoreTest {
TypeInformation.of(Integer.class),
MAX_PARALLELISM);
- testHarness.initializeState(new OperatorStateHandles(
- 0,
- null,
- Collections.singletonList(keyedManaged),
- Collections.singletonList(keyedRaw),
- Collections.singletonList(opManaged),
- Collections.singletonList(opRaw)));
+ testHarness.initializeState(handles);
testHarness.open();
@@ -221,4 +205,4 @@ public class StreamOperatorSnapshotRestoreTest {
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d0c6842a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 1124fa9..7189ce0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.ClosableRegistry;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
@@ -47,10 +48,12 @@ import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.util.FutureUtil;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.util.Collection;
+import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -252,13 +255,28 @@ public class AbstractStreamOperatorTestHarness<OUT> {
/**
* Calls {@link StreamOperator#snapshotState(long, long, CheckpointStreamFactory)}.
*/
- public OperatorSnapshotResult snapshot(long checkpointId, long timestamp) throws Exception {
+ public OperatorStateHandles snapshot(long checkpointId, long timestamp) throws Exception {
CheckpointStreamFactory streamFactory = stateBackend.createStreamFactory(
new JobID(),
"test_op");
- return operator.snapshotState(checkpointId, timestamp, streamFactory);
+ OperatorSnapshotResult operatorStateResult = operator.snapshotState(checkpointId, timestamp, streamFactory);
+
+ KeyGroupsStateHandle keyedManaged = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateManagedFuture());
+ KeyGroupsStateHandle keyedRaw = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateRawFuture());
+
+ OperatorStateHandle opManaged = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateManagedFuture());
+ OperatorStateHandle opRaw = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateRawFuture());
+
+ OperatorStateHandles handles = new OperatorStateHandles(
+ 0,
+ null,
+ Collections.singletonList(keyedManaged),
+ Collections.singletonList(keyedRaw),
+ Collections.singletonList(opManaged),
+ Collections.singletonList(opRaw));
+ return handles;
}
/**