You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/05/08 13:29:50 UTC

[flink] branch master updated (b9704be -> aedc4f5)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from b9704be  [FLINK-12231][runtime] Introduce SchedulerNG interface
     new f392616  [hotfix][tests] Avoid mock NetworkEnvironment in tests
     new aedc4f5  [FLINK-11726][network] Refactor the creations of ResultPartition and SingleInputGate into NetworkEnvironment

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/io/network/NetworkEnvironment.java     | 113 ++++++++++++++++++-
 .../network/metrics/InputBufferPoolUsageGauge.java |  33 +++---
 .../io/network/metrics/InputBuffersGauge.java      |  27 ++---
 .../consumer => metrics}/InputChannelMetrics.java  |  12 +-
 .../consumer => metrics}/InputGateMetrics.java     |  20 ++--
 .../OutputBufferPoolUsageGauge.java}               |  47 ++++----
 .../network/metrics/OutputBuffersGauge.java}       |  31 ++++--
 .../ResultPartitionMetrics.java                    |  19 ++--
 .../io/network/partition/ResultPartition.java      |   2 +-
 .../partition/consumer/LocalInputChannel.java      |   1 +
 .../partition/consumer/RemoteInputChannel.java     |   1 +
 .../partition/consumer/SingleInputGate.java        |   3 +-
 .../partition/consumer/UnknownInputChannel.java    |   1 +
 .../runtime/metrics/groups/TaskIOMetricGroup.java  | 124 ---------------------
 .../runtime/taskexecutor/TaskManagerServices.java  |   8 +-
 .../NetworkEnvironmentConfiguration.java           |  11 ++
 .../org/apache/flink/runtime/taskmanager/Task.java | 100 ++++++-----------
 .../io/network/NetworkEnvironmentBuilder.java      |  15 ++-
 .../network/partition/InputChannelTestUtils.java   |   2 +-
 .../partition/consumer/SingleInputGateTest.java    |   1 +
 .../runtime/taskexecutor/TaskExecutorTest.java     |  15 ++-
 .../runtime/taskmanager/TaskAsyncCallTest.java     |  17 ++-
 .../apache/flink/runtime/taskmanager/TaskTest.java | 107 +++++++++---------
 .../runtime/util/JvmExitOnFatalErrorTest.java      |   3 +-
 .../tasks/InterruptSensitiveRestoreTest.java       |   3 +-
 .../runtime/tasks/StreamTaskTerminationTest.java   |   3 +-
 .../streaming/runtime/tasks/StreamTaskTest.java    |   4 +-
 .../runtime/tasks/SynchronousCheckpointITCase.java |   6 +-
 .../tasks/TaskCheckpointingBehaviourTest.java      |   3 +-
 29 files changed, 374 insertions(+), 358 deletions(-)
 copy flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkGaugeWrapper.java => flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputBufferPoolUsageGauge.java (52%)
 copy flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkGaugeWrapper.java => flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputBuffersGauge.java (59%)
 rename flink-runtime/src/main/java/org/apache/flink/runtime/io/network/{partition/consumer => metrics}/InputChannelMetrics.java (87%)
 rename flink-runtime/src/main/java/org/apache/flink/runtime/io/network/{partition/consumer => metrics}/InputGateMetrics.java (86%)
 copy flink-runtime/src/main/java/org/apache/flink/runtime/io/network/{LocalConnectionManager.java => metrics/OutputBufferPoolUsageGauge.java} (52%)
 copy flink-runtime/src/main/java/org/apache/flink/runtime/{taskmanager/NoOpTaskActions.java => io/network/metrics/OutputBuffersGauge.java} (58%)
 rename flink-runtime/src/main/java/org/apache/flink/runtime/io/network/{partition => metrics}/ResultPartitionMetrics.java (85%)


[flink] 01/02: [hotfix][tests] Avoid mock NetworkEnvironment in tests

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f392616f404a6471d99d64eeab5dd1dd3da1a4a0
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Thu Apr 25 17:05:21 2019 +0800

    [hotfix][tests] Avoid mock NetworkEnvironment in tests
---
 .../runtime/taskexecutor/TaskExecutorTest.java     |  15 +--
 .../runtime/taskmanager/TaskAsyncCallTest.java     |  17 +++-
 .../apache/flink/runtime/taskmanager/TaskTest.java | 107 +++++++++++----------
 .../runtime/util/JvmExitOnFatalErrorTest.java      |   3 +-
 .../tasks/InterruptSensitiveRestoreTest.java       |   3 +-
 .../runtime/tasks/StreamTaskTerminationTest.java   |   3 +-
 .../streaming/runtime/tasks/StreamTaskTest.java    |   4 +-
 .../runtime/tasks/SynchronousCheckpointITCase.java |   6 +-
 .../tasks/TaskCheckpointingBehaviourTest.java      |   3 +-
 9 files changed, 87 insertions(+), 74 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index b47ad53..bf57e2a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -197,6 +197,8 @@ public class TaskExecutorTest extends TestLogger {
 
 	private SettableLeaderRetrievalService jobManagerLeaderRetriever;
 
+	private NetworkEnvironment networkEnvironment;
+
 	@Before
 	public void setup() throws IOException {
 		rpc = new TestingRpcService();
@@ -220,6 +222,8 @@ public class TaskExecutorTest extends TestLogger {
 		jobManagerLeaderRetriever = new SettableLeaderRetrievalService();
 		haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever);
 		haServices.setJobMasterLeaderRetriever(jobId, jobManagerLeaderRetriever);
+
+		networkEnvironment = new NetworkEnvironmentBuilder().build();
 	}
 
 	@After
@@ -239,6 +243,10 @@ public class TaskExecutorTest extends TestLogger {
 			dummyBlobCacheService = null;
 		}
 
+		if (networkEnvironment != null) {
+			networkEnvironment.shutdown();
+		}
+
 		testingFatalErrorHandler.rethrowError();
 	}
 
@@ -262,7 +270,6 @@ public class TaskExecutorTest extends TestLogger {
 			MemoryType.HEAP,
 			false);
 
-		final NetworkEnvironment networkEnvironment = new NetworkEnvironmentBuilder().build();
 		networkEnvironment.start();
 
 		final KvStateService kvStateService = new KvStateService(new KvStateRegistry(), null, null);
@@ -699,8 +706,6 @@ public class TaskExecutorTest extends TestLogger {
 		when(taskSlotTable.tryMarkSlotActive(eq(jobId), eq(allocationId))).thenReturn(true);
 		when(taskSlotTable.addTask(any(Task.class))).thenReturn(true);
 
-		final NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class);
-
 		final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager();
 
 		final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
@@ -958,13 +963,11 @@ public class TaskExecutorTest extends TestLogger {
 		rpc.registerGateway(resourceManagerGateway.getAddress(), resourceManagerGateway);
 		rpc.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);
 
-		final NetworkEnvironment networkMock = mock(NetworkEnvironment.class, Mockito.RETURNS_MOCKS);
-
 		final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager();
 
 		final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
 			.setTaskManagerLocation(taskManagerLocation)
-			.setNetworkEnvironment(networkMock)
+			.setNetworkEnvironment(networkEnvironment)
 			.setTaskSlotTable(taskSlotTable)
 			.setJobLeaderService(jobLeaderService)
 			.setJobManagerTable(jobManagerTable)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 9e06da0..1f743e4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -41,11 +41,11 @@ import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
