You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/05/08 13:29:51 UTC

[flink] 01/02: [hotfix][tests] Avoid mock NetworkEnvironment in tests

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f392616f404a6471d99d64eeab5dd1dd3da1a4a0
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Thu Apr 25 17:05:21 2019 +0800

    [hotfix][tests] Avoid mock NetworkEnvironment in tests
---
 .../runtime/taskexecutor/TaskExecutorTest.java     |  15 +--
 .../runtime/taskmanager/TaskAsyncCallTest.java     |  17 +++-
 .../apache/flink/runtime/taskmanager/TaskTest.java | 107 +++++++++++----------
 .../runtime/util/JvmExitOnFatalErrorTest.java      |   3 +-
 .../tasks/InterruptSensitiveRestoreTest.java       |   3 +-
 .../runtime/tasks/StreamTaskTerminationTest.java   |   3 +-
 .../streaming/runtime/tasks/StreamTaskTest.java    |   4 +-
 .../runtime/tasks/SynchronousCheckpointITCase.java |   6 +-
 .../tasks/TaskCheckpointingBehaviourTest.java      |   3 +-
 9 files changed, 87 insertions(+), 74 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index b47ad53..bf57e2a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -197,6 +197,8 @@ public class TaskExecutorTest extends TestLogger {
 
 	private SettableLeaderRetrievalService jobManagerLeaderRetriever;
 
+	private NetworkEnvironment networkEnvironment;
+
 	@Before
 	public void setup() throws IOException {
 		rpc = new TestingRpcService();
@@ -220,6 +222,8 @@ public class TaskExecutorTest extends TestLogger {
 		jobManagerLeaderRetriever = new SettableLeaderRetrievalService();
 		haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever);
 		haServices.setJobMasterLeaderRetriever(jobId, jobManagerLeaderRetriever);
+
+		networkEnvironment = new NetworkEnvironmentBuilder().build();
 	}
 
 	@After
@@ -239,6 +243,10 @@ public class TaskExecutorTest extends TestLogger {
 			dummyBlobCacheService = null;
 		}
 
+		if (networkEnvironment != null) {
+			networkEnvironment.shutdown();
+		}
+
 		testingFatalErrorHandler.rethrowError();
 	}
 
@@ -262,7 +270,6 @@ public class TaskExecutorTest extends TestLogger {
 			MemoryType.HEAP,
 			false);
 
-		final NetworkEnvironment networkEnvironment = new NetworkEnvironmentBuilder().build();
 		networkEnvironment.start();
 
 		final KvStateService kvStateService = new KvStateService(new KvStateRegistry(), null, null);
@@ -699,8 +706,6 @@ public class TaskExecutorTest extends TestLogger {
 		when(taskSlotTable.tryMarkSlotActive(eq(jobId), eq(allocationId))).thenReturn(true);
 		when(taskSlotTable.addTask(any(Task.class))).thenReturn(true);
 
-		final NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class);
-
 		final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager();
 
 		final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
@@ -958,13 +963,11 @@ public class TaskExecutorTest extends TestLogger {
 		rpc.registerGateway(resourceManagerGateway.getAddress(), resourceManagerGateway);
 		rpc.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);
 
-		final NetworkEnvironment networkMock = mock(NetworkEnvironment.class, Mockito.RETURNS_MOCKS);
-
 		final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager();
 
 		final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
 			.setTaskManagerLocation(taskManagerLocation)
-			.setNetworkEnvironment(networkMock)
+			.setNetworkEnvironment(networkEnvironment)
 			.setTaskSlotTable(taskSlotTable)
 			.setJobLeaderService(jobLeaderService)
 			.setJobManagerTable(jobManagerTable)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 9e06da0..1f743e4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -41,11 +41,11 @@ import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
