You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/05/22 10:26:35 UTC

[flink] 10/10: [FLINK-12331][network] Refactor NetworkEnvironment#setupInputGate() to InputGate#setup()

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3d81425de91ecbab715f8a7d169e0a7796d19c51
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Fri May 10 22:14:07 2019 +0200

    [FLINK-12331][network] Refactor NetworkEnvironment#setupInputGate() to InputGate#setup()
    
    Move input gate setup from NetworkEnvironment to InputGate.
    This eliminates the tie between Task and NetworkEnvironment.
    Task no longer needs to depend on NetworkEnvironment
    and can trigger setup from InputGate directly.
---
 .../runtime/io/network/NetworkEnvironment.java     | 57 ----------------------
 .../io/network/partition/consumer/InputGate.java   |  5 ++
 .../partition/consumer/SingleInputGate.java        | 19 +++++++-
 .../partition/consumer/SingleInputGateFactory.java | 55 ++++++++++++++++++---
 .../network/partition/consumer/UnionInputGate.java |  4 ++
 .../org/apache/flink/runtime/taskmanager/Task.java | 28 ++++++-----
 .../runtime/io/network/NetworkEnvironmentTest.java | 49 +++++++++----------
 .../network/partition/InputGateFairnessTest.java   |  8 ++-
 .../network/partition/ResultPartitionBuilder.java  |  2 +-
 .../partition/consumer/InputGateTestBase.java      | 16 ++++--
 .../partition/consumer/SingleInputGateBuilder.java | 25 +++++++++-
 .../partition/consumer/SingleInputGateTest.java    | 16 +++---
 .../apache/flink/runtime/taskmanager/TaskTest.java | 52 ++++++++++++++++++--
 .../runtime/io/BarrierBufferMassiveRandomTest.java |  4 ++
 .../flink/streaming/runtime/io/MockInputGate.java  |  4 ++
 .../StreamNetworkBenchmarkEnvironment.java         |  2 +-
 16 files changed, 219 insertions(+), 127 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index de669bb..1f2ee7e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -27,7 +27,6 @@ import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.metrics.InputBufferPoolUsageGauge;
 import org.apache.flink.runtime.io.network.metrics.InputBuffersGauge;
@@ -47,9 +46,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory;
 import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
-import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskActions;
-import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
@@ -190,60 +187,6 @@ public class NetworkEnvironment {
 		return config;
 	}
 
