You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/05/22 08:43:51 UTC
[15/17] flink git commit: [hotfix][tests] Introduce
MockEnvironmentBuilder to deduplicate MockEnvironment constructors
[hotfix][tests] Introduce MockEnvironmentBuilder to deduplicate MockEnvironment constructors
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4948e2ea
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4948e2ea
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4948e2ea
Branch: refs/heads/master
Commit: 4948e2ea80869b2aaff610f6e7e0963d05da350e
Parents: cdd99ca
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Wed May 9 11:12:18 2018 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue May 22 16:42:29 2018 +0800
----------------------------------------------------------------------
.../kafka/FlinkKafkaConsumerBaseTest.java | 13 +-
.../kinesis/testutils/TestRuntimeContext.java | 13 +-
.../operators/testutils/MockEnvironment.java | 102 +--------------
.../testutils/MockEnvironmentBuilder.java | 125 +++++++++++++++++++
.../operators/testutils/TaskTestBase.java | 8 +-
.../source/InputFormatSourceFunctionTest.java | 13 +-
.../StreamOperatorSnapshotRestoreTest.java | 29 ++---
.../operators/async/AsyncWaitOperatorTest.java | 13 +-
.../operators/StreamOperatorChainingTest.java | 14 +--
.../streaming/runtime/tasks/StreamTaskTest.java | 35 +++---
.../util/AbstractStreamOperatorTestHarness.java | 21 ++--
.../streaming/util/SourceFunctionUtil.java | 15 +--
.../PojoSerializerUpgradeTest.java | 25 ++--
13 files changed, 224 insertions(+), 202 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4948e2ea/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index b226ff1..4605015 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -33,10 +33,9 @@ import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
-import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
@@ -834,12 +833,10 @@ public class FlinkKafkaConsumerBaseTest {
super(
new MockStreamOperator(),
- new MockEnvironment(
- "mockTask",
- 4 * MemoryManager.DEFAULT_PAGE_SIZE,
- null,
- 16,
- new TestTaskStateManager()),
+ new MockEnvironmentBuilder()
+ .setTaskName("mockTask")
+ .setMemorySize(4 * MemoryManager.DEFAULT_PAGE_SIZE)
+ .build(),
Collections.emptyMap());
this.isCheckpointingEnabled = isCheckpointingEnabled;
http://git-wip-us.apache.org/repos/asf/flink/blob/4948e2ea/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java
index ce0bd97..740d2f2 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java
@@ -21,8 +21,7 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.testutils.MockEnvironment;
-import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
@@ -45,12 +44,10 @@ public class TestRuntimeContext extends StreamingRuntimeContext {
super(
new TestStreamOperator(),
- new MockEnvironment(
- "mockTask",
- 4 * MemoryManager.DEFAULT_PAGE_SIZE,
- null,
- 16,
- new TestTaskStateManager()),
+ new MockEnvironmentBuilder()
+ .setTaskName("mockTask")
+ .setMemorySize(4 * MemoryManager.DEFAULT_PAGE_SIZE)
+ .build(),
Collections.emptyMap());
this.isCheckpointingEnabled = isCheckpointingEnabled;
http://git-wip-us.apache.org/repos/asf/flink/blob/4948e2ea/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index ce19a5e..4bf94e9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -45,7 +45,6 @@ import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.TaskStateManager;
-import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.types.Record;
@@ -109,106 +108,11 @@ public class MockEnvironment implements Environment, AutoCloseable {
private Optional<Throwable> actualExternalFailureCause = Optional.empty();
- public MockEnvironment() {
- this(
- "mock-task",
- 1024 * MemoryManager.DEFAULT_PAGE_SIZE,
- null,
- 16,
- new TestTaskStateManager());
+ public static MockEnvironmentBuilder builder() {
+ return new MockEnvironmentBuilder();
}
- public MockEnvironment(
- String taskName,
- long memorySize,
- MockInputSplitProvider inputSplitProvider,
- int bufferSize,
- TaskStateManager taskStateManager) {
- this(
- taskName,
- memorySize,
- inputSplitProvider,
- bufferSize,
- new Configuration(),
- new ExecutionConfig(),
- taskStateManager);
- }
-
- public MockEnvironment(
- String taskName,
- long memorySize,
- MockInputSplitProvider inputSplitProvider,
- int bufferSize, Configuration taskConfiguration,
- ExecutionConfig executionConfig,
- TaskStateManager taskStateManager) {
- this(
- taskName,
- memorySize,
- inputSplitProvider,
- bufferSize,
- taskConfiguration,
- executionConfig,
- taskStateManager,
- 1,
- 1,
- 0);
- }
-
- public MockEnvironment(
- String taskName,
- long memorySize,
- MockInputSplitProvider inputSplitProvider,
- int bufferSize,
- Configuration taskConfiguration,
- ExecutionConfig executionConfig,
- TaskStateManager taskStateManager,
- int maxParallelism,
- int parallelism,
- int subtaskIndex) {
- this(
- taskName,
- memorySize,
- inputSplitProvider,
- bufferSize,
- taskConfiguration,
- executionConfig,
- taskStateManager,
- maxParallelism,
- parallelism,
- subtaskIndex,
- Thread.currentThread().getContextClassLoader());
-
- }
-
- public MockEnvironment(
- String taskName,
- long memorySize,
- MockInputSplitProvider inputSplitProvider,
- int bufferSize,
- Configuration taskConfiguration,
- ExecutionConfig executionConfig,
- TaskStateManager taskStateManager,
- int maxParallelism,
- int parallelism,
- int subtaskIndex,
- ClassLoader userCodeClassLoader) {
- this(
- new JobID(),
- new JobVertexID(),
- taskName,
- memorySize,
- inputSplitProvider,
- bufferSize,
- taskConfiguration,
- executionConfig,
- taskStateManager,
- maxParallelism,
- parallelism,
- subtaskIndex,
- userCodeClassLoader);
- }
-
- public MockEnvironment(
+ protected MockEnvironment(
JobID jobID,
JobVertexID jobVertexID,
String taskName,
http://git-wip-us.apache.org/repos/asf/flink/blob/4948e2ea/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
new file mode 100644
index 0000000..dfb10d4
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.testutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.state.TaskStateManager;
+import org.apache.flink.runtime.state.TestTaskStateManager;
+
+public class MockEnvironmentBuilder {
+ private String taskName = "mock-task";
+ private long memorySize = 1024 * MemoryManager.DEFAULT_PAGE_SIZE;
+ private MockInputSplitProvider inputSplitProvider = null;
+ private int bufferSize = 16;
+ private TaskStateManager taskStateManager = new TestTaskStateManager();
+ private Configuration taskConfiguration = new Configuration();
+ private ExecutionConfig executionConfig = new ExecutionConfig();
+ private int maxParallelism = 1;
+ private int parallelism = 1;
+ private int subtaskIndex = 0;
+ private ClassLoader userCodeClassLoader = Thread.currentThread().getContextClassLoader();
+ private JobID jobID = new JobID();
+ private JobVertexID jobVertexID = new JobVertexID();
+
+ public MockEnvironmentBuilder setTaskName(String taskName) {
+ this.taskName = taskName;
+ return this;
+ }
+
+ public MockEnvironmentBuilder setMemorySize(long memorySize) {
+ this.memorySize = memorySize;
+ return this;
+ }
+
+ public MockEnvironmentBuilder setInputSplitProvider(MockInputSplitProvider inputSplitProvider) {
+ this.inputSplitProvider = inputSplitProvider;
+ return this;
+ }
+
+ public MockEnvironmentBuilder setBufferSize(int bufferSize) {
+ this.bufferSize = bufferSize;
+ return this;
+ }
+
+ public MockEnvironmentBuilder setTaskStateManager(TaskStateManager taskStateManager) {
+ this.taskStateManager = taskStateManager;
+ return this;
+ }
+
+ public MockEnvironmentBuilder setTaskConfiguration(Configuration taskConfiguration) {
+ this.taskConfiguration = taskConfiguration;
+ return this;
+ }
+
+ public MockEnvironmentBuilder setExecutionConfig(ExecutionConfig executionConfig) {
+ this.executionConfig = executionConfig;
+ return this;
+ }
+
+ public MockEnvironmentBuilder setMaxParallelism(int maxParallelism) {
+ this.maxParallelism = maxParallelism;
+ return this;
+ }
+
+ public MockEnvironmentBuilder setParallelism(int parallelism) {
+ this.parallelism = parallelism;
+ return this;
+ }
+
+ public MockEnvironmentBuilder setSubtaskIndex(int subtaskIndex) {
+ this.subtaskIndex = subtaskIndex;
+ return this;
+ }
+
+ public MockEnvironmentBuilder setUserCodeClassLoader(ClassLoader userCodeClassLoader) {
+ this.userCodeClassLoader = userCodeClassLoader;
+ return this;
+ }
+
+ public MockEnvironmentBuilder setJobID(JobID jobID) {
+ this.jobID = jobID;
+ return this;
+ }
+
+ public MockEnvironmentBuilder setJobVertexID(JobVertexID jobVertexID) {
+ this.jobVertexID = jobVertexID;
+ return this;
+ }
+
+ public MockEnvironment build() {
+ return new MockEnvironment(
+ jobID,
+ jobVertexID,
+ taskName,
+ memorySize,
+ inputSplitProvider,
+ bufferSize,
+ taskConfiguration,
+ executionConfig,
+ taskStateManager,
+ maxParallelism,
+ parallelism,
+ subtaskIndex,
+ userCodeClassLoader);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4948e2ea/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
index a40992c..16485ca 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
@@ -54,8 +54,12 @@ public abstract class TaskTestBase extends TestLogger {
public void initEnvironment(long memorySize, int bufferSize) {
this.memorySize = memorySize;
this.inputSplitProvider = new MockInputSplitProvider();
- TestTaskStateManager taskStateManager = new TestTaskStateManager();
- this.mockEnv = new MockEnvironment("mock task", this.memorySize, this.inputSplitProvider, bufferSize, taskStateManager);
+ this.mockEnv = new MockEnvironmentBuilder()
+ .setTaskName("mock task")
+ .setMemorySize(this.memorySize)
+ .setInputSplitProvider(this.inputSplitProvider)
+ .setBufferSize(bufferSize)
+ .build();
}
public IteratorWrappingTestSingleInputGate<Record> addInput(MutableObjectIterator<Record> input, int groupId) {
http://git-wip-us.apache.org/repos/asf/flink/blob/4948e2ea/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
index 18c8ac5..84a45d8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
-import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
@@ -66,12 +66,11 @@ public class InputFormatSourceFunctionTest {
final LifeCycleTestInputFormat format = new LifeCycleTestInputFormat();
final InputFormatSourceFunction<Integer> reader = new InputFormatSourceFunction<>(format, TypeInformation.of(Integer.class));
- try (MockEnvironment environment = new MockEnvironment(
- "no",
- 4 * MemoryManager.DEFAULT_PAGE_SIZE,
- null,
- 16,
- new TestTaskStateManager())) {
+ try (MockEnvironment environment =
+ new MockEnvironmentBuilder()
+ .setTaskName("no")
+ .setMemorySize(4 * MemoryManager.DEFAULT_PAGE_SIZE)
+ .build()) {
reader.setRuntimeContext(new MockRuntimeContext(format, noOfSplits, environment));
http://git-wip-us.apache.org/repos/asf/flink/blob/4948e2ea/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
index 6d011a3..a38ffa6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.streaming.api.operators;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -27,7 +26,6 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
@@ -36,6 +34,7 @@ import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
@@ -148,20 +147,18 @@ public class StreamOperatorSnapshotRestoreTest extends TestLogger {
LocalRecoveryConfig localRecoveryConfig =
new LocalRecoveryConfig(mode != ONLY_JM_RECOVERY, directoryProvider);
- MockEnvironment mockEnvironment = new MockEnvironment(
- jobID,
- jobVertexID,
- "test",
- 1024L * 1024L,
- new MockInputSplitProvider(),
- 1024 * 1024,
- new Configuration(),
- new ExecutionConfig(),
- new TestTaskStateManager(localRecoveryConfig),
- MAX_PARALLELISM,
- 1,
- subtaskIdx,
- getClass().getClassLoader());
+ MockEnvironment mockEnvironment = new MockEnvironmentBuilder()
+ .setJobID(jobID)
+ .setJobVertexID(jobVertexID)
+ .setTaskName("test")
+ .setMemorySize(1024L * 1024L)
+ .setInputSplitProvider(new MockInputSplitProvider())
+ .setBufferSize(1024 * 1024)
+ .setTaskStateManager(new TestTaskStateManager(localRecoveryConfig))
+ .setMaxParallelism(MAX_PARALLELISM)
+ .setSubtaskIndex(subtaskIdx)
+ .setUserCodeClassLoader(getClass().getClassLoader())
+ .build();
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(
http://git-wip-us.apache.org/repos/asf/flink/blob/4948e2ea/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 35e2fbd..cdd77d3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
@@ -650,12 +651,12 @@ public class AsyncWaitOperatorTest extends TestLogger {
@Nonnull
private MockEnvironment createMockEnvironment() {
- return new MockEnvironment(
- "foobarTask",
- 1024 * 1024L,
- new MockInputSplitProvider(),
- 4 * 1024,
- new TestTaskStateManager());
+ return new MockEnvironmentBuilder()
+ .setTaskName("foobarTask")
+ .setMemorySize(1024 * 1024L)
+ .setInputSplitProvider(new MockInputSplitProvider())
+ .setBufferSize(4 * 1024)
+ .build();
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/4948e2ea/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
index e980ab7..fd6a953 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
@@ -25,8 +25,8 @@ import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitStream;
@@ -170,12 +170,12 @@ public class StreamOperatorChainingTest {
}
private MockEnvironment createMockEnvironment(String taskName) {
- return new MockEnvironment(
- taskName,
- 3 * 1024 * 1024,
- new MockInputSplitProvider(),
- 1024,
- new TestTaskStateManager());
+ return new MockEnvironmentBuilder()
+ .setTaskName(taskName)
+ .setMemorySize(3 * 1024 * 1024)
+ .setInputSplitProvider(new MockInputSplitProvider())
+ .setBufferSize(1024)
+ .build();
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/4948e2ea/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 73a575e..34d2395 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -62,6 +62,7 @@ import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
@@ -328,7 +329,7 @@ public class StreamTaskTest extends TestLogger {
TaskInfo mockTaskInfo = mock(TaskInfo.class);
when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar");
when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0);
- Environment mockEnvironment = new MockEnvironment();
+ Environment mockEnvironment = new MockEnvironmentBuilder().build();
StreamTask<?, ?> streamTask = new EmptyStreamTask(mockEnvironment);
CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
@@ -398,7 +399,7 @@ public class StreamTaskTest extends TestLogger {
final long checkpointId = 42L;
final long timestamp = 1L;
- MockEnvironment mockEnvironment = new MockEnvironment();
+ MockEnvironment mockEnvironment = new MockEnvironmentBuilder().build();
StreamTask<?, ?> streamTask = spy(new EmptyStreamTask(mockEnvironment));
CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
@@ -501,12 +502,10 @@ public class StreamTaskTest extends TestLogger {
null,
checkpointResponder);
- MockEnvironment mockEnvironment = new MockEnvironment(
- "mock-task",
- 1024 * MemoryManager.DEFAULT_PAGE_SIZE,
- null,
- 16,
- taskStateManager);
+ MockEnvironment mockEnvironment = new MockEnvironmentBuilder()
+ .setTaskName("mock-task")
+ .setTaskStateManager(taskStateManager)
+ .build();
StreamTask<?, ?> streamTask = new EmptyStreamTask(mockEnvironment);
CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
@@ -599,7 +598,7 @@ public class StreamTaskTest extends TestLogger {
final OneShotLatch createSubtask = new OneShotLatch();
final OneShotLatch completeSubtask = new OneShotLatch();
- Environment mockEnvironment = spy(new MockEnvironment());
+ Environment mockEnvironment = spy(new MockEnvironmentBuilder().build());
whenNew(OperatorSnapshotFinalizer.class).
withAnyArguments().
@@ -685,7 +684,7 @@ public class StreamTaskTest extends TestLogger {
final long checkpointId = 42L;
final long timestamp = 1L;
- Environment mockEnvironment = spy(new MockEnvironment());
+ Environment mockEnvironment = spy(new MockEnvironmentBuilder().build());
// latch blocks until the async checkpoint thread acknowledges
final OneShotLatch checkpointCompletedLatch = new OneShotLatch();
@@ -765,14 +764,14 @@ public class StreamTaskTest extends TestLogger {
streamConfig.setStreamOperator(new BlockingCloseStreamOperator());
streamConfig.setOperatorID(new OperatorID());
- try (MockEnvironment mockEnvironment = new MockEnvironment(
- "Test Task",
- 32L * 1024L,
- new MockInputSplitProvider(),
- 1,
- taskConfiguration,
- new ExecutionConfig(),
- new TestTaskStateManager())) {
+ try (MockEnvironment mockEnvironment =
+ new MockEnvironmentBuilder()
+ .setTaskName("Test Task")
+ .setMemorySize(32L * 1024L)
+ .setInputSplitProvider(new MockInputSplitProvider())
+ .setBufferSize(1)
+ .setTaskConfiguration(taskConfiguration)
+ .build()) {
StreamTask<Void, BlockingCloseStreamOperator> streamTask = new NoOpStreamTask<>(mockEnvironment);
final AtomicReference<Throwable> atomicThrowable = new AtomicReference<>(null);
http://git-wip-us.apache.org/repos/asf/flink/blob/4948e2ea/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index ed2da18..26ad3ab 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
@@ -137,17 +138,15 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
int subtaskIndex) throws Exception {
this(
operator,
- new MockEnvironment(
- "MockTask",
- 3 * 1024 * 1024,
- new MockInputSplitProvider(),
- 1024,
- new Configuration(),
- new ExecutionConfig(),
- new TestTaskStateManager(),
- maxParallelism,
- parallelism,
- subtaskIndex),
+ new MockEnvironmentBuilder()
+ .setTaskName("MockTask")
+ .setMemorySize(3 * 1024 * 1024)
+ .setInputSplitProvider(new MockInputSplitProvider())
+ .setBufferSize(1024)
+ .setMaxParallelism(maxParallelism)
+ .setParallelism(parallelism)
+ .setSubtaskIndex(subtaskIndex)
+ .build(),
true);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4948e2ea/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
index 3f54081..660a333 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
@@ -22,8 +22,8 @@ import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
@@ -51,12 +51,13 @@ public class SourceFunctionUtil {
}
private static <T extends Serializable> List<T> runRichSourceFunction(SourceFunction<T> sourceFunction) throws Exception {
- try (MockEnvironment environment = new MockEnvironment(
- "MockTask",
- 3 * 1024 * 1024,
- new MockInputSplitProvider(),
- 1024,
- new TestTaskStateManager())) {
+ try (MockEnvironment environment =
+ new MockEnvironmentBuilder()
+ .setTaskName("MockTask")
+ .setMemorySize(3 * 1024 * 1024)
+ .setInputSplitProvider(new MockInputSplitProvider())
+ .setBufferSize(1024)
+ .build()) {
AbstractStreamOperator<?> operator = mock(AbstractStreamOperator.class);
when(operator.getExecutionConfig()).thenReturn(new ExecutionConfig());
http://git-wip-us.apache.org/repos/asf/flink/blob/4948e2ea/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
index aaa96fb..fe56782 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
@@ -37,12 +37,12 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendLoader;
-import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamMap;
@@ -351,18 +351,17 @@ public class PojoSerializerUpgradeTest extends TestLogger {
OperatorSubtaskState operatorSubtaskState,
Iterable<Long> input) throws Exception {
- try (final MockEnvironment environment = new MockEnvironment(
- "test task",
- 32 * 1024,
- new MockInputSplitProvider(),
- 256,
- taskConfiguration,
- executionConfig,
- new TestTaskStateManager(),
- 16,
- 1,
- 0,
- classLoader)) {
+ try (final MockEnvironment environment =
+ new MockEnvironmentBuilder()
+ .setTaskName("test task")
+ .setMemorySize(32 * 1024)
+ .setInputSplitProvider(new MockInputSplitProvider())
+ .setBufferSize(256)
+ .setTaskConfiguration(taskConfiguration)
+ .setExecutionConfig(executionConfig)
+ .setMaxParallelism(16)
+ .setUserCodeClassLoader(classLoader)
+ .build()) {
OneInputStreamOperatorTestHarness<Long, Long> harness = null;
try {