You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/05/14 15:53:00 UTC

[2/2] flink git commit: [hotfix][tests] Fix minor mocking issues in AbstractStreamOperatorTest

[hotfix][tests] Fix minor mocking issues in AbstractStreamOperatorTest

(cherry picked from commit bcd028d)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2d34b903
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2d34b903
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2d34b903

Branch: refs/heads/release-1.5
Commit: 2d34b9030199db0fae84964b2c04b84083d515ef
Parents: b907af2
Author: Stefan Richter <s....@data-artisans.com>
Authored: Mon May 14 15:09:50 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Mon May 14 17:52:28 2018 +0200

----------------------------------------------------------------------
 .../streaming/api/operators/AbstractStreamOperatorTest.java   | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2d34b903/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index 904ff64..f0195a2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -495,7 +495,7 @@ public class AbstractStreamOperatorTest {
 
 		final CloseableRegistry closeableRegistry = new CloseableRegistry();
 
-		StateSnapshotContextSynchronousImpl context = mock(StateSnapshotContextSynchronousImpl.class);
+		StateSnapshotContextSynchronousImpl context = spy(new StateSnapshotContextSynchronousImpl(0L, 0L));
 
 		whenNew(StateSnapshotContextSynchronousImpl.class).withAnyArguments().thenReturn(context);
 
@@ -573,7 +573,7 @@ public class AbstractStreamOperatorTest {
 		RunnableFuture<SnapshotResult<KeyedStateHandle>> futureKeyedStateHandle = mock(RunnableFuture.class);
 		RunnableFuture<SnapshotResult<OperatorStateHandle>> futureOperatorStateHandle = mock(RunnableFuture.class);
 
-		StateSnapshotContextSynchronousImpl context = mock(StateSnapshotContextSynchronousImpl.class);
+		StateSnapshotContextSynchronousImpl context = spy(new StateSnapshotContextSynchronousImpl(checkpointId, timestamp));
 		when(context.getKeyedStateStreamFuture()).thenReturn(futureKeyedStateHandle);
 		when(context.getOperatorStateStreamFuture()).thenReturn(futureOperatorStateHandle);
 
@@ -582,7 +582,6 @@ public class AbstractStreamOperatorTest {
 		whenNew(StateSnapshotContextSynchronousImpl.class).withAnyArguments().thenReturn(context);
 		whenNew(OperatorSnapshotFutures.class).withAnyArguments().thenReturn(operatorSnapshotResult);
 
-		CheckpointStreamFactory streamFactory = mock(CheckpointStreamFactory.class);
 		StreamTask<Void, AbstractStreamOperator<Void>> containingTask = mock(StreamTask.class);
 		when(containingTask.getCancelables()).thenReturn(closeableRegistry);
 
@@ -600,7 +599,7 @@ public class AbstractStreamOperatorTest {
 		when(operatorStateBackend.snapshot(
 			eq(checkpointId),
 			eq(timestamp),
-			eq(streamFactory),
+			any(CheckpointStreamFactory.class),
 			any(CheckpointOptions.class))).thenReturn(futureManagedOperatorStateHandle);
 
 		AbstractKeyedStateBackend<?> keyedStateBackend = mock(AbstractKeyedStateBackend.class);