-	// --------------------------------------------------------------------------------------------
-	//  Task operations
-	// --------------------------------------------------------------------------------------------
-
-	public void registerTask(Task task) throws IOException {
-		final ResultPartition[] producedPartitions = task.getProducedPartitions();
-
-		synchronized (lock) {
-			if (isShutdown) {
-				throw new IllegalStateException("NetworkEnvironment is shut down");
-			}
-
-			for (final ResultPartition partition : producedPartitions) {
-				partition.setup();
-			}
-
-			// Setup the buffer pool for each buffer reader
-			final SingleInputGate[] inputGates = task.getAllInputGates();
-			for (SingleInputGate gate : inputGates) {
-				setupInputGate(gate);
-			}
-		}
-	}
-
-	@VisibleForTesting
-	public void setupInputGate(SingleInputGate gate) throws IOException {
-		BufferPool bufferPool = null;
-		int maxNumberOfMemorySegments;
-		try {
-			if (config.isCreditBased()) {
-				maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ?
-					config.floatingNetworkBuffersPerGate() : Integer.MAX_VALUE;
-
-				// assign exclusive buffers to input channels directly and use the rest for floating buffers
-				gate.assignExclusiveSegments();
-				bufferPool = networkBufferPool.createBufferPool(0, maxNumberOfMemorySegments);
-			} else {
-				maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ?
-					gate.getNumberOfInputChannels() * config.networkBuffersPerChannel() +
-						config.floatingNetworkBuffersPerGate() : Integer.MAX_VALUE;
-
-				bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(),
-					maxNumberOfMemorySegments);
-			}
-			gate.setBufferPool(bufferPool);
-		} catch (Throwable t) {
-			if (bufferPool != null) {
-				bufferPool.lazyDestroy();
-			}
-
-			ExceptionUtils.rethrowIOException(t);
-		}
-	}
-
 	/**
 	 * Batch release intermediate result partitions.
 	 *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index 03ac822..7b87d32 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -129,4 +129,9 @@ public abstract class InputGate implements AutoCloseable {
 			this.moreAvailable = moreAvailable;
 		}
 	}
+
+	/**
+	 * Sets up the gate. This is a potentially heavy-weight, blocking operation compared to mere creation.
+	 */
+	public abstract void setup() throws IOException;
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 087d912..6c23698 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.apache.flink.util.function.SupplierWithException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -173,6 +174,8 @@ public class SingleInputGate extends InputGate {
 
 	private final Counter numBytesIn;
 
+	private final SupplierWithException<BufferPool, IOException> bufferPoolFactory;
+
 	public SingleInputGate(
 		String owningTaskName,
 		JobID jobId,
@@ -182,13 +185,15 @@ public class SingleInputGate extends InputGate {
 		int numberOfInputChannels,
 		TaskActions taskActions,
 		Counter numBytesIn,
-		boolean isCreditBased) {
+		boolean isCreditBased,
+		SupplierWithException<BufferPool, IOException> bufferPoolFactory) {
 
 		this.owningTaskName = checkNotNull(owningTaskName);
 		this.jobId = checkNotNull(jobId);
 
 		this.consumedResultId = checkNotNull(consumedResultId);
 		this.consumedPartitionType = checkNotNull(consumedPartitionType);
+		this.bufferPoolFactory = checkNotNull(bufferPoolFactory);
 
 		checkArgument(consumedSubpartitionIndex >= 0);
 		this.consumedSubpartitionIndex = consumedSubpartitionIndex;
@@ -207,6 +212,18 @@ public class SingleInputGate extends InputGate {
 		this.isCreditBased = isCreditBased;
 	}
 
+	@Override
+	public void setup() throws IOException {
+		checkState(this.bufferPool == null, "Bug in input gate setup logic: Already registered buffer pool.");
+		if (isCreditBased) {
+			// assign exclusive buffers to input channels directly and use the rest for floating buffers
+			assignExclusiveSegments();
+		}
+
+		BufferPool bufferPool = bufferPoolFactory.get();
+		setBufferPool(bufferPool);
+	}
+
 	// ------------------------------------------------------------------------
 	// Properties
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
index e3ed90e..cf2820d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
@@ -18,14 +18,17 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.memory.MemorySegmentProvider;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.TaskEventPublisher;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferPoolFactory;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
@@ -33,12 +36,15 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.apache.flink.util.function.SupplierWithException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 
+import java.io.IOException;
+
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -64,21 +70,27 @@ public class SingleInputGateFactory {
 	private final TaskEventPublisher taskEventPublisher;
 
 	@Nonnull
-	private final MemorySegmentProvider memorySegmentProvider;
+	private final NetworkBufferPool networkBufferPool;
+
+	private final int networkBuffersPerChannel;
+
+	private final int floatingNetworkBuffersPerGate;
 
 	public SingleInputGateFactory(
 			@Nonnull NetworkEnvironmentConfiguration networkConfig,
 			@Nonnull ConnectionManager connectionManager,
 			@Nonnull ResultPartitionManager partitionManager,
 			@Nonnull TaskEventPublisher taskEventPublisher,
-			@Nonnull MemorySegmentProvider memorySegmentProvider) {
+			@Nonnull NetworkBufferPool networkBufferPool) {
 		this.isCreditBased = networkConfig.isCreditBased();
 		this.partitionRequestInitialBackoff = networkConfig.partitionRequestInitialBackoff();
 		this.partitionRequestMaxBackoff = networkConfig.partitionRequestMaxBackoff();
+		this.networkBuffersPerChannel = networkConfig.networkBuffersPerChannel();
+		this.floatingNetworkBuffersPerGate = networkConfig.floatingNetworkBuffersPerGate();
 		this.connectionManager = connectionManager;
 		this.partitionManager = partitionManager;
 		this.taskEventPublisher = taskEventPublisher;
-		this.memorySegmentProvider = memorySegmentProvider;
+		this.networkBufferPool = networkBufferPool;
 	}
 
 	/**
@@ -101,7 +113,8 @@ public class SingleInputGateFactory {
 
 		final SingleInputGate inputGate = new SingleInputGate(
 			owningTaskName, jobId, consumedResultId, consumedPartitionType, consumedSubpartitionIndex,
-			icdd.length, taskActions, numBytesInCounter, isCreditBased);
+			icdd.length, taskActions, numBytesInCounter, isCreditBased,
+			createBufferPoolFactory(icdd.length, consumedPartitionType));
 
 		// Create the input channels. There is one input channel for each consumed partition.
 		final InputChannel[] inputChannels = new InputChannel[icdd.length];
@@ -131,7 +144,7 @@ public class SingleInputGateFactory {
 					partitionRequestInitialBackoff,
 					partitionRequestMaxBackoff,
 					metrics,
-					memorySegmentProvider);
+					networkBufferPool);
 
 				numRemoteChannels++;
 			}
@@ -143,7 +156,7 @@ public class SingleInputGateFactory {
 					partitionRequestInitialBackoff,
 					partitionRequestMaxBackoff,
 					metrics,
-					memorySegmentProvider);
+					networkBufferPool);
 
 				numUnknownChannels++;
 			}
@@ -163,4 +176,32 @@ public class SingleInputGateFactory {
 
 		return inputGate;
 	}
+
+	private SupplierWithException<BufferPool, IOException> createBufferPoolFactory(int size, ResultPartitionType type) {
+		return createBufferPoolFactory(
+			networkBufferPool, isCreditBased, networkBuffersPerChannel, floatingNetworkBuffersPerGate, size, type);
+	}
+
+	@VisibleForTesting
+	static SupplierWithException<BufferPool, IOException> createBufferPoolFactory(
+		BufferPoolFactory bufferPoolFactory,
+		boolean isCreditBased,
+		int networkBuffersPerChannel,
+		int floatingNetworkBuffersPerGate,
+		int size,
+		ResultPartitionType type) {
+
+		if (isCreditBased) {
+			int maxNumberOfMemorySegments = type.isBounded() ?
+				floatingNetworkBuffersPerGate : Integer.MAX_VALUE;
+
+			return () -> bufferPoolFactory.createBufferPool(0, maxNumberOfMemorySegments);
+		} else {
+			int maxNumberOfMemorySegments = type.isBounded() ?
+				size * networkBuffersPerChannel +
+					floatingNetworkBuffersPerGate : Integer.MAX_VALUE;
+
+			return () -> bufferPoolFactory.createBufferPool(size, maxNumberOfMemorySegments);
+		}
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 76e0390..9777f78 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -278,6 +278,10 @@ public class UnionInputGate extends InputGate {
 	}
 
 	@Override
+	public void setup() {
+	}
+
+	@Override
 	public void close() throws IOException {
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index ac446e2..1d95231 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -50,6 +50,7 @@ import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
@@ -216,9 +217,6 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
 	/** The cache for user-defined files that the invokable requires. */
 	private final FileCache fileCache;
 
-	/** The gateway to the network stack, which handles inputs and produced results. */
-	private final NetworkEnvironment network;
-
 	/** The service for kvState registration of this task. */
 	private final KvStateService kvStateService;
 
@@ -352,7 +350,6 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
 		this.blobService = Preconditions.checkNotNull(blobService);
 		this.libraryCache = Preconditions.checkNotNull(libraryCache);
 		this.fileCache = Preconditions.checkNotNull(fileCache);
-		this.network = Preconditions.checkNotNull(networkEnvironment);
 		this.kvStateService = Preconditions.checkNotNull(kvStateService);
 		this.taskManagerConfig = Preconditions.checkNotNull(taskManagerConfig);
 
@@ -438,14 +435,6 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
 		return this.taskConfiguration;
 	}
 
-	public SingleInputGate[] getAllInputGates() {
-		return inputGates;
-	}
-
-	public ResultPartition[] getProducedPartitions() {
-		return producedPartitions;
-	}
-
 	public SingleInputGate getInputGateById(IntermediateDataSetID id) {
 		return inputGatesById.get(id);
 	}
@@ -607,7 +596,7 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
 
 			LOG.info("Registering task at network: {}.", this);
 
-			network.registerTask(this);
+			setupPartionsAndGates(producedPartitions, inputGates);
 
 			for (ResultPartition partition : producedPartitions) {
 				taskEventDispatcher.registerPartition(partition.getPartitionId());
@@ -836,6 +825,19 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
 		}
 	}
 
+	@VisibleForTesting
+	public static void setupPartionsAndGates(
+		ResultPartitionWriter[] producedPartitions, InputGate[] inputGates) throws IOException {
+
+		for (ResultPartitionWriter partition : producedPartitions) {
+			partition.setup();
+		}
+
+		for (InputGate gate : inputGates) {
+			gate.setup();
+		}
+	}
+
 	/**
 	 * Releases network resources before task exits. We should also fail the partition to release if the task
 	 * has failed, is canceled, or is being canceled at the moment.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index 0fc1af7..d6e1d48 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -20,9 +20,11 @@ package org.apache.flink.runtime.io.network;
 
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.MemorySegmentProvider;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
 import org.apache.flink.runtime.taskmanager.Task;
@@ -40,10 +42,8 @@ import java.util.List;
 import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createDummyConnectionManager;
 import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition;
 import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 import static org.powermock.api.mockito.PowerMockito.spy;
 
 /**
@@ -64,7 +64,7 @@ public class NetworkEnvironmentTest {
 	public ExpectedException expectedException = ExpectedException.none();
 
 	/**
-	 * Verifies that {@link NetworkEnvironment#registerTask(Task)} sets up (un)bounded buffer pool
+	 * Verifies that {@link Task#setupPartionsAndGates(ResultPartitionWriter[], InputGate[])} sets up (un)bounded buffer pool
 	 * instances for various types of input and output channels.
 	 */
 	@Test
@@ -81,18 +81,13 @@ public class NetworkEnvironmentTest {
 		final ResultPartition[] resultPartitions = new ResultPartition[] {rp1, rp2, rp3, rp4};
 
 		// input gates
-		SingleInputGate ig1 = createSingleInputGate(ResultPartitionType.PIPELINED, 2);
-		SingleInputGate ig2 = createSingleInputGate(ResultPartitionType.BLOCKING, 2);
-		SingleInputGate ig3 = createSingleInputGate(ResultPartitionType.PIPELINED_BOUNDED, 2);
-		SingleInputGate ig4 = createSingleInputGate(ResultPartitionType.PIPELINED_BOUNDED, 8);
+		SingleInputGate ig1 = createSingleInputGate(network, ResultPartitionType.PIPELINED, 2);
+		SingleInputGate ig2 = createSingleInputGate(network, ResultPartitionType.BLOCKING, 2);
+		SingleInputGate ig3 = createSingleInputGate(network, ResultPartitionType.PIPELINED_BOUNDED, 2);
+		SingleInputGate ig4 = createSingleInputGate(network, ResultPartitionType.PIPELINED_BOUNDED, 8);
 		final SingleInputGate[] inputGates = new SingleInputGate[] {ig1, ig2, ig3, ig4};
 
-		// overall task to register
-		Task task = mock(Task.class);
-		when(task.getProducedPartitions()).thenReturn(resultPartitions);
-		when(task.getAllInputGates()).thenReturn(inputGates);
-
-		network.registerTask(task);
+		Task.setupPartionsAndGates(resultPartitions, inputGates);
 
 		// verify buffer pools for the result partitions
 		assertEquals(rp1.getNumberOfSubpartitions(), rp1.getBufferPool().getNumberOfRequiredMemorySegments());
@@ -133,7 +128,7 @@ public class NetworkEnvironmentTest {
 	}
 
 	/**
-	 * Verifies that {@link NetworkEnvironment#registerTask(Task)} sets up (un)bounded buffer pool
+	 * Verifies that {@link Task#setupPartionsAndGates(ResultPartitionWriter[], InputGate[])} sets up (un)bounded buffer pool
 	 * instances for various types of input and output channels working with the bare minimum of
 	 * required buffers.
 	 */
@@ -153,7 +148,7 @@ public class NetworkEnvironmentTest {
 	}
 
 	/**
-	 * Verifies that {@link NetworkEnvironment#registerTask(Task)} fails if the bare minimum of
+	 * Verifies that {@link Task#setupPartionsAndGates(ResultPartitionWriter[], InputGate[])} fails if the bare minimum of
 	 * required buffers is not available (we are one buffer short).
 	 */
 	@Test
@@ -189,10 +184,10 @@ public class NetworkEnvironmentTest {
 		final ResultPartition[] resultPartitions = new ResultPartition[] {rp1, rp2, rp3, rp4};
 
 		// input gates
-		SingleInputGate ig1 = createSingleInputGate(ResultPartitionType.PIPELINED, 2);
-		SingleInputGate ig2 = createSingleInputGate(ResultPartitionType.BLOCKING, 2);
-		SingleInputGate ig3 = createSingleInputGate(ResultPartitionType.PIPELINED_BOUNDED, 2);
-		SingleInputGate ig4 = createSingleInputGate(ResultPartitionType.PIPELINED_BOUNDED, 4);
+		SingleInputGate ig1 = createSingleInputGate(network, ResultPartitionType.PIPELINED, 2);
+		SingleInputGate ig2 = createSingleInputGate(network, ResultPartitionType.BLOCKING, 2);
+		SingleInputGate ig3 = createSingleInputGate(network, ResultPartitionType.PIPELINED_BOUNDED, 2);
+		SingleInputGate ig4 = createSingleInputGate(network, ResultPartitionType.PIPELINED_BOUNDED, 4);
 		final SingleInputGate[] inputGates = new SingleInputGate[] {ig1, ig2, ig3, ig4};
 
 		// set up remote input channels for the exclusive buffers of the credit-based flow control
@@ -213,12 +208,7 @@ public class NetworkEnvironmentTest {
 			createRemoteInputChannel(ig3, 3, rp4, connManager, network.getNetworkBufferPool());
 		}
 
-		// overall task to register
-		Task task = mock(Task.class);
-		when(task.getProducedPartitions()).thenReturn(resultPartitions);
-		when(task.getAllInputGates()).thenReturn(inputGates);
-
-		network.registerTask(task);
+		Task.setupPartionsAndGates(resultPartitions, inputGates);
 
 		// verify buffer pools for the result partitions
 		assertEquals(Integer.MAX_VALUE, rp1.getBufferPool().getMaxNumberOfMemorySegments());
@@ -260,8 +250,10 @@ public class NetworkEnvironmentTest {
 
 	/**
 	 * Helper to create spy of a {@link SingleInputGate} for use by a {@link Task} inside
-	 * {@link NetworkEnvironment#registerTask(Task)}.
+	 * {@link Task#setupPartionsAndGates(ResultPartitionWriter[], InputGate[])}.
 	 *
+	 * @param network
+	 * 	    network environment used to create the buffer pool factory for the {@link SingleInputGate}
 	 * @param partitionType
 	 * 		the consumed partition type
 	 * @param numberOfChannels
@@ -269,11 +261,14 @@ public class NetworkEnvironmentTest {
 	 *
 	 * @return input gate with some fake settiFngs
 	 */
-	private SingleInputGate createSingleInputGate(ResultPartitionType partitionType, int numberOfChannels) {
+	private SingleInputGate createSingleInputGate(
+		NetworkEnvironment network, ResultPartitionType partitionType, int numberOfChannels) {
+
 		return spy(new SingleInputGateBuilder()
 			.setNumberOfChannels(numberOfChannels)
 			.setResultPartitionType(partitionType)
 			.setIsCreditBased(enableCreditBasedFlowControl)
+			.setupBufferPoolFactory(network)
 			.build());
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
index 6ed5f21..14ebabc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
@@ -33,6 +34,7 @@ import org.apache.flink.runtime.io.network.util.TestBufferFactory;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
 import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.apache.flink.util.function.SupplierWithException;
 
 import org.junit.Test;
 
@@ -311,6 +313,9 @@ public class InputGateFairnessTest {
 	// ------------------------------------------------------------------------
 
 	private static class FairnessVerifyingInputGate extends SingleInputGate {
+		private static final SupplierWithException<BufferPool, IOException> STUB_BUFFER_POOL_FACTORY = () -> {
+			throw new UnsupportedOperationException();
+		};
 
 		private final ArrayDeque<InputChannel> channelsWithData;
 
@@ -327,7 +332,8 @@ public class InputGateFairnessTest {
 				boolean isCreditBased) {
 
 			super(owningTaskName, jobId, consumedResultId, ResultPartitionType.PIPELINED,
-				consumedSubpartitionIndex, numberOfInputChannels, taskActions, new SimpleCounter(), isCreditBased);
+				consumedSubpartitionIndex, numberOfInputChannels, taskActions, new SimpleCounter(),
+				isCreditBased, STUB_BUFFER_POOL_FACTORY);
 
 			try {
 				Field f = SingleInputGate.class.getDeclaredField("inputChannelsWithData");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
index 0e25368..6a7c4b1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
@@ -56,7 +56,7 @@ public class ResultPartitionBuilder {
 
 	private boolean sendScheduleOrUpdateConsumersMessage = false;
 
-	private NetworkBufferPool networkBufferPool = new NetworkBufferPool(1, 1,1);
+	private NetworkBufferPool networkBufferPool = new NetworkBufferPool(1, 1, 1);
 
 	private int networkBuffersPerChannel = 1;
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateTestBase.java
index 4d254c0..99584c6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateTestBase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 
 import org.junit.runner.RunWith;
@@ -74,17 +75,22 @@ public abstract class InputGateTestBase {
 	}
 
 	protected SingleInputGate createInputGate(int numberOfInputChannels) {
-		return createInputGate(numberOfInputChannels, ResultPartitionType.PIPELINED);
+		return createInputGate(null, numberOfInputChannels, ResultPartitionType.PIPELINED);
 	}
 
 	protected SingleInputGate createInputGate(
-			int numberOfInputChannels, ResultPartitionType partitionType) {
-		SingleInputGate inputGate = new SingleInputGateBuilder()
+		NetworkEnvironment environment, int numberOfInputChannels, ResultPartitionType partitionType) {
+
+		SingleInputGateBuilder builder = new SingleInputGateBuilder()
 			.setNumberOfChannels(numberOfInputChannels)
 			.setResultPartitionType(partitionType)
-			.setIsCreditBased(enableCreditBasedFlowControl)
-			.build();
+			.setIsCreditBased(enableCreditBasedFlowControl);
+
+		if (environment != null) {
+			builder = builder.setupBufferPoolFactory(environment);
+		}
 
+		SingleInputGate inputGate = builder.build();
 		assertEquals(partitionType, inputGate.getConsumedPartitionType());
 		return inputGate;
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
index 28394a3..6f27baf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
@@ -21,10 +21,16 @@ package org.apache.flink.runtime.io.network.partition.consumer;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
 import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.apache.flink.util.function.SupplierWithException;
+
+import java.io.IOException;
 
 /**
  * Utility class to encapsulate the logic of building a {@link SingleInputGate} instance.
@@ -47,6 +53,10 @@ public class SingleInputGateBuilder {
 
 	private boolean isCreditBased = true;
 
+	private SupplierWithException<BufferPool, IOException> bufferPoolFactory = () -> {
+		throw new UnsupportedOperationException();
+	};
+
 	public SingleInputGateBuilder setResultPartitionType(ResultPartitionType partitionType) {
 		this.partitionType = partitionType;
 		return this;
@@ -67,6 +77,18 @@ public class SingleInputGateBuilder {
 		return this;
 	}
 
+	public SingleInputGateBuilder setupBufferPoolFactory(NetworkEnvironment environment) {
+		NetworkEnvironmentConfiguration config = environment.getConfiguration();
+		this.bufferPoolFactory = SingleInputGateFactory.createBufferPoolFactory(
+			environment.getNetworkBufferPool(),
+			config.isCreditBased(),
+			config.networkBuffersPerChannel(),
+			config.floatingNetworkBuffersPerGate(),
+			numberOfChannels,
+			partitionType);
+		return this;
+	}
+
 	public SingleInputGate build() {
 		return new SingleInputGate(
 			"Single Input Gate",
@@ -77,6 +99,7 @@ public class SingleInputGateBuilder {
 			numberOfChannels,
 			taskActions,
 			numBytesInCounter,
-			isCreditBased);
+			isCreditBased,
+			bufferPoolFactory);
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index dd031f8..93c3375 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -391,17 +391,17 @@ public class SingleInputGateTest extends InputGateTestBase {
 	 */
 	@Test
 	public void testRequestBuffersWithRemoteInputChannel() throws Exception {
-		final SingleInputGate inputGate = createInputGate(1, ResultPartitionType.PIPELINED_BOUNDED);
+		final NetworkEnvironment network = createNetworkEnvironment();
+		final SingleInputGate inputGate = createInputGate(network, 1, ResultPartitionType.PIPELINED_BOUNDED);
 		int buffersPerChannel = 2;
 		int extraNetworkBuffersPerGate = 8;
-		final NetworkEnvironment network = createNetworkEnvironment();
 
 		try {
 			RemoteInputChannel remote =
 				InputChannelBuilder.newBuilder()
 					.setupFromNetworkEnvironment(network)
 					.buildRemoteAndSetToGate(inputGate);
-			network.setupInputGate(inputGate);
+			inputGate.setup();
 
 			NetworkBufferPool bufferPool = network.getNetworkBufferPool();
 			if (enableCreditBasedFlowControl) {
@@ -427,16 +427,16 @@ public class SingleInputGateTest extends InputGateTestBase {
 	 */
 	@Test
 	public void testRequestBuffersWithUnknownInputChannel() throws Exception {
-		final SingleInputGate inputGate = createInputGate(1, ResultPartitionType.PIPELINED_BOUNDED);
+		final NetworkEnvironment network = createNetworkEnvironment();
+		final SingleInputGate inputGate = createInputGate(network, 1, ResultPartitionType.PIPELINED_BOUNDED);
 		int buffersPerChannel = 2;
 		int extraNetworkBuffersPerGate = 8;
-		final NetworkEnvironment network = createNetworkEnvironment();
 
 		try {
 			final ResultPartitionID resultPartitionId = new ResultPartitionID();
 			addUnknownInputChannel(network, inputGate, resultPartitionId, 0);
 
-			network.setupInputGate(inputGate);
+			inputGate.setup();
 			NetworkBufferPool bufferPool = network.getNetworkBufferPool();
 
 			if (enableCreditBasedFlowControl) {
@@ -478,8 +478,8 @@ public class SingleInputGateTest extends InputGateTestBase {
 	 */
 	@Test
 	public void testUpdateUnknownInputChannel() throws Exception {
-		final SingleInputGate inputGate = createInputGate(2);
 		final NetworkEnvironment network = createNetworkEnvironment();
+		final SingleInputGate inputGate = createInputGate(network, 2, ResultPartitionType.PIPELINED);
 
 		try {
 			final ResultPartitionID localResultPartitionId = new ResultPartitionID();
@@ -488,7 +488,7 @@ public class SingleInputGateTest extends InputGateTestBase {
 			final ResultPartitionID remoteResultPartitionId = new ResultPartitionID();
 			addUnknownInputChannel(network, inputGate, remoteResultPartitionId, 1);
 
-			network.setupInputGate(inputGate);
+			inputGate.setup();
 
 			assertThat(inputGate.getInputChannels().get(remoteResultPartitionId.getPartitionId()),
 				is(instanceOf((UnknownInputChannel.class))));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 4c878db..75c7c89 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -33,6 +33,10 @@ import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionLocation;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -51,8 +55,10 @@ import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
@@ -262,7 +268,29 @@ public class TaskTest extends TestLogger {
 	}
 
 	@Test
-	public void testExecutionFailsInNetworkRegistration() throws Exception {
+	public void testExecutionFailsInNetworkRegistrationForPartitions() throws Exception {
+		ResultPartitionDeploymentDescriptor dummyPartition = new ResultPartitionDeploymentDescriptor(
+			new IntermediateDataSetID(), new IntermediateResultPartitionID(),
+			ResultPartitionType.PIPELINED, 1, 1, true);
+		testExecutionFailsInNetworkRegistration(Collections.singleton(dummyPartition), Collections.emptyList());
+	}
+
+	@Test
+	public void testExecutionFailsInNetworkRegistrationForGates() throws Exception {
+		InputChannelDeploymentDescriptor dummyChannel =
+			new InputChannelDeploymentDescriptor(new ResultPartitionID(), ResultPartitionLocation.createLocal());
+		InputGateDeploymentDescriptor dummyGate = new InputGateDeploymentDescriptor(
+			new IntermediateDataSetID(), ResultPartitionType.PIPELINED, 0,
+			new InputChannelDeploymentDescriptor[] { dummyChannel });
+		testExecutionFailsInNetworkRegistration(Collections.emptyList(), Collections.singleton(dummyGate));
+	}
+
+	private void testExecutionFailsInNetworkRegistration(
+		Collection<ResultPartitionDeploymentDescriptor> resultPartitions,
+		Collection<InputGateDeploymentDescriptor> inputGates) throws Exception {
+
+		final String errorMessage = "Network buffer pool has already been destroyed.";
+
 		final ResultPartitionConsumableNotifier consumableNotifier = new NoOpResultPartitionConsumableNotifier();
 		final PartitionProducerStateChecker partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
 
@@ -271,6 +299,8 @@ public class TaskTest extends TestLogger {
 			.setTaskManagerActions(taskManagerActions)
 			.setConsumableNotifier(consumableNotifier)
 			.setPartitionProducerStateChecker(partitionProducerStateChecker)
+			.setResultPartitions(resultPartitions)
+			.setInputGates(inputGates)
 			.build();
 
 		// shut down the network to make the following task registration failure
@@ -282,10 +312,10 @@ public class TaskTest extends TestLogger {
 		// verify final state
 		assertEquals(ExecutionState.FAILED, task.getExecutionState());
 		assertTrue(task.isCanceledOrFailed());
-		assertTrue(task.getFailureCause().getMessage().contains("NetworkEnvironment is shut down"));
+		assertTrue(task.getFailureCause().getMessage().contains(errorMessage));
 
 		taskManagerActions.validateListenerMessage(
-			ExecutionState.FAILED, task, new IllegalStateException("NetworkEnvironment is shut down"));
+			ExecutionState.FAILED, task, new IllegalStateException(errorMessage));
 	}
 
 	@Test
@@ -935,6 +965,8 @@ public class TaskTest extends TestLogger {
 		private Configuration taskManagerConfig;
 		private ExecutionConfig executionConfig;
 		private Collection<PermanentBlobKey> requiredJarFileBlobKeys;
+		private Collection<ResultPartitionDeploymentDescriptor> resultPartitions = Collections.emptyList();
+		private Collection<InputGateDeploymentDescriptor> inputGates = Collections.emptyList();
 
 		{
 			invokable = TestInvokableCorrect.class;
@@ -1010,6 +1042,16 @@ public class TaskTest extends TestLogger {
 			return this;
 		}
 
+		public TaskBuilder setResultPartitions(Collection<ResultPartitionDeploymentDescriptor> resultPartitions) {
+			this.resultPartitions = resultPartitions;
+			return this;
+		}
+
+		public TaskBuilder setInputGates(Collection<InputGateDeploymentDescriptor> inputGates) {
+			this.inputGates = inputGates;
+			return this;
+		}
+
 		private Task build() throws Exception {
 			final JobID jobId = new JobID();
 			final JobVertexID jobVertexId = new JobVertexID();
@@ -1046,8 +1088,8 @@ public class TaskTest extends TestLogger {
 				new AllocationID(),
 				0,
 				0,
-				Collections.emptyList(),
-				Collections.emptyList(),
+				resultPartitions,
+				inputGates,
 				0,
 				mock(MemoryManager.class),
 				mock(IOManager.class),
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
index 2ea957d..4284074 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -204,6 +204,10 @@ public class BarrierBufferMassiveRandomTest {
 		}
 
 		@Override
+		public void setup() {
+		}
+
+		@Override
 		public void close() {
 		}
 	}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
index 30941ab..b37dee4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
@@ -65,6 +65,10 @@ public class MockInputGate extends InputGate {
 	}
 
 	@Override
+	public void setup() {
+	}
+
+	@Override
 	public int getNumberOfInputChannels() {
 		return numberOfChannels;
 	}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
index 4b542e0..af59220 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
@@ -264,7 +264,7 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
 					InputChannelTestUtils.newUnregisteredInputChannelMetrics(),
 					new SimpleCounter());
 
-			environment.setupInputGate(gate);
+			gate.setup();
 			gates[channel] = gate;
 		}