@@ -60,6 +60,7 @@ import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -105,6 +106,8 @@ public class TaskAsyncCallTest extends TestLogger {
 
 	private static final List<ClassLoader> classLoaders = Collections.synchronizedList(new ArrayList<>());
 
+	private NetworkEnvironment networkEnvironment;
+
 	@Before
 	public void createQueuesAndActors() {
 		numCalls = 1000;
@@ -114,9 +117,17 @@ public class TaskAsyncCallTest extends TestLogger {
 		notifyCheckpointCompleteLatch = new OneShotLatch();
 		stopLatch = new OneShotLatch();
 
+		networkEnvironment = new NetworkEnvironmentBuilder().build();
+
 		classLoaders.clear();
 	}
 
+	@After
+	public void teardown() {
+		if (networkEnvironment != null) {
+			networkEnvironment.shutdown();
+		}
+	}
 
 	// ------------------------------------------------------------------------
 	//  Tests 
@@ -211,13 +222,9 @@ public class TaskAsyncCallTest extends TestLogger {
 		LibraryCacheManager libCache = mock(LibraryCacheManager.class);
 		when(libCache.getClassLoader(any(JobID.class))).thenReturn(new TestUserCodeClassLoader());
 
-		ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
 		ResultPartitionConsumableNotifier consumableNotifier = new NoOpResultPartitionConsumableNotifier();
 		PartitionProducerStateChecker partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
 		Executor executor = mock(Executor.class);
-		NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class);
-		when(networkEnvironment.getResultPartitionManager()).thenReturn(partitionManager);
-
 		TaskMetricGroup taskMetricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
 
 		JobInformation jobInformation = new JobInformation(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 6a4ac80..4c878db 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -45,12 +45,12 @@ import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -67,10 +67,12 @@ import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.WrappingRuntimeException;
 
+import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -99,7 +101,6 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -114,6 +115,8 @@ public class TaskTest extends TestLogger {
 	private static OneShotLatch awaitLatch;
 	private static OneShotLatch triggerLatch;
 
+	private NetworkEnvironment networkEnvironment;
+
 	@ClassRule
 	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
 
@@ -121,12 +124,21 @@ public class TaskTest extends TestLogger {
 	public void setup() {
 		awaitLatch = new OneShotLatch();
 		triggerLatch = new OneShotLatch();
+
+		networkEnvironment = new NetworkEnvironmentBuilder().build();
+	}
+
+	@After
+	public void teardown() {
+		if (networkEnvironment != null) {
+			networkEnvironment.shutdown();
+		}
 	}
 
 	@Test
 	public void testRegularExecution() throws Exception {
 		final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
-		final Task task = new TaskBuilder()
+		final Task task = createTaskBuilder()
 			.setTaskManagerActions(taskManagerActions)
 			.build();
 
@@ -151,7 +163,7 @@ public class TaskTest extends TestLogger {
 
 	@Test
 	public void testCancelRightAway() throws Exception {
-		final Task task = new TaskBuilder().build();
+		final Task task = createTaskBuilder().build();
 		task.cancelExecution();
 
 		assertEquals(ExecutionState.CANCELING, task.getExecutionState());
@@ -166,7 +178,7 @@ public class TaskTest extends TestLogger {
 
 	@Test
 	public void testFailExternallyRightAway() throws Exception {
-		final Task task = new TaskBuilder().build();
+		final Task task = createTaskBuilder().build();
 		task.failExternally(new Exception("fail externally"));
 
 		assertEquals(ExecutionState.FAILED, task.getExecutionState());
@@ -180,7 +192,7 @@ public class TaskTest extends TestLogger {
 	@Test
 	public void testLibraryCacheRegistrationFailed() throws Exception {
 		final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
-		final Task task = new TaskBuilder()
+		final Task task = createTaskBuilder()
 			.setTaskManagerActions(taskManagerActions)
 			.setLibraryCacheManager(mock(LibraryCacheManager.class)) // inactive manager
 			.build();
@@ -226,7 +238,7 @@ public class TaskTest extends TestLogger {
 				FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST,
 				new String[0]);
 
-		final Task task = new TaskBuilder()
+		final Task task = createTaskBuilder()
 			.setRequiredJarFileBlobKeys(Collections.singletonList(missingKey))
 			.setLibraryCacheManager(libraryCacheManager)
 			.build();
@@ -251,39 +263,35 @@ public class TaskTest extends TestLogger {
 
 	@Test
 	public void testExecutionFailsInNetworkRegistration() throws Exception {
-		// mock a network manager that rejects registration
-		final ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
 		final ResultPartitionConsumableNotifier consumableNotifier = new NoOpResultPartitionConsumableNotifier();
 		final PartitionProducerStateChecker partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
 
-		final NetworkEnvironment network = mock(NetworkEnvironment.class);
-		when(network.getResultPartitionManager()).thenReturn(partitionManager);
-		doThrow(new RuntimeException("buffers")).when(network).registerTask(any(Task.class));
-
 		final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
-		final Task task = new TaskBuilder()
+		final Task task = new TaskBuilder(networkEnvironment)
 			.setTaskManagerActions(taskManagerActions)
 			.setConsumableNotifier(consumableNotifier)
 			.setPartitionProducerStateChecker(partitionProducerStateChecker)
-			.setNetworkEnvironment(network)
 			.build();
 
+		// shut down the network so that the following task registration fails
+		networkEnvironment.shutdown();
+
 		// should fail
 		task.run();
 
 		// verify final state
 		assertEquals(ExecutionState.FAILED, task.getExecutionState());
 		assertTrue(task.isCanceledOrFailed());
-		assertTrue(task.getFailureCause().getMessage().contains("buffers"));
+		assertTrue(task.getFailureCause().getMessage().contains("NetworkEnvironment is shut down"));
 
 		taskManagerActions.validateListenerMessage(
-			ExecutionState.FAILED, task, new RuntimeException("buffers"));
+			ExecutionState.FAILED, task, new IllegalStateException("NetworkEnvironment is shut down"));
 	}
 
 	@Test
 	public void testInvokableInstantiationFailed() throws Exception {
 		final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
-		final Task task = new TaskBuilder()
+		final Task task = createTaskBuilder()
 			.setTaskManagerActions(taskManagerActions)
 			.setInvokable(InvokableNonInstantiable.class)
 			.build();
@@ -303,7 +311,7 @@ public class TaskTest extends TestLogger {
 	@Test
 	public void testExecutionFailsInInvoke() throws Exception {
 		final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
-		final Task task = new TaskBuilder()
+		final Task task = createTaskBuilder()
 			.setInvokable(InvokableWithExceptionInInvoke.class)
 			.setTaskManagerActions(taskManagerActions)
 			.build();
@@ -323,7 +331,7 @@ public class TaskTest extends TestLogger {
 	@Test
 	public void testFailWithWrappedException() throws Exception {
 		final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
-		final Task task = new TaskBuilder()
+		final Task task = createTaskBuilder()
 			.setInvokable(FailingInvokableWithChainedException.class)
 			.setTaskManagerActions(taskManagerActions)
 			.build();
@@ -343,7 +351,7 @@ public class TaskTest extends TestLogger {
 	@Test
 	public void testCancelDuringInvoke() throws Exception {
 		final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
-		final Task task = new TaskBuilder()
+		final Task task = createTaskBuilder()
 			.setInvokable(InvokableBlockingInInvoke.class)
 			.setTaskManagerActions(taskManagerActions)
 			.build();
@@ -371,7 +379,7 @@ public class TaskTest extends TestLogger {
 	@Test
 	public void testFailExternallyDuringInvoke() throws Exception {
 		final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
-		final Task task = new TaskBuilder()
+		final Task task = createTaskBuilder()
 			.setInvokable(InvokableBlockingInInvoke.class)
 			.setTaskManagerActions(taskManagerActions)
 			.build();
@@ -397,7 +405,7 @@ public class TaskTest extends TestLogger {
 	@Test
 	public void testCanceledAfterExecutionFailedInInvoke() throws Exception {
 		final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
-		final Task task = new TaskBuilder()
+		final Task task = createTaskBuilder()
 			.setInvokable(InvokableWithExceptionInInvoke.class)
 			.setTaskManagerActions(taskManagerActions)
 			.build();
@@ -418,7 +426,7 @@ public class TaskTest extends TestLogger {
 	@Test
 	public void testExecutionFailsAfterCanceling() throws Exception {
 		final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
-		final Task task = new TaskBuilder()
+		final Task task = createTaskBuilder()
 			.setInvokable(InvokableWithExceptionOnTrigger.class)
 			.setTaskManagerActions(taskManagerActions)
 			.build();
@@ -449,7 +457,7 @@ public class TaskTest extends TestLogger {
 	@Test
 	public void testExecutionFailsAfterTaskMarkedFailed() throws Exception {
 		final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
-		final Task task = new TaskBuilder()
+		final Task task = createTaskBuilder()
 			.setInvokable(InvokableWithExceptionOnTrigger.class)
 			.setTaskManagerActions(taskManagerActions)
 			.build();
@@ -478,7 +486,7 @@ public class TaskTest extends TestLogger {
 
 	@Test
 	public void testCancelTaskException() throws Exception {
-		final Task task = new TaskBuilder()
+		final Task task = createTaskBuilder()
 			.setInvokable(InvokableWithCancelTaskExceptionInInvoke.class)
 			.build();
 
@@ -492,7 +500,7 @@ public class TaskTest extends TestLogger {
 
 	@Test
 	public void testCancelTaskExceptionAfterTaskMarkedFailed() throws Exception {
-		final Task task = new TaskBuilder()
+		final Task task = createTaskBuilder()
 			.setInvokable(InvokableWithCancelTaskExceptionInInvoke.class)
 			.build();
 
@@ -523,7 +531,7 @@ public class TaskTest extends TestLogger {
 		final SingleInputGate inputGate = mock(SingleInputGate.class);
 		when(inputGate.getConsumedResultId()).thenReturn(resultId);
 
-		final Task task = new TaskBuilder()
+		final Task task = createTaskBuilder()
 			.setInvokable(InvokableBlockingInInvoke.class)
 			.build();
 
@@ -571,8 +579,6 @@ public class TaskTest extends TestLogger {
 		final PartitionProducerStateChecker partitionChecker = mock(PartitionProducerStateChecker.class);
 
 		final ResultPartitionConsumableNotifier consumableNotifier = new NoOpResultPartitionConsumableNotifier();
-		final NetworkEnvironment network = mock(NetworkEnvironment.class);
-		when(network.getResultPartitionManager()).thenReturn(mock(ResultPartitionManager.class));
 
 		// Test all branches of trigger partition state check
 		{
@@ -580,9 +586,8 @@ public class TaskTest extends TestLogger {
 			setup();
 
 			// PartitionProducerDisposedException
-			final Task task =  new TaskBuilder()
+			final Task task = createTaskBuilder()
 				.setInvokable(InvokableBlockingInInvoke.class)
-				.setNetworkEnvironment(network)
 				.setConsumableNotifier(consumableNotifier)
 				.setPartitionProducerStateChecker(partitionChecker)
 				.setExecutor(Executors.directExecutor())
@@ -602,9 +607,8 @@ public class TaskTest extends TestLogger {
 			setup();
 
 			// Any other exception
-			final Task task =  new TaskBuilder()
+			final Task task = createTaskBuilder()
 				.setInvokable(InvokableBlockingInInvoke.class)
-				.setNetworkEnvironment(network)
 				.setConsumableNotifier(consumableNotifier)
 				.setPartitionProducerStateChecker(partitionChecker)
 				.setExecutor(Executors.directExecutor())
@@ -626,9 +630,8 @@ public class TaskTest extends TestLogger {
 
 			// TimeoutException handled special => retry
 			// Any other exception
-			final Task task =  new TaskBuilder()
+			final Task task = createTaskBuilder()
 				.setInvokable(InvokableBlockingInInvoke.class)
-				.setNetworkEnvironment(network)
 				.setConsumableNotifier(consumableNotifier)
 				.setPartitionProducerStateChecker(partitionChecker)
 				.setExecutor(Executors.directExecutor())
@@ -664,9 +667,8 @@ public class TaskTest extends TestLogger {
 			setup();
 
 			// Success
-			final Task task =  new TaskBuilder()
+			final Task task =  createTaskBuilder()
 				.setInvokable(InvokableBlockingInInvoke.class)
-				.setNetworkEnvironment(network)
 				.setConsumableNotifier(consumableNotifier)
 				.setPartitionProducerStateChecker(partitionChecker)
 				.setExecutor(Executors.directExecutor())
@@ -711,7 +713,7 @@ public class TaskTest extends TestLogger {
 		config.setLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL.key(), 5);
 		config.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT.key(), 60 * 1000);
 
-		final Task task = new TaskBuilder()
+		final Task task = createTaskBuilder()
 			.setInvokable(InvokableBlockingInCancel.class)
 			.setTaskManagerConfig(config)
 			.setTaskManagerActions(taskManagerActions)
@@ -738,7 +740,7 @@ public class TaskTest extends TestLogger {
 		config.setLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL, 5);
 		config.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 50);
 
-		final Task task = new TaskBuilder()
+		final Task task = createTaskBuilder()
 			.setInvokable(InvokableInterruptibleSharedLockInInvokeAndCancel.class)
 			.setTaskManagerConfig(config)
 			.setTaskManagerActions(taskManagerActions)
@@ -765,7 +767,7 @@ public class TaskTest extends TestLogger {
 		config.setLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL, 5);
 		config.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 50);
 
-		final Task task = new TaskBuilder()
+		final Task task = createTaskBuilder()
 			.setInvokable(InvokableUnInterruptibleBlockingInvoke.class)
 			.setTaskManagerConfig(config)
 			.setTaskManagerActions(taskManagerActions)
@@ -804,7 +806,7 @@ public class TaskTest extends TestLogger {
 		executionConfig.setTaskCancellationInterval(interval + 1337);
 		executionConfig.setTaskCancellationTimeout(timeout - 1337);
 
-		final Task task = new TaskBuilder()
+		final Task task = createTaskBuilder()
 			.setInvokable(InvokableBlockingInInvoke.class)
 			.setTaskManagerConfig(config)
 			.setExecutionConfig(executionConfig)
@@ -917,13 +919,17 @@ public class TaskTest extends TestLogger {
 		}
 	}
 
-	private final class TaskBuilder {
+	private TaskBuilder createTaskBuilder() {
+		return new TaskBuilder(networkEnvironment);
+	}
+
+	private static final class TaskBuilder {
 		private Class<? extends AbstractInvokable> invokable;
 		private TaskManagerActions taskManagerActions;
 		private LibraryCacheManager libraryCacheManager;
 		private ResultPartitionConsumableNotifier consumableNotifier;
 		private PartitionProducerStateChecker partitionProducerStateChecker;
-		private NetworkEnvironment networkEnvironment;
+		private final NetworkEnvironment networkEnvironment;
 		private KvStateService kvStateService;
 		private Executor executor;
 		private Configuration taskManagerConfig;
@@ -940,10 +946,6 @@ public class TaskTest extends TestLogger {
 			consumableNotifier = new NoOpResultPartitionConsumableNotifier();
 			partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
 
-			final ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
-			networkEnvironment = mock(NetworkEnvironment.class);
-			when(networkEnvironment.getResultPartitionManager()).thenReturn(partitionManager);
-
 			kvStateService = new KvStateService(new KvStateRegistry(), null, null);
 
 			executor = TestingUtils.defaultExecutor();
@@ -954,6 +956,10 @@ public class TaskTest extends TestLogger {
 			requiredJarFileBlobKeys = Collections.emptyList();
 		}
 
+		private TaskBuilder(NetworkEnvironment networkEnvironment) {
+			this.networkEnvironment = Preconditions.checkNotNull(networkEnvironment);
+		}
+
 		TaskBuilder setInvokable(Class<? extends AbstractInvokable> invokable) {
 			this.invokable = invokable;
 			return this;
@@ -979,11 +985,6 @@ public class TaskTest extends TestLogger {
 			return this;
 		}
 
-		TaskBuilder setNetworkEnvironment(NetworkEnvironment networkEnvironment) {
-			this.networkEnvironment = networkEnvironment;
-			return this;
-		}
-
 		TaskBuilder setKvStateService(KvStateService kvStateService) {
 			this.kvStateService = kvStateService;
 			return this;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
index cff1a26..1b2ec49 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -44,6 +44,7 @@ import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
@@ -167,7 +168,7 @@ public class JvmExitOnFatalErrorTest {
 				final MemoryManager memoryManager = new MemoryManager(1024 * 1024, 1);
 				final IOManager ioManager = new IOManagerAsync();
 
-				final NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class);
+				final NetworkEnvironment networkEnvironment = new NetworkEnvironmentBuilder().build();
 
 				final TaskManagerRuntimeInfo tmInfo = TaskManagerConfiguration.fromConfiguration(taskManagerConfig);
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index 9e1e793..24b1047 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -44,6 +44,7 @@ import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
@@ -180,7 +181,7 @@ public class InterruptSensitiveRestoreTest {
 			StreamStateHandle state,
 			int mode) throws IOException {
 
-		NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class);
+		NetworkEnvironment networkEnvironment = new NetworkEnvironmentBuilder().build();
 
 		Collection<KeyedStateHandle> keyedStateFromBackend = Collections.emptyList();
 		Collection<KeyedStateHandle> keyedStateFromStream = Collections.emptyList();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index 566e740..8918b0a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -44,6 +44,7 @@ import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
@@ -149,7 +150,7 @@ public class StreamTaskTerminationTest extends TestLogger {
 
 		final TaskManagerRuntimeInfo taskManagerRuntimeInfo = new TestingTaskManagerRuntimeInfo();
 
-		final NetworkEnvironment networkEnv = mock(NetworkEnvironment.class);
+		final NetworkEnvironment networkEnv = new NetworkEnvironmentBuilder().build();
 
 		BlobCacheService blobService =
 			new BlobCacheService(mock(PermanentBlobCache.class), mock(TransientBlobCache.class));
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 2fe91e0..e24949d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -50,6 +50,7 @@ import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
@@ -899,8 +900,7 @@ public class StreamTaskTest extends TestLogger {
 		PartitionProducerStateChecker partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
 		Executor executor = mock(Executor.class);
 
-		NetworkEnvironment network = mock(NetworkEnvironment.class);
-		when(network.getResultPartitionManager()).thenReturn(partitionManager);
+		NetworkEnvironment network = new NetworkEnvironmentBuilder().build();
 
 		JobInformation jobInformation = new JobInformation(
 			new JobID(),
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
index 32b3392..222133a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
@@ -43,11 +43,11 @@ import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
@@ -259,12 +259,10 @@ public class SynchronousCheckpointITCase {
 		LibraryCacheManager libCache = mock(LibraryCacheManager.class);
 		when(libCache.getClassLoader(any(JobID.class))).thenReturn(ClassLoader.getSystemClassLoader());
 
-		ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
 		ResultPartitionConsumableNotifier consumableNotifier = new NoOpResultPartitionConsumableNotifier();
 		PartitionProducerStateChecker partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
 		Executor executor = mock(Executor.class);
-		NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class);
-		when(networkEnvironment.getResultPartitionManager()).thenReturn(partitionManager);
+		NetworkEnvironment networkEnvironment = new NetworkEnvironmentBuilder().build();
 
 		TaskMetricGroup taskMetricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
index 6733858..244c8aa 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
@@ -46,6 +46,7 @@ import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
@@ -223,7 +224,7 @@ public class TaskCheckpointingBehaviourTest extends TestLogger {
 				TestStreamTask.class.getName(),
 				taskConfig);
 
-		NetworkEnvironment network = mock(NetworkEnvironment.class);
+		NetworkEnvironment network = new NetworkEnvironmentBuilder().build();
 
 		BlobCacheService blobService =
 			new BlobCacheService(mock(PermanentBlobCache.class), mock(TransientBlobCache.class));