@@ -60,6 +60,7 @@ import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -105,6 +106,8 @@ public class TaskAsyncCallTest extends TestLogger {
 
 	private static final List<ClassLoader> classLoaders = Collections.synchronizedList(new ArrayList<>());
 
+	private NetworkEnvironment networkEnvironment;
+
 	@Before
 	public void createQueuesAndActors() {
 		numCalls = 1000;
@@ -114,9 +117,17 @@ public class TaskAsyncCallTest extends TestLogger {
 		notifyCheckpointCompleteLatch = new OneShotLatch();
 		stopLatch = new OneShotLatch();
 
+		networkEnvironment = new NetworkEnvironmentBuilder().build();
+
 		classLoaders.clear();
 	}
 
+	@After
+	public void teardown() {
+		if (networkEnvironment != null) {
+			networkEnvironment.shutdown();
+		}
+	}
 
 	// ------------------------------------------------------------------------
 	//  Tests 
@@ -211,13 +222,9 @@ public class TaskAsyncCallTest extends TestLogger {
 		LibraryCacheManager libCache = mock(LibraryCacheManager.class);
 		when(libCache.getClassLoader(any(JobID.class))).thenReturn(new TestUserCodeClassLoader());
 
-		ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
 		ResultPartitionConsumableNotifier consumableNotifier = new NoOpResultPartitionConsumableNotifier();
 		PartitionProducerStateChecker partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
 		Executor executor = mock(Executor.class);
-		NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class);
-		when(networkEnvironment.getResultPartitionManager()).thenReturn(partitionManager);
-
 		TaskMetricGroup taskMetricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
 
 		JobInformation jobInformation = new JobInformation(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 6a4ac80..4c878db 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -45,12 +45,12 @@ import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -67,10 +67,12 @@ import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.WrappingRuntimeException;
 
+import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -99,7 +101,6 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -114,6 +115,8 @@ public class TaskTest extends TestLogger {
 	private static OneShotLatch awaitLatch;
 	private static OneShotLatch triggerLatch;
 
+	private NetworkEnvironment networkEnvironment;
+
 	@ClassRule
 	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
 
@@ -121,12 +124,21 @@ public class TaskTest extends TestLogger {
 	public void setup() {
 		awaitLatch = new OneShotLatch();
 		triggerLatch = new OneShotLatch();
+
+		networkEnvironment = new NetworkEnvironmentBuilder().build();
+	}
+
+	@After
+	public void teardown() {
+		if (networkEnvironment != null) {
+			networkEnvironment.shutdown();
+		}
 	}
 
 	@Test
 	public void testRegularExecution() throws Exception {
 		final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
-		final Task task = new TaskBuilder()
+		final Task task = createTaskBuilder()
 			.setTaskManagerActions(taskManagerActions)
 			.build();
 
@@ -151,7 +163,7 @@ public class TaskTest extends TestLogger {
 
 	@Test
 	public void testCancelRightAway() throws Exception {
-		final Task task = new TaskBuilder().build();
+		final Task task = createTaskBuilder().build();
 		task.cancelExecution();
 
 		assertEquals(ExecutionState.CANCELING, task.getExecutionState());
@@ -166,7 +178,7 @@ public class TaskTest extends TestLogger {
 
 	@Test
 	public void testFailExternallyRightAway() throws Exception {
-		final Task task = new TaskBuilder().build();
+		final Task task = createTaskBuilder().build();
 		task.failExternally(new Exception("fail externally"));
 
 		assertEquals(ExecutionState.FAILED, task.getExecutionState());
@@ -180,7 +192,7 @@ public class TaskTest extends TestLogger {
 	@Test
 	public void testLibraryCacheRegistrationFailed() throws Exception {
 		final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
-		final Task task = new TaskBuilder()
+		final Task task = createTaskBuilder()
 			.setTaskManagerActions(taskManagerActions)
 			.setLibraryCacheManager(mock(LibraryCacheManager.class)) // inactive manager
 			.build();
@@ -226,7 +238,7 @@ public class TaskTest extends TestLogger {
 				FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST,
 				new String[0]);
 
-		final Task task = new TaskBuilder()
+		final Task task = createTaskBuilder()
 			.setRequiredJarFileBlobKeys(Collections.singletonList(missingKey))
 			.setLibraryCacheManager(libraryCacheManager)
 			.build();
@@ -251,39 +263,35 @@ public class TaskTest extends TestLogger {
 
 	@Test
 	public void testExecutionFailsInNetworkRegistration() throws Exception {
-		// mock a network manager that rejects registration
-		final ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
 		final ResultPartitionConsumableNotifier consumableNotifier = new NoOpResultPartitionConsumableNotifier();
 		final PartitionProducerStateChecker partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
 
-		final NetworkEnvironment network = mock(NetworkEnvironment.class);
-		when(network.getResultPartitionManager()).thenReturn(partitionManager);
-		doThrow(new RuntimeException("buffers")).when(network).registerTask(any(Task.class));
-
 		final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
-		final Task task = new TaskBuilder()
+		final Task task = new TaskBuilder(networkEnvironment)
 			.setTaskManagerActions(taskManagerActions)
 			.setConsumableNotifier(consumableNotifier)
 			.setPartitionProducerStateChecker(partitionProducerStateChecker)
-			.setNetworkEnvironment(network)
 			.build();
 
+		// shut down the network so that the following task registration fails
+		networkEnvironment.shutdown();
+
 		// should fail
 		task.run();
 
 		// verify final state
 		assertEquals(ExecutionState.FAILED, task.getExecutionState());
 		assertTrue(task.isCanceledOrFailed());
-		assertTrue(task.getFailureCause().getMessage().contains("buffers"));
+		assertTrue(task.getFailureCause().getMessage().contains("NetworkEnvironment is shut down"));
 
 		taskManagerActions.validateListenerMessage(
-			ExecutionState.FAILED, task, new RuntimeException("buffers"));
+			ExecutionState.FAILED, task, new IllegalStateException("NetworkEnvironment is shut down"));
 	}
 
 	@Test
 	public void testInvokableInstantiationFailed() throws Exception {
 		final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
-		final Task task = new TaskBuilder()
+		final Task task = createTaskBuilder()
 			.setTaskManagerActions(taskManagerActions)
 			.setInvokable(InvokableNonInstantiable.class)
 			.build();
@@ -303,7 +311,7 @@ public class TaskTest extends TestLogger {
 	@Test
 	public void testExecutionFailsInInvoke() throws Exception {
 		final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
-		final Task task = new TaskBuilder()
+		final Task task = createTaskBuilder()
 			.setInvokable(InvokableWithExceptionInInvoke.class)
 			.setTaskManagerActions(taskManagerActions)
 			.build();
@@ -323,7 +331,7 @@ public class TaskTest extends TestLogger {
 	@Test
 	public void testFailWithWrappedException() throws Exception {
 		final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
-		final Task task = new TaskBuilder()
+		final Task task = createTaskBuilder()
 			.setInvokable(FailingInvokableWithChainedException.class)
 			.setTaskManagerActions(taskManagerActions)
 			.build();
@@ -343,7 +351,7 @@ public class TaskTest extends TestLogger {
 	@Test
 	public void testCancelDuringInvoke() throws Exception {
 		final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
-		final Task task = new TaskBuilder()
+		final Task task = createTaskBuilder()
 			.setInvokable(InvokableBlockingInInvoke.class)
 			.setTaskManagerActions(taskManagerActions)
 			.build();
@@ -371,7 +379,7 @@ public class TaskTest extends TestLogger {
 	@Test
 	public void testFailExternallyDuringInvoke() throws Exception {
 		final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
-		final Task task = new TaskBuilder()
+		final Task task = createTaskBuilder()
 			.setInvokable(InvokableBlockingInInvoke.class)
 			.setTaskManagerActions(taskManagerActions)
 			.build();
@@ -397,7 +405,7 @@ public class TaskTest extends TestLogger {
 	@Test
 	public void testCanceledAfterExecutionFailedInInvoke() throws Exception {
 		final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
-		final Task task = new TaskBuilder()
+		final Task task = createTaskBuilder()
 			.setInvokable(InvokableWithExceptionInInvoke.class)
 			.setTaskManagerActions(taskManagerActions)
 			.build();
@@ -418,7 +426,7 @@ public class TaskTest extends TestLogger {
 	@Test
 	public void testExecutionFailsAfterCanceling() throws Exception {
 		final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
-		final Task task = new TaskBuilder()
+		final Task task = createTaskBuilder()
 			.setInvokable(InvokableWithExceptionOnTrigger.class)
 			.setTaskManagerActions(taskManagerActions)
 			.build();
@@ -449,7 +457,7 @@ public class TaskTest extends TestLogger {
 	@Test
 	public void testExecutionFailsAfterTaskMarkedFailed() throws Exception {
 		final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
-		final Task task = new TaskBuilder()
+		final Task task = createTaskBuilder()
 			.setInvokable(InvokableWithExceptionOnTrigger.class)
 			.setTaskManagerActions(taskManagerActions)
 			.build();
@@ -478,7 +486,7 @@ public class TaskTest extends TestLogger {
 
 	@Test
 	public void testCancelTaskException() throws Exception {
-		final Task task = new TaskBuilder()
+		final Task task = createTaskBuilder()
 			.setInvokable(InvokableWithCancelTaskExceptionInInvoke.class)
 			.build();
 
@@ -492,7 +500,7 @@ public class TaskTest extends TestLogger {
 
 	@Test
 	public void testCancelTaskExceptionAfterTaskMarkedFailed() throws Exception {
-		final Task task = new TaskBuilder()
+		final Task task = createTaskBuilder()
 			.setInvokable(InvokableWithCancelTaskExceptionInInvoke.class)
 			.build();
 
@@ -523,7 +531,7 @@ public class TaskTest extends TestLogger {
 		final SingleInputGate inputGate = mock(SingleInputGate.class);
 		when(inputGate.getConsumedResultId()).thenReturn(resultId);
 
-		final Task task = new TaskBuilder()
+		final Task task = createTaskBuilder()
 			.setInvokable(InvokableBlockingInInvoke.class)
 			.build();
 
@@ -571,8 +579,6 @@ public class TaskTest extends TestLogger {
 		final PartitionProducerStateChecker partitionChecker = mock(PartitionProducerStateChecker.class);
 
 		final ResultPartitionConsumableNotifier consumableNotifier = new NoOpResultPartitionConsumableNotifier();
-		final NetworkEnvironment network = mock(NetworkEnvironment.class);
-		when(network.getResultPartitionManager()).thenReturn(mock(ResultPartitionManager.class));
 
 		// Test all branches of trigger partition state check
 		{
@@ -580,9 +586,8 @@ public class TaskTest extends TestLogger {
 			setup();
 
 			// PartitionProducerDisposedException
-			final Task task =  new TaskBuilder()
+			final Task task = createTaskBuilder()
 				.setInvokable(InvokableBlockingInInvoke.class)
-				.setNetworkEnvironment(network)
 				.setConsumableNotifier(consumableNotifier)
 				.setPartitionProducerStateChecker(partitionChecker)
 				.setExecutor(Executors.directExecutor())
@@ -602,9 +607,8 @@ public class TaskTest extends TestLogger {
 			setup();
 
 			// Any other exception
-			final Task task =  new TaskBuilder()
+			final Task task = createTaskBuilder()
 				.setInvokable(InvokableBlockingInInvoke.class)
-				.setNetworkEnvironment(network)
 				.setConsumableNotifier(consumableNotifier)
 				.setPartitionProducerStateChecker(partitionChecker)
 				.setExecutor(Executors.directExecutor())
@@ -626,9 +630,8 @@ public class TaskTest extends TestLogger {
 
 			// TimeoutException handled special => retry
 			// Any other exception
-			final Task task =  new TaskBuilder()
+			final Task task = createTaskBuilder()
 				.setInvokable(InvokableBlockingInInvoke.class)
-				.setNetworkEnvironment(network)
 				.setConsumableNotifier(consumableNotifier)
 				.setPartitionProducerStateChecker(partitionChecker)
 				.setExecutor(Executors.directExecutor())
@@ -664,9 +667,8 @@ public class TaskTest extends TestLogger {
 			setup();
 
 			// Success
-			final Task task =  new TaskBuilder()
+			final Task task =  createTaskBuilder()
 				.setInvokable(InvokableBlockingInInvoke.class)
-				.setNetworkEnvironment(network)
 				.setConsumableNotifier(consumableNotifier)
 				.setPartitionProducerStateChecker(partitionChecker)
 				.setExecutor(Executors.directExecutor())
@@ -711,7 +713,7 @@ public class TaskTest extends TestLogger {
 		config.setLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL.key(), 5);
 		config.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT.key(), 60 * 1000);
 
-		final Task task = new TaskBuilder()
+		final Task task = createTaskBuilder()
 			.setInvokable(InvokableBlockingInCancel.class)
 			.setTaskManagerConfig(config)
 			.setTaskManagerActions(taskManagerActions)
@@ -738,7 +740,7 @@ public class TaskTest extends TestLogger {
 		config.setLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL, 5);
 		config.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 50);
 
-		final Task task = new TaskBuilder()
+		final Task task = createTaskBuilder()
 			.setInvokable(InvokableInterruptibleSharedLockInInvokeAndCancel.class)
 			.setTaskManagerConfig(config)
 			.setTaskManagerActions(taskManagerActions)
@@ -765,7 +767,7 @@ public class TaskTest extends TestLogger {
 		config.setLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL, 5);
 		config.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 50);
 
-		final Task task = new TaskBuilder()
+		final Task task = createTaskBuilder()
 			.setInvokable(InvokableUnInterruptibleBlockingInvoke.class)
 			.setTaskManagerConfig(config)
 			.setTaskManagerActions(taskManagerActions)
@@ -804,7 +806,7 @@ public class TaskTest extends TestLogger {
 		executionConfig.setTaskCancellationInterval(interval + 1337);
 		executionConfig.setTaskCancellationTimeout(timeout - 1337);
 
-		final Task task = new TaskBuilder()
+		final Task task = createTaskBuilder()
 			.setInvokable(InvokableBlockingInInvoke.class)
 			.setTaskManagerConfig(config)
 			.setExecutionConfig(executionConfig)
@@ -917,13 +919,17 @@ public class TaskTest extends TestLogger {
 		}
 	}
 
-	private final class TaskBuilder {
+	private TaskBuilder createTaskBuilder() {
+		return new TaskBuilder(networkEnvironment);
+	}
+
+	private static final class TaskBuilder {
 		private Class<? extends AbstractInvokable> invokable;
 		private TaskManagerActions taskManagerActions;
 		private LibraryCacheManager libraryCacheManager;
 		private ResultPartitionConsumableNotifier consumableNotifier;
 		private PartitionProducerStateChecker partitionProducerStateChecker;
-		private NetworkEnvironment networkEnvironment;
+		private final NetworkEnvironment networkEnvironment;
 		private KvStateService kvStateService;
 		private Executor executor;
 		private Configuration taskManagerConfig;
@@ -940,10 +946,6 @@ public class TaskTest extends TestLogger {
 			consumableNotifier = new NoOpResultPartitionConsumableNotifier();
 			partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
 
-			final ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
-			networkEnvironment = mock(NetworkEnvironment.class);
-			when(networkEnvironment.getResultPartitionManager()).thenReturn(partitionManager);
-
 			kvStateService = new KvStateService(new KvStateRegistry(), null, null);
 
 			executor = TestingUtils.defaultExecutor();
@@ -954,6 +956,10 @@ public class TaskTest extends TestLogger {
 			requiredJarFileBlobKeys = Collections.emptyList();
 		}
 
+		private TaskBuilder(NetworkEnvironment networkEnvironment) {
+			this.networkEnvironment = Preconditions.checkNotNull(networkEnvironment);
+		}
+
 		TaskBuilder setInvokable(Class<? extends AbstractInvokable> invokable) {
 			this.invokable = invokable;
 			return this;
@@ -979,11 +985,6 @@ public class TaskTest extends TestLogger {
 			return this;
 		}
 
-		TaskBuilder setNetworkEnvironment(NetworkEnvironment networkEnvironment) {
-			this.networkEnvironment = networkEnvironment;
-			return this;
-		}
-
 		TaskBuilder setKvStateService(KvStateService kvStateService) {
 			this.kvStateService = kvStateService;
 			return this;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
index cff1a26..1b2ec49 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -44,6 +44,7 @@ import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
@@ -167,7 +168,7 @@ public class JvmExitOnFatalErrorTest {
 				final MemoryManager memoryManager = new MemoryManager(1024 * 1024, 1);
 				final IOManager ioManager = new IOManagerAsync();
 
-				final NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class);
+				final NetworkEnvironment networkEnvironment = new NetworkEnvironmentBuilder().build();
 
 				final TaskManagerRuntimeInfo tmInfo = TaskManagerConfiguration.fromConfiguration(taskManagerConfig);
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index 9e1e793..24b1047 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -44,6 +44,7 @@ import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
@@ -180,7 +181,7 @@ public class InterruptSensitiveRestoreTest {
 			StreamStateHandle state,
 			int mode) throws IOException {
 
-		NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class);
+		NetworkEnvironment networkEnvironment = new NetworkEnvironmentBuilder().build();
 
 		Collection<KeyedStateHandle> keyedStateFromBackend = Collections.emptyList();
 		Collection<KeyedStateHandle> keyedStateFromStream = Collections.emptyList();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index 566e740..8918b0a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -44,6 +44,7 @@ import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
@@ -149,7 +150,7 @@ public class StreamTaskTerminationTest extends TestLogger {
 
 		final TaskManagerRuntimeInfo taskManagerRuntimeInfo = new TestingTaskManagerRuntimeInfo();
 
-		final NetworkEnvironment networkEnv = mock(NetworkEnvironment.class);
+		final NetworkEnvironment networkEnv = new NetworkEnvironmentBuilder().build();
 
 		BlobCacheService blobService =
 			new BlobCacheService(mock(PermanentBlobCache.class), mock(TransientBlobCache.class));
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 2fe91e0..e24949d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -50,6 +50,7 @@ import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
@@ -899,8 +900,7 @@ public class StreamTaskTest extends TestLogger {
 		PartitionProducerStateChecker partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
 		Executor executor = mock(Executor.class);
 
-		NetworkEnvironment network = mock(NetworkEnvironment.class);
-		when(network.getResultPartitionManager()).thenReturn(partitionManager);
+		NetworkEnvironment network = new NetworkEnvironmentBuilder().build();
 
 		JobInformation jobInformation = new JobInformation(
 			new JobID(),
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
index 32b3392..222133a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
@@ -43,11 +43,11 @@ import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
@@ -259,12 +259,10 @@ public class SynchronousCheckpointITCase {
 		LibraryCacheManager libCache = mock(LibraryCacheManager.class);
 		when(libCache.getClassLoader(any(JobID.class))).thenReturn(ClassLoader.getSystemClassLoader());
 
-		ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
 		ResultPartitionConsumableNotifier consumableNotifier = new NoOpResultPartitionConsumableNotifier();
 		PartitionProducerStateChecker partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
 		Executor executor = mock(Executor.class);
-		NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class);
-		when(networkEnvironment.getResultPartitionManager()).thenReturn(partitionManager);
+		NetworkEnvironment networkEnvironment = new NetworkEnvironmentBuilder().build();
 
 		TaskMetricGroup taskMetricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
index 6733858..244c8aa 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
@@ -46,6 +46,7 @@ import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
@@ -223,7 +224,7 @@ public class TaskCheckpointingBehaviourTest extends TestLogger {
 				TestStreamTask.class.getName(),
 				taskConfig);
 
-		NetworkEnvironment network = mock(NetworkEnvironment.class);
+		NetworkEnvironment network = new NetworkEnvironmentBuilder().build();
 
 		BlobCacheService blobService =
 			new BlobCacheService(mock(PermanentBlobCache.class), mock(TransientBlobCache.class));


[flink] 02/02: [FLINK-11726][network] Refactor the creations of ResultPartition and SingleInputGate into NetworkEnvironment

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit aedc4f5dcca95820969af694e6d2a71a1fe698bd
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Sun May 5 17:35:53 2019 +0800

    [FLINK-11726][network] Refactor the creations of ResultPartition and SingleInputGate into NetworkEnvironment
    
    At the moment, ResultPartition and SingleInputGate are created in Task. With the new pluggable ShuffleManager, they should instead be created via ShuffleService#createResultPartitionWriter/InputGate.
    
    The NetworkEnvironment will be refactored into a NetworkShuffleService in the future, so the creation of ResultPartition and SingleInputGate can already be migrated into the current NetworkEnvironment. The network and buffer metrics can also be registered as part of these creations.
---
 .../runtime/io/network/NetworkEnvironment.java     | 113 ++++++++++++++++++-
 .../network/metrics/InputBufferPoolUsageGauge.java |  51 +++++++++
 .../io/network/metrics/InputBuffersGauge.java      |  45 ++++++++
 .../consumer => metrics}/InputChannelMetrics.java  |  12 +-
 .../consumer => metrics}/InputGateMetrics.java     |  20 ++--
 .../metrics/OutputBufferPoolUsageGauge.java        |  51 +++++++++
 .../io/network/metrics/OutputBuffersGauge.java     |  45 ++++++++
 .../ResultPartitionMetrics.java                    |  19 ++--
 .../io/network/partition/ResultPartition.java      |   2 +-
 .../partition/consumer/LocalInputChannel.java      |   1 +
 .../partition/consumer/RemoteInputChannel.java     |   1 +
 .../partition/consumer/SingleInputGate.java        |   3 +-
 .../partition/consumer/UnknownInputChannel.java    |   1 +
 .../runtime/metrics/groups/TaskIOMetricGroup.java  | 124 ---------------------
 .../runtime/taskexecutor/TaskManagerServices.java  |   8 +-
 .../NetworkEnvironmentConfiguration.java           |  11 ++
 .../org/apache/flink/runtime/taskmanager/Task.java | 100 ++++++-----------
 .../io/network/NetworkEnvironmentBuilder.java      |  15 ++-
 .../network/partition/InputChannelTestUtils.java   |   2 +-
 .../partition/consumer/SingleInputGateTest.java    |   1 +
 20 files changed, 404 insertions(+), 221 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 0ee8595..b9b35c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -19,19 +19,34 @@
 package org.apache.flink.runtime.io.network;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.metrics.InputBufferPoolUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.InputBuffersGauge;
+import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
+import org.apache.flink.runtime.io.network.metrics.InputGateMetrics;
+import org.apache.flink.runtime.io.network.metrics.OutputBufferPoolUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.OutputBuffersGauge;
+import org.apache.flink.runtime.io.network.metrics.ResultPartitionMetrics;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.taskmanager.TaskActions;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 
@@ -56,6 +71,11 @@ public class NetworkEnvironment {
 	private static final String METRIC_TOTAL_MEMORY_SEGMENT = "TotalMemorySegments";
 	private static final String METRIC_AVAILABLE_MEMORY_SEGMENT = "AvailableMemorySegments";
 
+	private static final String METRIC_OUTPUT_QUEUE_LENGTH = "outputQueueLength";
+	private static final String METRIC_OUTPUT_POOL_USAGE = "outPoolUsage";
+	private static final String METRIC_INPUT_QUEUE_LENGTH = "inputQueueLength";
+	private static final String METRIC_INPUT_POOL_USAGE = "inPoolUsage";
+
 	private final Object lock = new Object();
 
 	private final NetworkEnvironmentConfiguration config;
@@ -68,12 +88,15 @@ public class NetworkEnvironment {
 
 	private final TaskEventPublisher taskEventPublisher;
 
+	private final IOManager ioManager;
+
 	private boolean isShutdown;
 
 	public NetworkEnvironment(
 			NetworkEnvironmentConfiguration config,
 			TaskEventPublisher taskEventPublisher,
-			MetricGroup metricGroup) {
+			MetricGroup metricGroup,
+			IOManager ioManager) {
 		this.config = checkNotNull(config);
 
 		this.networkBufferPool = new NetworkBufferPool(config.numNetworkBuffers(), config.networkBufferSize());
@@ -91,6 +114,8 @@ public class NetworkEnvironment {
 
 		registerNetworkMetrics(metricGroup, networkBufferPool);
 
+		this.ioManager = checkNotNull(ioManager);
+
 		isShutdown = false;
 	}
 
@@ -220,6 +245,92 @@ public class NetworkEnvironment {
 		}
 	}
 
+	// --------------------------------------------------------------------------------------------
+	//  Create Output Writers and Input Readers
+	// --------------------------------------------------------------------------------------------
+
+	public ResultPartition[] createResultPartitionWriters(
+			String taskName,
+			JobID jobId,
+			ExecutionAttemptID executionId,
+			TaskActions taskActions,
+			ResultPartitionConsumableNotifier partitionConsumableNotifier,
+			Collection<ResultPartitionDeploymentDescriptor> resultPartitionDeploymentDescriptors,
+			MetricGroup outputGroup,
+			MetricGroup buffersGroup) {
+		synchronized (lock) {
+			Preconditions.checkState(!isShutdown, "The NetworkEnvironment has already been shut down.");
+
+			ResultPartition[] resultPartitions = new ResultPartition[resultPartitionDeploymentDescriptors.size()];
+			int counter = 0;
+			for (ResultPartitionDeploymentDescriptor rpdd : resultPartitionDeploymentDescriptors) {
+				resultPartitions[counter++] = new ResultPartition(
+					taskName,
+					taskActions,
+					jobId,
+					new ResultPartitionID(rpdd.getPartitionId(), executionId),
+					rpdd.getPartitionType(),
+					rpdd.getNumberOfSubpartitions(),
+					rpdd.getMaxParallelism(),
+					resultPartitionManager,
+					partitionConsumableNotifier,
+					ioManager,
+					rpdd.sendScheduleOrUpdateConsumersMessage());
+			}
+
+			registerOutputMetrics(outputGroup, buffersGroup, resultPartitions);
+			return resultPartitions;
+		}
+	}
+
+	public SingleInputGate[] createInputGates(
+			String taskName,
+			JobID jobId,
+			TaskActions taskActions,
+			Collection<InputGateDeploymentDescriptor> inputGateDeploymentDescriptors,
+			MetricGroup parentGroup,
+			MetricGroup inputGroup,
+			MetricGroup buffersGroup,
+			Counter numBytesInCounter) {
+		synchronized (lock) {
+			Preconditions.checkState(!isShutdown, "The NetworkEnvironment has already been shut down.");
+
+			InputChannelMetrics inputChannelMetrics = new InputChannelMetrics(parentGroup);
+			SingleInputGate[] inputGates = new SingleInputGate[inputGateDeploymentDescriptors.size()];
+			int counter = 0;
+			for (InputGateDeploymentDescriptor igdd : inputGateDeploymentDescriptors) {
+				inputGates[counter++] = SingleInputGate.create(
+					taskName,
+					jobId,
+					igdd,
+					this,
+					taskEventPublisher,
+					taskActions,
+					inputChannelMetrics,
+					numBytesInCounter);
+			}
+
+			registerInputMetrics(inputGroup, buffersGroup, inputGates);
+			return inputGates;
+		}
+	}
+
+	private void registerOutputMetrics(MetricGroup outputGroup, MetricGroup buffersGroup, ResultPartition[] resultPartitions) {
+		if (config.isNetworkDetailedMetrics()) {
+			ResultPartitionMetrics.registerQueueLengthMetrics(outputGroup, resultPartitions);
+		}
+		buffersGroup.gauge(METRIC_OUTPUT_QUEUE_LENGTH, new OutputBuffersGauge(resultPartitions));
+		buffersGroup.gauge(METRIC_OUTPUT_POOL_USAGE, new OutputBufferPoolUsageGauge(resultPartitions));
+	}
+
+	private void registerInputMetrics(MetricGroup inputGroup, MetricGroup buffersGroup, SingleInputGate[] inputGates) {
+		if (config.isNetworkDetailedMetrics()) {
+			InputGateMetrics.registerQueueLengthMetrics(inputGroup, inputGates);
+		}
+		buffersGroup.gauge(METRIC_INPUT_QUEUE_LENGTH, new InputBuffersGauge(inputGates));
+		buffersGroup.gauge(METRIC_INPUT_POOL_USAGE, new InputBufferPoolUsageGauge(inputGates));
+	}
+
 	public void start() throws IOException {
 		synchronized (lock) {
 			Preconditions.checkState(!isShutdown, "The NetworkEnvironment has already been shut down.");
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputBufferPoolUsageGauge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputBufferPoolUsageGauge.java
new file mode 100644
index 0000000..992f561
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputBufferPoolUsageGauge.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.metrics;
+
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+
+/**
+ * Gauge metric measuring the input buffer pool usage gauge for {@link SingleInputGate}s.
+ */
+public class InputBufferPoolUsageGauge implements Gauge<Float> {
+
+	private final SingleInputGate[] inputGates;
+
+	public InputBufferPoolUsageGauge(SingleInputGate[] inputGates) {
+		this.inputGates = inputGates;
+	}
+
+	@Override
+	public Float getValue() {
+		int usedBuffers = 0;
+		int bufferPoolSize = 0;
+
+		for (SingleInputGate inputGate : inputGates) {
+			usedBuffers += inputGate.getBufferPool().bestEffortGetNumOfUsedBuffers();
+			bufferPoolSize += inputGate.getBufferPool().getNumBuffers();
+		}
+
+		if (bufferPoolSize != 0) {
+			return ((float) usedBuffers) / bufferPoolSize;
+		} else {
+			return 0.0f;
+		}
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputBuffersGauge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputBuffersGauge.java
new file mode 100644
index 0000000..933f05f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputBuffersGauge.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.metrics;
+
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+
+/**
+ * Gauge metric measuring the number of queued input buffers for {@link SingleInputGate}s.
+ */
+public class InputBuffersGauge implements Gauge<Integer> {
+
+	private final SingleInputGate[] inputGates;
+
+	public InputBuffersGauge(SingleInputGate[] inputGates) {
+		this.inputGates = inputGates;
+	}
+
+	@Override
+	public Integer getValue() {
+		int totalBuffers = 0;
+
+		for (SingleInputGate inputGate : inputGates) {
+			totalBuffers += inputGate.getNumberOfQueuedBuffers();
+		}
+
+		return totalBuffers;
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputChannelMetrics.java
similarity index 87%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelMetrics.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputChannelMetrics.java
index e422a5d..51afd1d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputChannelMetrics.java
@@ -16,11 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.io.network.partition.consumer;
+package org.apache.flink.runtime.io.network.metrics;
 
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.MeterView;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.metrics.MetricNames;
 
 /**
@@ -55,19 +57,19 @@ public class InputChannelMetrics {
 		parent.meter(IO_NUM_BUFFERS_IN_REMOTE_RATE, new MeterView(numBuffersInRemote, 60));
 	}
 
-	Counter getNumBytesInLocalCounter() {
+	public Counter getNumBytesInLocalCounter() {
 		return numBytesInLocal;
 	}
 
-	Counter getNumBytesInRemoteCounter() {
+	public Counter getNumBytesInRemoteCounter() {
 		return numBytesInRemote;
 	}
 
-	Counter getNumBuffersInLocalCounter() {
+	public Counter getNumBuffersInLocalCounter() {
 		return numBuffersInLocal;
 	}
 
-	Counter getNumBuffersInRemoteCounter() {
+	public Counter getNumBuffersInRemoteCounter() {
 		return numBuffersInRemote;
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputGateMetrics.java
similarity index 86%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputGateMetrics.java
index ebb8b9d..b1d7bfd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputGateMetrics.java
@@ -16,10 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.io.network.partition.consumer;
+package org.apache.flink.runtime.io.network.metrics;
 
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 
 import java.util.Collection;
 
@@ -176,12 +179,15 @@ public class InputGateMetrics {
 	//  Static access
 	// ------------------------------------------------------------------------
 
-	public static void registerQueueLengthMetrics(MetricGroup group, SingleInputGate gate) {
-		InputGateMetrics metrics = new InputGateMetrics(gate);
+	public static void registerQueueLengthMetrics(MetricGroup parent, SingleInputGate[] gates) {
+		for (int i = 0; i < gates.length; i++) {
+			InputGateMetrics metrics = new InputGateMetrics(gates[i]);
 
-		group.gauge("totalQueueLen", metrics.getTotalQueueLenGauge());
-		group.gauge("minQueueLen", metrics.getMinQueueLenGauge());
-		group.gauge("maxQueueLen", metrics.getMaxQueueLenGauge());
-		group.gauge("avgQueueLen", metrics.getAvgQueueLenGauge());
+			MetricGroup group = parent.addGroup(i);
+			group.gauge("totalQueueLen", metrics.getTotalQueueLenGauge());
+			group.gauge("minQueueLen", metrics.getMinQueueLenGauge());
+			group.gauge("maxQueueLen", metrics.getMaxQueueLenGauge());
+			group.gauge("avgQueueLen", metrics.getAvgQueueLenGauge());
+		}
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/OutputBufferPoolUsageGauge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/OutputBufferPoolUsageGauge.java
new file mode 100644
index 0000000..9aad92c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/OutputBufferPoolUsageGauge.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.metrics;
+
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+
+/**
+ * Gauge metric measuring the output buffer pool usage gauge for {@link ResultPartition}s.
+ */
+public class OutputBufferPoolUsageGauge implements Gauge<Float> {
+
+	private final ResultPartition[] resultPartitions;
+
+	public OutputBufferPoolUsageGauge(ResultPartition[] resultPartitions) {
+		this.resultPartitions = resultPartitions;
+	}
+
+	@Override
+	public Float getValue() {
+		int usedBuffers = 0;
+		int bufferPoolSize = 0;
+
+		for (ResultPartition resultPartition : resultPartitions) {
+			usedBuffers += resultPartition.getBufferPool().bestEffortGetNumOfUsedBuffers();
+			bufferPoolSize += resultPartition.getBufferPool().getNumBuffers();
+		}
+
+		if (bufferPoolSize != 0) {
+			return ((float) usedBuffers) / bufferPoolSize;
+		} else {
+			return 0.0f;
+		}
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/OutputBuffersGauge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/OutputBuffersGauge.java
new file mode 100644
index 0000000..6c5fd67
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/OutputBuffersGauge.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.metrics;
+
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+
+/**
+ * Gauge metric measuring the number of queued output buffers for {@link ResultPartition}s.
+ */
+public class OutputBuffersGauge implements Gauge<Integer> {
+
+	private final ResultPartition[] resultPartitions;
+
+	public OutputBuffersGauge(ResultPartition[] resultPartitions) {
+		this.resultPartitions = resultPartitions;
+	}
+
+	@Override
+	public Integer getValue() {
+		int totalBuffers = 0;
+
+		for (ResultPartition producedPartition : resultPartitions) {
+			totalBuffers += producedPartition.getNumberOfQueuedBuffers();
+		}
+
+		return totalBuffers;
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/ResultPartitionMetrics.java
similarity index 85%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionMetrics.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/ResultPartitionMetrics.java
index fde2ebd..2171ff3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/ResultPartitionMetrics.java
@@ -16,10 +16,12 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.io.network.partition;
+package org.apache.flink.runtime.io.network.metrics;
 
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -158,12 +160,15 @@ public class ResultPartitionMetrics {
 	//  Static access
 	// ------------------------------------------------------------------------
 
-	public static void registerQueueLengthMetrics(MetricGroup group, ResultPartition partition) {
-		ResultPartitionMetrics metrics = new ResultPartitionMetrics(partition);
+	public static void registerQueueLengthMetrics(MetricGroup parent, ResultPartition[] partitions) {
+		for (int i = 0; i < partitions.length; i++) {
+			ResultPartitionMetrics metrics = new ResultPartitionMetrics(partitions[i]);
 
-		group.gauge("totalQueueLen", metrics.getTotalQueueLenGauge());
-		group.gauge("minQueueLen", metrics.getMinQueueLenGauge());
-		group.gauge("maxQueueLen", metrics.getMaxQueueLenGauge());
-		group.gauge("avgQueueLen", metrics.getAvgQueueLenGauge());
+			MetricGroup group = parent.addGroup(i);
+			group.gauge("totalQueueLen", metrics.getTotalQueueLenGauge());
+			group.gauge("minQueueLen", metrics.getMinQueueLenGauge());
+			group.gauge("maxQueueLen", metrics.getMaxQueueLenGauge());
+			group.gauge("avgQueueLen", metrics.getAvgQueueLenGauge());
+		}
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index fb73a70..24ce27e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -446,7 +446,7 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 				this, subpartitionIndex, pendingReferences);
 	}
 
-	ResultSubpartition[] getAllPartitions() {
+	public ResultSubpartition[] getAllPartitions() {
 		return subpartitions;
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index d41f5c8..feb6b72 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.partition.consumer;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.network.TaskEventPublisher;
+import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
 import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 6286e12..87e3f62 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferListener;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
 import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
 import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 9444845..d40af83 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
@@ -657,7 +658,7 @@ public class SingleInputGate implements InputGate {
 
 	// ------------------------------------------------------------------------
 
-	Map<IntermediateResultPartitionID, InputChannel> getInputChannels() {
+	public Map<IntermediateResultPartitionID, InputChannel> getInputChannels() {
 		return inputChannels;
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
index 1775b27..0ed01e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.TaskEventPublisher;
+import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
index 7e1329e..ebf447a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
@@ -19,16 +19,11 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.MeterView;
-import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.executiongraph.IOMetrics;
-import org.apache.flink.runtime.io.network.partition.ResultPartition;
-import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.metrics.MetricNames;
-import org.apache.flink.runtime.taskmanager.Task;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -97,125 +92,6 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
 	}
 
 	// ============================================================================================
-	// Buffer metrics
-	// ============================================================================================
-
-	/**
-	 * Initialize Buffer Metrics for a task.
-	 */
-	public void initializeBufferMetrics(Task task) {
-		final MetricGroup buffers = addGroup("buffers");
-		buffers.gauge("inputQueueLength", new InputBuffersGauge(task));
-		buffers.gauge("outputQueueLength", new OutputBuffersGauge(task));
-		buffers.gauge("inPoolUsage", new InputBufferPoolUsageGauge(task));
-		buffers.gauge("outPoolUsage", new OutputBufferPoolUsageGauge(task));
-	}
-
-	/**
-	 * Gauge measuring the number of queued input buffers of a task.
-	 */
-	private static final class InputBuffersGauge implements Gauge<Integer> {
-
-		private final Task task;
-
-		public InputBuffersGauge(Task task) {
-			this.task = task;
-		}
-
-		@Override
-		public Integer getValue() {
-			int totalBuffers = 0;
-
-			for (SingleInputGate inputGate : task.getAllInputGates()) {
-				totalBuffers += inputGate.getNumberOfQueuedBuffers();
-			}
-
-			return totalBuffers;
-		}
-	}
-
-	/**
-	 * Gauge measuring the number of queued output buffers of a task.
-	 */
-	private static final class OutputBuffersGauge implements Gauge<Integer> {
-
-		private final Task task;
-
-		public OutputBuffersGauge(Task task) {
-			this.task = task;
-		}
-
-		@Override
-		public Integer getValue() {
-			int totalBuffers = 0;
-
-			for (ResultPartition producedPartition : task.getProducedPartitions()) {
-				totalBuffers += producedPartition.getNumberOfQueuedBuffers();
-			}
-
-			return totalBuffers;
-		}
-	}
-
-	/**
-	 * Gauge measuring the input buffer pool usage gauge of a task.
-	 */
-	private static final class InputBufferPoolUsageGauge implements Gauge<Float> {
-
-		private final Task task;
-
-		public InputBufferPoolUsageGauge(Task task) {
-			this.task = task;
-		}
-
-		@Override
-		public Float getValue() {
-			int usedBuffers = 0;
-			int bufferPoolSize = 0;
-
-			for (SingleInputGate inputGate : task.getAllInputGates()) {
-				usedBuffers += inputGate.getBufferPool().bestEffortGetNumOfUsedBuffers();
-				bufferPoolSize += inputGate.getBufferPool().getNumBuffers();
-			}
-
-			if (bufferPoolSize != 0) {
-				return ((float) usedBuffers) / bufferPoolSize;
-			} else {
-				return 0.0f;
-			}
-		}
-	}
-
-	/**
-	 * Gauge measuring the output buffer pool usage gauge of a task.
-	 */
-	private static final class OutputBufferPoolUsageGauge implements Gauge<Float> {
-
-		private final Task task;
-
-		public OutputBufferPoolUsageGauge(Task task) {
-			this.task = task;
-		}
-
-		@Override
-		public Float getValue() {
-			int usedBuffers = 0;
-			int bufferPoolSize = 0;
-
-			for (ResultPartition resultPartition : task.getProducedPartitions()) {
-				usedBuffers += resultPartition.getBufferPool().bestEffortGetNumOfUsedBuffers();
-				bufferPoolSize += resultPartition.getBufferPool().getNumBuffers();
-			}
-
-			if (bufferPoolSize != 0) {
-				return ((float) usedBuffers) / bufferPoolSize;
-			} else {
-				return 0.0f;
-			}
-		}
-	}
-
-	// ============================================================================================
 	// Metric Reuse
 	// ============================================================================================
 	public void reuseRecordsInputCounter(Counter numRecordsInCounter) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index a3e9066..395a207 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -243,8 +243,11 @@ public class TaskManagerServices {
 
 		final TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
 
+		// start the I/O manager, it will create some temp directories.
+		final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());
+
 		final NetworkEnvironment network = new NetworkEnvironment(
-			taskManagerServicesConfiguration.getNetworkConfig(), taskEventDispatcher, taskManagerMetricGroup);
+			taskManagerServicesConfiguration.getNetworkConfig(), taskEventDispatcher, taskManagerMetricGroup, ioManager);
 		network.start();
 
 		final KvStateService kvStateService = KvStateService.fromConfiguration(taskManagerServicesConfiguration);
@@ -258,9 +261,6 @@ public class TaskManagerServices {
 		// this call has to happen strictly after the network stack has been initialized
 		final MemoryManager memoryManager = createMemoryManager(taskManagerServicesConfiguration, freeHeapMemoryWithDefrag, maxJvmHeapMemory);
 
-		// start the I/O manager, it will create some temp directories.
-		final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());
-
 		final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();
 
 		final List<ResourceProfile> resourceProfiles = new ArrayList<>(taskManagerServicesConfiguration.getNumberOfSlots());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
index 5cc6412..449ad8f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
@@ -60,6 +60,8 @@ public class NetworkEnvironmentConfiguration {
 
 	private final boolean isCreditBased;
 
+	private final boolean isNetworkDetailedMetrics;
+
 	private final NettyConfig nettyConfig;
 
 	public NetworkEnvironmentConfiguration(
@@ -70,6 +72,7 @@ public class NetworkEnvironmentConfiguration {
 			int networkBuffersPerChannel,
 			int floatingNetworkBuffersPerGate,
 			boolean isCreditBased,
+			boolean isNetworkDetailedMetrics,
 			@Nullable NettyConfig nettyConfig) {
 
 		this.numNetworkBuffers = numNetworkBuffers;
@@ -79,6 +82,7 @@ public class NetworkEnvironmentConfiguration {
 		this.networkBuffersPerChannel = networkBuffersPerChannel;
 		this.floatingNetworkBuffersPerGate = floatingNetworkBuffersPerGate;
 		this.isCreditBased = isCreditBased;
+		this.isNetworkDetailedMetrics = isNetworkDetailedMetrics;
 		this.nettyConfig = nettyConfig;
 	}
 
@@ -116,6 +120,10 @@ public class NetworkEnvironmentConfiguration {
 		return isCreditBased;
 	}
 
+	public boolean isNetworkDetailedMetrics() {
+		return isNetworkDetailedMetrics;
+	}
+
 	// ------------------------------------------------------------------------
 
 	/**
@@ -150,6 +158,8 @@ public class NetworkEnvironmentConfiguration {
 
 		boolean isCreditBased = nettyConfig != null && configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
 
+		boolean isNetworkDetailedMetrics = configuration.getBoolean(TaskManagerOptions.NETWORK_DETAILED_METRICS);
+
 		return new NetworkEnvironmentConfiguration(
 			numberOfNetworkBuffers,
 			pageSize,
@@ -158,6 +168,7 @@ public class NetworkEnvironmentConfiguration {
 			buffersPerChannel,
 			extraBuffersPerGate,
 			isCreditBased,
+			isNetworkDetailedMetrics,
 			nettyConfig);
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 707febe..ac446e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -54,10 +54,7 @@ import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionMetrics;
-import org.apache.flink.runtime.io.network.partition.consumer.InputChannelMetrics;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGateMetrics;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -368,52 +365,39 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
 
 		final String taskNameWithSubtaskAndId = taskNameWithSubtask + " (" + executionId + ')';
 
-		// Produced intermediate result partitions
-		this.producedPartitions = new ResultPartition[resultPartitionDeploymentDescriptors.size()];
+		// add metrics for buffers
+		final MetricGroup buffersGroup = metrics.getIOMetricGroup().addGroup("buffers");
+
+		// similar to MetricUtils.instantiateNetworkMetrics() but inside this IOMetricGroup
+		final MetricGroup networkGroup = metrics.getIOMetricGroup().addGroup("Network");
+		final MetricGroup outputGroup = networkGroup.addGroup("Output");
+		final MetricGroup inputGroup = networkGroup.addGroup("Input");
+
+		// produced intermediate result partitions
+		this.producedPartitions = networkEnvironment.createResultPartitionWriters(
+			taskNameWithSubtaskAndId,
+			jobId,
+			executionId,
+			this,
+			resultPartitionConsumableNotifier,
+			resultPartitionDeploymentDescriptors,
+			outputGroup,
+			buffersGroup);
+
+		// consumed intermediate result partitions
+		this.inputGates = networkEnvironment.createInputGates(
+			taskNameWithSubtaskAndId,
+			jobId,
+			this,
+			inputGateDeploymentDescriptors,
+			metrics.getIOMetricGroup(),
+			inputGroup,
+			buffersGroup,
+			metrics.getIOMetricGroup().getNumBytesInCounter());
 
-		int counter = 0;
-
-		for (ResultPartitionDeploymentDescriptor desc: resultPartitionDeploymentDescriptors) {
-			ResultPartitionID partitionId = new ResultPartitionID(desc.getPartitionId(), executionId);
-
-			this.producedPartitions[counter] = new ResultPartition(
-				taskNameWithSubtaskAndId,
-				this,
-				jobId,
-				partitionId,
-				desc.getPartitionType(),
-				desc.getNumberOfSubpartitions(),
-				desc.getMaxParallelism(),
-				networkEnvironment.getResultPartitionManager(),
-				resultPartitionConsumableNotifier,
-				ioManager,
-				desc.sendScheduleOrUpdateConsumersMessage());
-
-			++counter;
-		}
-
-		// Consumed intermediate result partitions
-		this.inputGates = new SingleInputGate[inputGateDeploymentDescriptors.size()];
 		this.inputGatesById = new HashMap<>();
-
-		counter = 0;
-
-		InputChannelMetrics inputChannelMetrics = new InputChannelMetrics(metricGroup.getIOMetricGroup());
-		for (InputGateDeploymentDescriptor inputGateDeploymentDescriptor: inputGateDeploymentDescriptors) {
-			SingleInputGate gate = SingleInputGate.create(
-				taskNameWithSubtaskAndId,
-				jobId,
-				inputGateDeploymentDescriptor,
-				networkEnvironment,
-				taskEventDispatcher,
-				this,
-				inputChannelMetrics,
-				metricGroup.getIOMetricGroup().getNumBytesInCounter());
-
-			inputGates[counter] = gate;
-			inputGatesById.put(gate.getConsumedResultId(), gate);
-
-			++counter;
+		for (SingleInputGate inputGate : inputGates) {
+			inputGatesById.put(inputGate.getConsumedResultId(), inputGate);
 		}
 
 		invokableHasBeenCanceled = new AtomicBoolean(false);
@@ -629,28 +613,6 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
 				taskEventDispatcher.registerPartition(partition.getPartitionId());
 			}
 
-			// add metrics for buffers
-			this.metrics.getIOMetricGroup().initializeBufferMetrics(this);
-
-			// register detailed network metrics, if configured
-			if (taskManagerConfig.getConfiguration().getBoolean(TaskManagerOptions.NETWORK_DETAILED_METRICS)) {
-				// similar to MetricUtils.instantiateNetworkMetrics() but inside this IOMetricGroup
-				MetricGroup networkGroup = this.metrics.getIOMetricGroup().addGroup("Network");
-				MetricGroup outputGroup = networkGroup.addGroup("Output");
-				MetricGroup inputGroup = networkGroup.addGroup("Input");
-
-				// output metrics
-				for (int i = 0; i < producedPartitions.length; i++) {
-					ResultPartitionMetrics.registerQueueLengthMetrics(
-						outputGroup.addGroup(i), producedPartitions[i]);
-				}
-
-				for (int i = 0; i < inputGates.length; i++) {
-					InputGateMetrics.registerQueueLengthMetrics(
-						inputGroup.addGroup(i), inputGates[i]);
-				}
-			}
-
 			// next, kick off the background copying of files for the distributed cache
 			try {
 				for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentBuilder.java
index 9144334..5e529fd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentBuilder.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.io.network;
 
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
@@ -42,12 +44,16 @@ public class NetworkEnvironmentBuilder {
 
 	private boolean isCreditBased = true;
 
+	private boolean isNetworkDetailedMetrics = false;
+
 	private NettyConfig nettyConfig;
 
 	private TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
 
 	private MetricGroup metricGroup = UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup();
 
+	private IOManager ioManager = new IOManagerAsync();
+
 	public NetworkEnvironmentBuilder setNumNetworkBuffers(int numNetworkBuffers) {
 		this.numNetworkBuffers = numNetworkBuffers;
 		return this;
@@ -98,6 +104,11 @@ public class NetworkEnvironmentBuilder {
 		return this;
 	}
 
+	public NetworkEnvironmentBuilder setIOManager(IOManager ioManager) {
+		this.ioManager = ioManager;
+		return this;
+	}
+
 	public NetworkEnvironment build() {
 		return new NetworkEnvironment(
 			new NetworkEnvironmentConfiguration(
@@ -108,8 +119,10 @@ public class NetworkEnvironmentBuilder {
 				networkBuffersPerChannel,
 				floatingNetworkBuffersPerGate,
 				isCreditBased,
+				isNetworkDetailedMetrics,
 				nettyConfig),
 			taskEventDispatcher,
-			metricGroup);
+			metricGroup,
+			ioManager);
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
index ffec7f4..4feee4e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
@@ -23,8 +23,8 @@ import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
 import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
-import org.apache.flink.runtime.io.network.partition.consumer.InputChannelMetrics;
 import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 2c510bb..71e4f5a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
 import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;