You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/10/26 21:36:48 UTC

[2/9] flink git commit: [FLINK-4907] Add Num-Subtasks/Subtask-Index Parameter to Operator Test Harnesses

[FLINK-4907] Add Num-Subtasks/Subtask-Index Parameter to Operator Test Harnesses


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e396a5a7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e396a5a7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e396a5a7

Branch: refs/heads/master
Commit: e396a5a783f32e1eedef4008bf02c10525f65050
Parents: 9dc2635
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Oct 26 13:38:30 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Oct 26 23:26:28 2016 +0200

----------------------------------------------------------------------
 .../StreamOperatorSnapshotRestoreTest.java       |  8 ++++++--
 .../util/AbstractStreamOperatorTestHarness.java  | 19 +++++++++++--------
 .../KeyedOneInputStreamOperatorTestHarness.java  |  8 +++++---
 .../KeyedTwoInputStreamOperatorTestHarness.java  |  8 +++++---
 .../util/OneInputStreamOperatorTestHarness.java  |  8 +++++---
 .../util/TwoInputStreamOperatorTestHarness.java  |  8 +++++---
 .../streaming/util/WindowingTestHarness.java     |  2 +-
 7 files changed, 38 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e396a5a7/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
index 997ceb7..c02a7c3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
@@ -68,7 +68,9 @@ public class StreamOperatorSnapshotRestoreTest {
 							}
 						},
 						TypeInformation.of(Integer.class),
-						MAX_PARALLELISM);
+						MAX_PARALLELISM,
+						1 /* num subtasks */,
+						0 /* subtask index */);
 
 		testHarness.open();
 
@@ -92,7 +94,9 @@ public class StreamOperatorSnapshotRestoreTest {
 					}
 				},
 				TypeInformation.of(Integer.class),
