You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/02/23 13:22:58 UTC

flink git commit: [FLINK-5716] Add StreamStatusMaintainer mock in StreamSourceOperatorTest

Repository: flink
Updated Branches:
  refs/heads/master 234b90528 -> 8dac43613


[FLINK-5716] Add StreamStatusMaintainer mock in StreamSourceOperatorTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8dac4361
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8dac4361
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8dac4361

Branch: refs/heads/master
Commit: 8dac43613fbc937b2467b024a5f2524735fae5cf
Parents: 234b905
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Feb 23 14:21:24 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Feb 23 14:22:44 2017 +0100

----------------------------------------------------------------------
 .../streaming/runtime/operators/StreamSourceOperatorTest.java  | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8dac4361/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
index ae74c9c..912b448 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.StoppableFunction;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
@@ -295,6 +294,9 @@ public class StreamSourceOperatorTest {
 
 		Environment env = new DummyEnvironment("MockTwoInputTask", 1, 0);
 
+		StreamStatusMaintainer streamStatusMaintainer = mock(StreamStatusMaintainer.class);
+		when(streamStatusMaintainer.getStreamStatus()).thenReturn(StreamStatus.ACTIVE);
+
 		StreamTask<?, ?> mockTask = mock(StreamTask.class);
 		when(mockTask.getName()).thenReturn("Mock Task");
 		when(mockTask.getCheckpointLock()).thenReturn(new Object());
@@ -302,7 +304,7 @@ public class StreamSourceOperatorTest {
 		when(mockTask.getEnvironment()).thenReturn(env);
 		when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
 		when(mockTask.getAccumulatorMap()).thenReturn(Collections.<String, Accumulator<?, ?>>emptyMap());
-		when(mockTask.getStreamStatusMaintainer()).thenReturn(mock(StreamStatusMaintainer.class));
+		when(mockTask.getStreamStatusMaintainer()).thenReturn(streamStatusMaintainer);
 
 		doAnswer(new Answer<ProcessingTimeService>() {
 			@Override