You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/05/22 08:43:51 UTC

[15/17] flink git commit: [hotfix][tests] Introduce MockEnvironmentBuilder to deduplicate MockEnvironment constructors

[hotfix][tests] Introduce MockEnvironmentBuilder to deduplicate MockEnvironment constructors


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4948e2ea
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4948e2ea
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4948e2ea

Branch: refs/heads/master
Commit: 4948e2ea80869b2aaff610f6e7e0963d05da350e
Parents: cdd99ca
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Wed May 9 11:12:18 2018 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue May 22 16:42:29 2018 +0800

----------------------------------------------------------------------
 .../kafka/FlinkKafkaConsumerBaseTest.java       |  13 +-
 .../kinesis/testutils/TestRuntimeContext.java   |  13 +-
 .../operators/testutils/MockEnvironment.java    | 102 +--------------
 .../testutils/MockEnvironmentBuilder.java       | 125 +++++++++++++++++++
 .../operators/testutils/TaskTestBase.java       |   8 +-
 .../source/InputFormatSourceFunctionTest.java   |  13 +-
 .../StreamOperatorSnapshotRestoreTest.java      |  29 ++---
 .../operators/async/AsyncWaitOperatorTest.java  |  13 +-
 .../operators/StreamOperatorChainingTest.java   |  14 +--
 .../streaming/runtime/tasks/StreamTaskTest.java |  35 +++---
 .../util/AbstractStreamOperatorTestHarness.java |  21 ++--
 .../streaming/util/SourceFunctionUtil.java      |  15 +--
 .../PojoSerializerUpgradeTest.java              |  25 ++--
 13 files changed, 224 insertions(+), 202 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4948e2ea/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index b226ff1..4605015 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -33,10 +33,9 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