-				MAX_PARALLELISM);
+				MAX_PARALLELISM,
+				1 /* num subtasks */,
+				0 /* subtask index */);
 
 		testHarness.initializeState(handles);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e396a5a7/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 7189ce0..dfc0af0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -66,8 +66,6 @@ import static org.mockito.Mockito.*;
  */
 public class AbstractStreamOperatorTestHarness<OUT> {
 
-	protected final static int DEFAULT_MAX_PARALLELISM = 1;
-
 	final protected StreamOperator<OUT> operator;
 
 	final protected ConcurrentLinkedQueue<Object> outputList;
@@ -80,6 +78,8 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 
 	final protected StreamTask<?, ?> mockTask;
 
+	final Environment environment;
+
 	ClosableRegistry closableRegistry;
 
 	// use this as default for tests
@@ -97,7 +97,9 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 
 	public AbstractStreamOperatorTestHarness(
 			StreamOperator<OUT> operator,
-			int maxParallelism) throws Exception {
+			int maxParallelism,
+			int numSubtasks,
+			int subtaskIndex) throws Exception {
 		this.operator = operator;
 		this.outputList = new ConcurrentLinkedQueue<>();
 		Configuration underlyingConfig = new Configuration();
@@ -107,7 +109,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 		this.closableRegistry = new ClosableRegistry();
 		this.checkpointLock = new Object();
 
-		final Environment env = new MockEnvironment(
+		environment = new MockEnvironment(
 				"MockTask",
 				3 * 1024 * 1024,
 				new MockInputSplitProvider(),
@@ -115,7 +117,8 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 				underlyingConfig,
 				executionConfig,
 				maxParallelism,
-				1, 0);
+				numSubtasks,
+				subtaskIndex);
 
 		mockTask = mock(StreamTask.class);
 		processingTimeService = new TestProcessingTimeService();
@@ -125,7 +128,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 		when(mockTask.getCheckpointLock()).thenReturn(checkpointLock);
 		when(mockTask.getConfiguration()).thenReturn(config);
 		when(mockTask.getTaskConfiguration()).thenReturn(underlyingConfig);
-		when(mockTask.getEnvironment()).thenReturn(env);
+		when(mockTask.getEnvironment()).thenReturn(environment);
 		when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
 		when(mockTask.getUserCodeClassLoader()).thenReturn(this.getClass().getClassLoader());
 		when(mockTask.getCancelables()).thenReturn(this.closableRegistry);
@@ -159,9 +162,9 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 					final Collection<OperatorStateHandle> stateHandles = (Collection<OperatorStateHandle>) invocationOnMock.getArguments()[1];
 					OperatorStateBackend osb;
 					if (null == stateHandles) {
-						osb = stateBackend.createOperatorStateBackend(env, operator.getClass().getSimpleName());
+						osb = stateBackend.createOperatorStateBackend(environment, operator.getClass().getSimpleName());
 					} else {
-						osb = stateBackend.restoreOperatorStateBackend(env, operator.getClass().getSimpleName(), stateHandles);
+						osb = stateBackend.restoreOperatorStateBackend(environment, operator.getClass().getSimpleName(), stateHandles);
 					}
 					mockTask.getCancelables().registerClosable(osb);
 					return osb;

http://git-wip-us.apache.org/repos/asf/flink/blob/e396a5a7/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index 25563a3..7d87eb8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -65,8 +65,10 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
 			OneInputStreamOperator<IN, OUT> operator,
 			final KeySelector<IN, K> keySelector,
 			TypeInformation<K> keyType,
-			int maxParallelism) throws Exception {
-		super(operator, maxParallelism);
+			int maxParallelism,
+			int numSubtasks,
+			int subtaskIndex) throws Exception {
+		super(operator, maxParallelism, numSubtasks, subtaskIndex);
 
 		ClosureCleaner.clean(keySelector, false);
 		config.setStatePartitioner(0, keySelector);
@@ -80,7 +82,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
 			OneInputStreamOperator<IN, OUT> operator,
 			final KeySelector<IN, K> keySelector,
 			TypeInformation<K> keyType) throws Exception {
-		this(operator, keySelector, keyType, DEFAULT_MAX_PARALLELISM);
+		this(operator, keySelector, keyType, 1, 1, 0);
 	}
 
 	private void setupMockTaskCreateKeyedBackend() {

http://git-wip-us.apache.org/repos/asf/flink/blob/e396a5a7/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
index 1a01ea3..0aa91d9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
@@ -57,8 +57,10 @@ public class KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT>
 			KeySelector<IN1, K> keySelector1,
 			KeySelector<IN2, K> keySelector2,
 			TypeInformation<K> keyType,
-			int maxParallelism) throws Exception {
-		super(operator, maxParallelism);
+			int maxParallelism,
+			int numSubtasks,
+			int subtaskIndex) throws Exception {
+		super(operator, maxParallelism, numSubtasks, subtaskIndex);
 
 		ClosureCleaner.clean(keySelector1, false);
 		ClosureCleaner.clean(keySelector2, false);
@@ -74,7 +76,7 @@ public class KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT>
 			final KeySelector<IN1, K> keySelector1,
 			final KeySelector<IN2, K> keySelector2,
 			TypeInformation<K> keyType) throws Exception {
-		this(operator, keySelector1, keySelector2, keyType, DEFAULT_MAX_PARALLELISM);
+		this(operator, keySelector1, keySelector2, keyType, 1, 1, 0);
 	}
 
 	private void setupMockTaskCreateKeyedBackend() {

http://git-wip-us.apache.org/repos/asf/flink/blob/e396a5a7/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 8be9c63..105922b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -36,13 +36,15 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
 	private final OneInputStreamOperator<IN, OUT> oneInputOperator;
 
 	public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator) throws Exception {
-		this(operator, DEFAULT_MAX_PARALLELISM);
+		this(operator, 1, 1, 0);
 	}
 
 	public OneInputStreamOperatorTestHarness(
 			OneInputStreamOperator<IN, OUT> operator,
-			int maxParallelism) throws Exception {
-		super(operator, maxParallelism);
+			int maxParallelism,
+			int numTubtasks,
+			int subtaskIndex) throws Exception {
+		super(operator, maxParallelism, numTubtasks, subtaskIndex);
 
 		this.oneInputOperator = operator;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e396a5a7/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
index c6f6918..90bdcb2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
@@ -35,13 +35,15 @@ public class TwoInputStreamOperatorTestHarness<IN1, IN2, OUT>extends AbstractStr
 	private final TwoInputStreamOperator<IN1, IN2, OUT> twoInputOperator;
 
 	public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator<IN1, IN2, OUT> operator) throws Exception {
-		this(operator, DEFAULT_MAX_PARALLELISM);
+		this(operator, 1, 1, 0);
 	}
 		
 	public TwoInputStreamOperatorTestHarness(
 			TwoInputStreamOperator<IN1, IN2, OUT> operator,
-			int maxParallelism) throws Exception {
-		super(operator, maxParallelism);
+			int maxParallelism,
+			int numSubtasks,
+			int subtaskIndex) throws Exception {
+		super(operator, maxParallelism, numSubtasks, subtaskIndex);
 
 		this.twoInputOperator = operator;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e396a5a7/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
index db3a89c..25deb54 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
@@ -76,7 +76,7 @@ public class WindowingTestHarness<K, IN, W extends Window> {
 				trigger,
 				allowedLateness);
 
-		testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, keyType, 1);
+		testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, keyType);
 	}
 
 	/**