You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/10/26 21:36:48 UTC
[2/9] flink git commit: [FLINK-4907] Add Num-Subtasks/Subtask-Index Parameter to Operator Test Harnesses
[FLINK-4907] Add Num-Subtasks/Subtask-Index Parameter to Operator Test Harnesses
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e396a5a7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e396a5a7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e396a5a7
Branch: refs/heads/master
Commit: e396a5a783f32e1eedef4008bf02c10525f65050
Parents: 9dc2635
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Oct 26 13:38:30 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Oct 26 23:26:28 2016 +0200
----------------------------------------------------------------------
.../StreamOperatorSnapshotRestoreTest.java | 8 ++++++--
.../util/AbstractStreamOperatorTestHarness.java | 19 +++++++++++--------
.../KeyedOneInputStreamOperatorTestHarness.java | 8 +++++---
.../KeyedTwoInputStreamOperatorTestHarness.java | 8 +++++---
.../util/OneInputStreamOperatorTestHarness.java | 8 +++++---
.../util/TwoInputStreamOperatorTestHarness.java | 8 +++++---
.../streaming/util/WindowingTestHarness.java | 2 +-
7 files changed, 38 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e396a5a7/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
index 997ceb7..c02a7c3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
@@ -68,7 +68,9 @@ public class StreamOperatorSnapshotRestoreTest {
}
},
TypeInformation.of(Integer.class),
- MAX_PARALLELISM);
+ MAX_PARALLELISM,
+ 1 /* num subtasks */,
+ 0 /* subtask index */);
testHarness.open();
@@ -92,7 +94,9 @@ public class StreamOperatorSnapshotRestoreTest {
}
},
TypeInformation.of(Integer.class),
- MAX_PARALLELISM);
+ MAX_PARALLELISM,
+ 1 /* num subtasks */,
+ 0 /* subtask index */);
testHarness.initializeState(handles);
http://git-wip-us.apache.org/repos/asf/flink/blob/e396a5a7/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 7189ce0..dfc0af0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -66,8 +66,6 @@ import static org.mockito.Mockito.*;
*/
public class AbstractStreamOperatorTestHarness<OUT> {
- protected final static int DEFAULT_MAX_PARALLELISM = 1;
-
final protected StreamOperator<OUT> operator;
final protected ConcurrentLinkedQueue<Object> outputList;
@@ -80,6 +78,8 @@ public class AbstractStreamOperatorTestHarness<OUT> {
final protected StreamTask<?, ?> mockTask;
+ final Environment environment;
+
ClosableRegistry closableRegistry;
// use this as default for tests
@@ -97,7 +97,9 @@ public class AbstractStreamOperatorTestHarness<OUT> {
public AbstractStreamOperatorTestHarness(
StreamOperator<OUT> operator,
- int maxParallelism) throws Exception {
+ int maxParallelism,
+ int numSubtasks,
+ int subtaskIndex) throws Exception {
this.operator = operator;
this.outputList = new ConcurrentLinkedQueue<>();
Configuration underlyingConfig = new Configuration();
@@ -107,7 +109,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
this.closableRegistry = new ClosableRegistry();
this.checkpointLock = new Object();
- final Environment env = new MockEnvironment(
+ environment = new MockEnvironment(
"MockTask",
3 * 1024 * 1024,
new MockInputSplitProvider(),
@@ -115,7 +117,8 @@ public class AbstractStreamOperatorTestHarness<OUT> {
underlyingConfig,
executionConfig,
maxParallelism,
- 1, 0);
+ numSubtasks,
+ subtaskIndex);
mockTask = mock(StreamTask.class);
processingTimeService = new TestProcessingTimeService();
@@ -125,7 +128,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
when(mockTask.getCheckpointLock()).thenReturn(checkpointLock);
when(mockTask.getConfiguration()).thenReturn(config);
when(mockTask.getTaskConfiguration()).thenReturn(underlyingConfig);
- when(mockTask.getEnvironment()).thenReturn(env);
+ when(mockTask.getEnvironment()).thenReturn(environment);
when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
when(mockTask.getUserCodeClassLoader()).thenReturn(this.getClass().getClassLoader());
when(mockTask.getCancelables()).thenReturn(this.closableRegistry);
@@ -159,9 +162,9 @@ public class AbstractStreamOperatorTestHarness<OUT> {
final Collection<OperatorStateHandle> stateHandles = (Collection<OperatorStateHandle>) invocationOnMock.getArguments()[1];
OperatorStateBackend osb;
if (null == stateHandles) {
- osb = stateBackend.createOperatorStateBackend(env, operator.getClass().getSimpleName());
+ osb = stateBackend.createOperatorStateBackend(environment, operator.getClass().getSimpleName());
} else {
- osb = stateBackend.restoreOperatorStateBackend(env, operator.getClass().getSimpleName(), stateHandles);
+ osb = stateBackend.restoreOperatorStateBackend(environment, operator.getClass().getSimpleName(), stateHandles);
}
mockTask.getCancelables().registerClosable(osb);
return osb;
http://git-wip-us.apache.org/repos/asf/flink/blob/e396a5a7/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index 25563a3..7d87eb8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -65,8 +65,10 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
OneInputStreamOperator<IN, OUT> operator,
final KeySelector<IN, K> keySelector,
TypeInformation<K> keyType,
- int maxParallelism) throws Exception {
- super(operator, maxParallelism);
+ int maxParallelism,
+ int numSubtasks,
+ int subtaskIndex) throws Exception {
+ super(operator, maxParallelism, numSubtasks, subtaskIndex);
ClosureCleaner.clean(keySelector, false);
config.setStatePartitioner(0, keySelector);
@@ -80,7 +82,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
OneInputStreamOperator<IN, OUT> operator,
final KeySelector<IN, K> keySelector,
TypeInformation<K> keyType) throws Exception {
- this(operator, keySelector, keyType, DEFAULT_MAX_PARALLELISM);
+ this(operator, keySelector, keyType, 1, 1, 0);
}
private void setupMockTaskCreateKeyedBackend() {
http://git-wip-us.apache.org/repos/asf/flink/blob/e396a5a7/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
index 1a01ea3..0aa91d9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
@@ -57,8 +57,10 @@ public class KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT>
KeySelector<IN1, K> keySelector1,
KeySelector<IN2, K> keySelector2,
TypeInformation<K> keyType,
- int maxParallelism) throws Exception {
- super(operator, maxParallelism);
+ int maxParallelism,
+ int numSubtasks,
+ int subtaskIndex) throws Exception {
+ super(operator, maxParallelism, numSubtasks, subtaskIndex);
ClosureCleaner.clean(keySelector1, false);
ClosureCleaner.clean(keySelector2, false);
@@ -74,7 +76,7 @@ public class KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT>
final KeySelector<IN1, K> keySelector1,
final KeySelector<IN2, K> keySelector2,
TypeInformation<K> keyType) throws Exception {
- this(operator, keySelector1, keySelector2, keyType, DEFAULT_MAX_PARALLELISM);
+ this(operator, keySelector1, keySelector2, keyType, 1, 1, 0);
}
private void setupMockTaskCreateKeyedBackend() {
http://git-wip-us.apache.org/repos/asf/flink/blob/e396a5a7/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 8be9c63..105922b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -36,13 +36,15 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
private final OneInputStreamOperator<IN, OUT> oneInputOperator;
public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator) throws Exception {
- this(operator, DEFAULT_MAX_PARALLELISM);
+ this(operator, 1, 1, 0);
}
public OneInputStreamOperatorTestHarness(
OneInputStreamOperator<IN, OUT> operator,
- int maxParallelism) throws Exception {
- super(operator, maxParallelism);
+ int maxParallelism,
+ int numTubtasks,
+ int subtaskIndex) throws Exception {
+ super(operator, maxParallelism, numTubtasks, subtaskIndex);
this.oneInputOperator = operator;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e396a5a7/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
index c6f6918..90bdcb2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
@@ -35,13 +35,15 @@ public class TwoInputStreamOperatorTestHarness<IN1, IN2, OUT>extends AbstractStr
private final TwoInputStreamOperator<IN1, IN2, OUT> twoInputOperator;
public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator<IN1, IN2, OUT> operator) throws Exception {
- this(operator, DEFAULT_MAX_PARALLELISM);
+ this(operator, 1, 1, 0);
}
public TwoInputStreamOperatorTestHarness(
TwoInputStreamOperator<IN1, IN2, OUT> operator,
- int maxParallelism) throws Exception {
- super(operator, maxParallelism);
+ int maxParallelism,
+ int numSubtasks,
+ int subtaskIndex) throws Exception {
+ super(operator, maxParallelism, numSubtasks, subtaskIndex);
this.twoInputOperator = operator;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e396a5a7/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
index db3a89c..25deb54 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
@@ -76,7 +76,7 @@ public class WindowingTestHarness<K, IN, W extends Window> {
trigger,
allowedLateness);
- testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, keyType, 1);
+ testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, keyType);
}
/**