-import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
@@ -834,12 +833,10 @@ public class FlinkKafkaConsumerBaseTest {
 
 			super(
 				new MockStreamOperator(),
-				new MockEnvironment(
-					"mockTask",
-					4 * MemoryManager.DEFAULT_PAGE_SIZE,
-					null,
-					16,
-					new TestTaskStateManager()),
+				new MockEnvironmentBuilder()
+					.setTaskName("mockTask")
+					.setMemorySize(4 * MemoryManager.DEFAULT_PAGE_SIZE)
+					.build(),
 				Collections.emptyMap());
 
 			this.isCheckpointingEnabled = isCheckpointingEnabled;

http://git-wip-us.apache.org/repos/asf/flink/blob/4948e2ea/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java
index ce0bd97..740d2f2 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java
@@ -21,8 +21,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.testutils.MockEnvironment;
-import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 
@@ -45,12 +44,10 @@ public class TestRuntimeContext extends StreamingRuntimeContext {
 
 		super(
 			new TestStreamOperator(),
-			new MockEnvironment(
-				"mockTask",
-				4 * MemoryManager.DEFAULT_PAGE_SIZE,
-				null,
-				16,
-				new TestTaskStateManager()),
+			new MockEnvironmentBuilder()
+				.setTaskName("mockTask")
+				.setMemorySize(4 * MemoryManager.DEFAULT_PAGE_SIZE)
+				.build(),
 			Collections.emptyMap());
 
 		this.isCheckpointingEnabled = isCheckpointingEnabled;

http://git-wip-us.apache.org/repos/asf/flink/blob/4948e2ea/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index ce19a5e..4bf94e9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -45,7 +45,6 @@ import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.TaskStateManager;
-import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.types.Record;
@@ -109,106 +108,11 @@ public class MockEnvironment implements Environment, AutoCloseable {
 
 	private Optional<Throwable> actualExternalFailureCause = Optional.empty();
 
-	public MockEnvironment() {
-		this(
-			"mock-task",
-			1024 * MemoryManager.DEFAULT_PAGE_SIZE,
-			null,
-			16,
-			new TestTaskStateManager());
+	public static MockEnvironmentBuilder builder() {
+		return new MockEnvironmentBuilder();
 	}
 
-	public MockEnvironment(
-		String taskName,
-		long memorySize,
-		MockInputSplitProvider inputSplitProvider,
-		int bufferSize,
-		TaskStateManager taskStateManager) {
-		this(
-			taskName,
-			memorySize,
-			inputSplitProvider,
-			bufferSize,
-			new Configuration(),
-			new ExecutionConfig(),
-			taskStateManager);
-	}
-
-	public MockEnvironment(
-		String taskName,
-		long memorySize,
-		MockInputSplitProvider inputSplitProvider,
-		int bufferSize, Configuration taskConfiguration,
-		ExecutionConfig executionConfig,
-		TaskStateManager taskStateManager) {
-		this(
-			taskName,
-			memorySize,
-			inputSplitProvider,
-			bufferSize,
-			taskConfiguration,
-			executionConfig,
-			taskStateManager,
-			1,
-			1,
-			0);
-	}
-
-	public MockEnvironment(
-			String taskName,
-			long memorySize,
-			MockInputSplitProvider inputSplitProvider,
-			int bufferSize,
-			Configuration taskConfiguration,
-			ExecutionConfig executionConfig,
-			TaskStateManager taskStateManager,
-			int maxParallelism,
-			int parallelism,
-			int subtaskIndex) {
-		this(
-			taskName,
-			memorySize,
-			inputSplitProvider,
-			bufferSize,
-			taskConfiguration,
-			executionConfig,
-			taskStateManager,
-			maxParallelism,
-			parallelism,
-			subtaskIndex,
-			Thread.currentThread().getContextClassLoader());
-
-	}
-
-	public MockEnvironment(
-			String taskName,
-			long memorySize,
-			MockInputSplitProvider inputSplitProvider,
-			int bufferSize,
-			Configuration taskConfiguration,
-			ExecutionConfig executionConfig,
-			TaskStateManager taskStateManager,
-			int maxParallelism,
-			int parallelism,
-			int subtaskIndex,
-			ClassLoader userCodeClassLoader) {
-		this(
-			new JobID(),
-			new JobVertexID(),
-			taskName,
-			memorySize,
-			inputSplitProvider,
-			bufferSize,
-			taskConfiguration,
-			executionConfig,
-			taskStateManager,
-			maxParallelism,
-			parallelism,
-			subtaskIndex,
-			userCodeClassLoader);
-	}
-
-	public MockEnvironment(
+	protected MockEnvironment(
 		JobID jobID,
 		JobVertexID jobVertexID,
 		String taskName,

http://git-wip-us.apache.org/repos/asf/flink/blob/4948e2ea/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
new file mode 100644
index 0000000..dfb10d4
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.testutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.state.TaskStateManager;
+import org.apache.flink.runtime.state.TestTaskStateManager;
+
+public class MockEnvironmentBuilder {
+	private String taskName = "mock-task";
+	private long memorySize = 1024 * MemoryManager.DEFAULT_PAGE_SIZE;
+	private MockInputSplitProvider inputSplitProvider = null;
+	private int bufferSize = 16;
+	private TaskStateManager taskStateManager = new TestTaskStateManager();
+	private Configuration taskConfiguration = new Configuration();
+	private ExecutionConfig executionConfig = new ExecutionConfig();
+	private int maxParallelism = 1;
+	private int parallelism = 1;
+	private int subtaskIndex = 0;
+	private ClassLoader userCodeClassLoader = Thread.currentThread().getContextClassLoader();
+	private JobID jobID = new JobID();
+	private JobVertexID jobVertexID = new JobVertexID();
+
+	public MockEnvironmentBuilder setTaskName(String taskName) {
+		this.taskName = taskName;
+		return this;
+	}
+
+	public MockEnvironmentBuilder setMemorySize(long memorySize) {
+		this.memorySize = memorySize;
+		return this;
+	}
+
+	public MockEnvironmentBuilder setInputSplitProvider(MockInputSplitProvider inputSplitProvider) {
+		this.inputSplitProvider = inputSplitProvider;
+		return this;
+	}
+
+	public MockEnvironmentBuilder setBufferSize(int bufferSize) {
+		this.bufferSize = bufferSize;
+		return this;
+	}
+
+	public MockEnvironmentBuilder setTaskStateManager(TaskStateManager taskStateManager) {
+		this.taskStateManager = taskStateManager;
+		return this;
+	}
+
+	public MockEnvironmentBuilder setTaskConfiguration(Configuration taskConfiguration) {
+		this.taskConfiguration = taskConfiguration;
+		return this;
+	}
+
+	public MockEnvironmentBuilder setExecutionConfig(ExecutionConfig executionConfig) {
+		this.executionConfig = executionConfig;
+		return this;
+	}
+
+	public MockEnvironmentBuilder setMaxParallelism(int maxParallelism) {
+		this.maxParallelism = maxParallelism;
+		return this;
+	}
+
+	public MockEnvironmentBuilder setParallelism(int parallelism) {
+		this.parallelism = parallelism;
+		return this;
+	}
+
+	public MockEnvironmentBuilder setSubtaskIndex(int subtaskIndex) {
+		this.subtaskIndex = subtaskIndex;
+		return this;
+	}
+
+	public MockEnvironmentBuilder setUserCodeClassLoader(ClassLoader userCodeClassLoader) {
+		this.userCodeClassLoader = userCodeClassLoader;
+		return this;
+	}
+
+	public MockEnvironmentBuilder setJobID(JobID jobID) {
+		this.jobID = jobID;
+		return this;
+	}
+
+	public MockEnvironmentBuilder setJobVertexID(JobVertexID jobVertexID) {
+		this.jobVertexID = jobVertexID;
+		return this;
+	}
+
+	public MockEnvironment build() {
+		return new MockEnvironment(
+			jobID,
+			jobVertexID,
+			taskName,
+			memorySize,
+			inputSplitProvider,
+			bufferSize,
+			taskConfiguration,
+			executionConfig,
+			taskStateManager,
+			maxParallelism,
+			parallelism,
+			subtaskIndex,
+			userCodeClassLoader);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4948e2ea/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
index a40992c..16485ca 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
@@ -54,8 +54,12 @@ public abstract class TaskTestBase extends TestLogger {
 	public void initEnvironment(long memorySize, int bufferSize) {
 		this.memorySize = memorySize;
 		this.inputSplitProvider = new MockInputSplitProvider();
-		TestTaskStateManager taskStateManager = new TestTaskStateManager();
-		this.mockEnv = new MockEnvironment("mock task", this.memorySize, this.inputSplitProvider, bufferSize, taskStateManager);
+		this.mockEnv = new MockEnvironmentBuilder()
+			.setTaskName("mock task")
+			.setMemorySize(this.memorySize)
+			.setInputSplitProvider(this.inputSplitProvider)
+			.setBufferSize(bufferSize)
+			.build();
 	}
 
 	public IteratorWrappingTestSingleInputGate<Record> addInput(MutableObjectIterator<Record> input, int groupId) {

http://git-wip-us.apache.org/repos/asf/flink/blob/4948e2ea/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
index 18c8ac5..84a45d8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
-import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -66,12 +66,11 @@ public class InputFormatSourceFunctionTest {
 		final LifeCycleTestInputFormat format = new LifeCycleTestInputFormat();
 		final InputFormatSourceFunction<Integer> reader = new InputFormatSourceFunction<>(format, TypeInformation.of(Integer.class));
 
-		try (MockEnvironment environment = new MockEnvironment(
-			"no",
-			4 * MemoryManager.DEFAULT_PAGE_SIZE,
-			null,
-			16,
-			new TestTaskStateManager())) {
+		try (MockEnvironment environment =
+				new MockEnvironmentBuilder()
+					.setTaskName("no")
+					.setMemorySize(4 * MemoryManager.DEFAULT_PAGE_SIZE)
+					.build()) {
 
 			reader.setRuntimeContext(new MockRuntimeContext(format, noOfSplits, environment));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4948e2ea/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
index 6d011a3..a38ffa6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.api.operators;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -27,7 +26,6 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
@@ -36,6 +34,7 @@ import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
@@ -148,20 +147,18 @@ public class StreamOperatorSnapshotRestoreTest extends TestLogger {
 		LocalRecoveryConfig localRecoveryConfig =
 			new LocalRecoveryConfig(mode != ONLY_JM_RECOVERY, directoryProvider);
 
-		MockEnvironment mockEnvironment = new MockEnvironment(
-			jobID,
-			jobVertexID,
-			"test",
-			1024L * 1024L,
-			new MockInputSplitProvider(),
-			1024 * 1024,
-			new Configuration(),
-			new ExecutionConfig(),
-			new TestTaskStateManager(localRecoveryConfig),
-			MAX_PARALLELISM,
-			1,
-			subtaskIdx,
-			getClass().getClassLoader());
+		MockEnvironment mockEnvironment = new MockEnvironmentBuilder()
+			.setJobID(jobID)
+			.setJobVertexID(jobVertexID)
+			.setTaskName("test")
+			.setMemorySize(1024L * 1024L)
+			.setInputSplitProvider(new MockInputSplitProvider())
+			.setBufferSize(1024 * 1024)
+			.setTaskStateManager(new TestTaskStateManager(localRecoveryConfig))
+			.setMaxParallelism(MAX_PARALLELISM)
+			.setSubtaskIndex(subtaskIdx)
+			.setUserCodeClassLoader(getClass().getClassLoader())
+			.build();
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(

http://git-wip-us.apache.org/repos/asf/flink/blob/4948e2ea/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 35e2fbd..cdd77d3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.streaming.api.datastream.AsyncDataStream;
@@ -650,12 +651,12 @@ public class AsyncWaitOperatorTest extends TestLogger {
 
 	@Nonnull
 	private MockEnvironment createMockEnvironment() {
-		return new MockEnvironment(
-			"foobarTask",
-			1024 * 1024L,
-			new MockInputSplitProvider(),
-			4 * 1024,
-			new TestTaskStateManager());
+		return new MockEnvironmentBuilder()
+			.setTaskName("foobarTask")
+			.setMemorySize(1024 * 1024L)
+			.setInputSplitProvider(new MockInputSplitProvider())
+			.setBufferSize(4 * 1024)
+			.build();
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/4948e2ea/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
index e980ab7..fd6a953 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
@@ -25,8 +25,8 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SplitStream;
@@ -170,12 +170,12 @@ public class StreamOperatorChainingTest {
 	}
 
 	private MockEnvironment createMockEnvironment(String taskName) {
-		return new MockEnvironment(
-			taskName,
-			3 * 1024 * 1024,
-			new MockInputSplitProvider(),
-			1024,
-			new TestTaskStateManager());
+		return new MockEnvironmentBuilder()
+			.setTaskName(taskName)
+			.setMemorySize(3 * 1024 * 1024)
+			.setInputSplitProvider(new MockInputSplitProvider())
+			.setBufferSize(1024)
+			.build();
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/4948e2ea/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 73a575e..34d2395 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -62,6 +62,7 @@ import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
@@ -328,7 +329,7 @@ public class StreamTaskTest extends TestLogger {
 		TaskInfo mockTaskInfo = mock(TaskInfo.class);
 		when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar");
 		when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0);
-		Environment mockEnvironment = new MockEnvironment();
+		Environment mockEnvironment = new MockEnvironmentBuilder().build();
 
 		StreamTask<?, ?> streamTask = new EmptyStreamTask(mockEnvironment);
 		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
@@ -398,7 +399,7 @@ public class StreamTaskTest extends TestLogger {
 		final long checkpointId = 42L;
 		final long timestamp = 1L;
 
-		MockEnvironment mockEnvironment = new MockEnvironment();
+		MockEnvironment mockEnvironment = new MockEnvironmentBuilder().build();
 		StreamTask<?, ?> streamTask = spy(new EmptyStreamTask(mockEnvironment));
 		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
 
@@ -501,12 +502,10 @@ public class StreamTaskTest extends TestLogger {
 			null,
 			checkpointResponder);
 
-		MockEnvironment mockEnvironment = new MockEnvironment(
-			"mock-task",
-			1024 * MemoryManager.DEFAULT_PAGE_SIZE,
-			null,
-			16,
-			taskStateManager);
+		MockEnvironment mockEnvironment = new MockEnvironmentBuilder()
+			.setTaskName("mock-task")
+			.setTaskStateManager(taskStateManager)
+			.build();
 
 		StreamTask<?, ?> streamTask = new EmptyStreamTask(mockEnvironment);
 		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
@@ -599,7 +598,7 @@ public class StreamTaskTest extends TestLogger {
 		final OneShotLatch createSubtask = new OneShotLatch();
 		final OneShotLatch completeSubtask = new OneShotLatch();
 
-		Environment mockEnvironment = spy(new MockEnvironment());
+		Environment mockEnvironment = spy(new MockEnvironmentBuilder().build());
 
 		whenNew(OperatorSnapshotFinalizer.class).
 			withAnyArguments().
@@ -685,7 +684,7 @@ public class StreamTaskTest extends TestLogger {
 		final long checkpointId = 42L;
 		final long timestamp = 1L;
 
-		Environment mockEnvironment = spy(new MockEnvironment());
+		Environment mockEnvironment = spy(new MockEnvironmentBuilder().build());
 
 		// latch blocks until the async checkpoint thread acknowledges
 		final OneShotLatch checkpointCompletedLatch = new OneShotLatch();
@@ -765,14 +764,14 @@ public class StreamTaskTest extends TestLogger {
 		streamConfig.setStreamOperator(new BlockingCloseStreamOperator());
 		streamConfig.setOperatorID(new OperatorID());
 
-		try (MockEnvironment mockEnvironment = new MockEnvironment(
-				"Test Task",
-				32L * 1024L,
-				new MockInputSplitProvider(),
-				1,
-				taskConfiguration,
-				new ExecutionConfig(),
-				new TestTaskStateManager())) {
+		try (MockEnvironment mockEnvironment =
+				new MockEnvironmentBuilder()
+					.setTaskName("Test Task")
+					.setMemorySize(32L * 1024L)
+					.setInputSplitProvider(new MockInputSplitProvider())
+					.setBufferSize(1)
+					.setTaskConfiguration(taskConfiguration)
+					.build()) {
 			StreamTask<Void, BlockingCloseStreamOperator> streamTask = new NoOpStreamTask<>(mockEnvironment);
 			final AtomicReference<Throwable> atomicThrowable = new AtomicReference<>(null);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4948e2ea/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index ed2da18..26ad3ab 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
@@ -137,17 +138,15 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
 			int subtaskIndex) throws Exception {
 		this(
 			operator,
-			new MockEnvironment(
-				"MockTask",
-				3 * 1024 * 1024,
-				new MockInputSplitProvider(),
-				1024,
-				new Configuration(),
-				new ExecutionConfig(),
-				new TestTaskStateManager(),
-				maxParallelism,
-				parallelism,
-				subtaskIndex),
+			new MockEnvironmentBuilder()
+				.setTaskName("MockTask")
+				.setMemorySize(3 * 1024 * 1024)
+				.setInputSplitProvider(new MockInputSplitProvider())
+				.setBufferSize(1024)
+				.setMaxParallelism(maxParallelism)
+				.setParallelism(parallelism)
+				.setSubtaskIndex(subtaskIndex)
+				.build(),
 			true);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4948e2ea/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
index 3f54081..660a333 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
@@ -22,8 +22,8 @@ import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
@@ -51,12 +51,13 @@ public class SourceFunctionUtil {
 	}
 
 	private static <T extends Serializable> List<T> runRichSourceFunction(SourceFunction<T> sourceFunction) throws Exception {
-		try (MockEnvironment environment = new MockEnvironment(
-			"MockTask",
-			3 * 1024 * 1024,
-			new MockInputSplitProvider(),
-			1024,
-			new TestTaskStateManager())) {
+		try (MockEnvironment environment =
+				new MockEnvironmentBuilder()
+					.setTaskName("MockTask")
+					.setMemorySize(3 * 1024 * 1024)
+					.setInputSplitProvider(new MockInputSplitProvider())
+					.setBufferSize(1024)
+					.build()) {
 
 			AbstractStreamOperator<?> operator = mock(AbstractStreamOperator.class);
 			when(operator.getExecutionConfig()).thenReturn(new ExecutionConfig());

http://git-wip-us.apache.org/repos/asf/flink/blob/4948e2ea/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
index aaa96fb..fe56782 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
@@ -37,12 +37,12 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.StateBackendLoader;
-import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamMap;
@@ -351,18 +351,17 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 			OperatorSubtaskState operatorSubtaskState,
 			Iterable<Long> input) throws Exception {
 
-		try (final MockEnvironment environment = new MockEnvironment(
-			"test task",
-			32 * 1024,
-			new MockInputSplitProvider(),
-			256,
-			taskConfiguration,
-			executionConfig,
-			new TestTaskStateManager(),
-			16,
-			1,
-			0,
-			classLoader)) {
+		try (final MockEnvironment environment =
+				new MockEnvironmentBuilder()
+					.setTaskName("test task")
+					.setMemorySize(32 * 1024)
+					.setInputSplitProvider(new MockInputSplitProvider())
+					.setBufferSize(256)
+					.setTaskConfiguration(taskConfiguration)
+					.setExecutionConfig(executionConfig)
+					.setMaxParallelism(16)
+					.setUserCodeClassLoader(classLoader)
+					.build()) {
 
 			OneInputStreamOperatorTestHarness<Long, Long> harness = null;
 			try {