You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/05/22 10:26:35 UTC
[flink] 10/10: [FLINK-12331][network] Refactor
NetworkEnvironment#setupInputGate() to InputGate#setup()
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3d81425de91ecbab715f8a7d169e0a7796d19c51
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Fri May 10 22:14:07 2019 +0200
[FLINK-12331][network] Refactor NetworkEnvironment#setupInputGate() to InputGate#setup()
Move input gate setup from NetworkEnvironment to InputGate.
This eliminates tie between Task and NetworkEnvironment.
Task does not need depend on NetworkEnvironment
and can trigger setup from InputGate directly.
---
.../runtime/io/network/NetworkEnvironment.java | 57 ----------------------
.../io/network/partition/consumer/InputGate.java | 5 ++
.../partition/consumer/SingleInputGate.java | 19 +++++++-
.../partition/consumer/SingleInputGateFactory.java | 55 ++++++++++++++++++---
.../network/partition/consumer/UnionInputGate.java | 4 ++
.../org/apache/flink/runtime/taskmanager/Task.java | 28 ++++++-----
.../runtime/io/network/NetworkEnvironmentTest.java | 49 +++++++++----------
.../network/partition/InputGateFairnessTest.java | 8 ++-
.../network/partition/ResultPartitionBuilder.java | 2 +-
.../partition/consumer/InputGateTestBase.java | 16 ++++--
.../partition/consumer/SingleInputGateBuilder.java | 25 +++++++++-
.../partition/consumer/SingleInputGateTest.java | 16 +++---
.../apache/flink/runtime/taskmanager/TaskTest.java | 52 ++++++++++++++++++--
.../runtime/io/BarrierBufferMassiveRandomTest.java | 4 ++
.../flink/streaming/runtime/io/MockInputGate.java | 4 ++
.../StreamNetworkBenchmarkEnvironment.java | 2 +-
16 files changed, 219 insertions(+), 127 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index de669bb..1f2ee7e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -27,7 +27,6 @@ import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.metrics.InputBufferPoolUsageGauge;
import org.apache.flink.runtime.io.network.metrics.InputBuffersGauge;
@@ -47,9 +46,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
-import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskActions;
-import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
@@ -190,60 +187,6 @@ public class NetworkEnvironment {
return config;
}
- // --------------------------------------------------------------------------------------------
- // Task operations
- // --------------------------------------------------------------------------------------------
-
- public void registerTask(Task task) throws IOException {
- final ResultPartition[] producedPartitions = task.getProducedPartitions();
-
- synchronized (lock) {
- if (isShutdown) {
- throw new IllegalStateException("NetworkEnvironment is shut down");
- }
-
- for (final ResultPartition partition : producedPartitions) {
- partition.setup();
- }
-
- // Setup the buffer pool for each buffer reader
- final SingleInputGate[] inputGates = task.getAllInputGates();
- for (SingleInputGate gate : inputGates) {
- setupInputGate(gate);
- }
- }
- }
-
- @VisibleForTesting
- public void setupInputGate(SingleInputGate gate) throws IOException {
- BufferPool bufferPool = null;
- int maxNumberOfMemorySegments;
- try {
- if (config.isCreditBased()) {
- maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ?
- config.floatingNetworkBuffersPerGate() : Integer.MAX_VALUE;
-
- // assign exclusive buffers to input channels directly and use the rest for floating buffers
- gate.assignExclusiveSegments();
- bufferPool = networkBufferPool.createBufferPool(0, maxNumberOfMemorySegments);
- } else {
- maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ?
- gate.getNumberOfInputChannels() * config.networkBuffersPerChannel() +
- config.floatingNetworkBuffersPerGate() : Integer.MAX_VALUE;
-
- bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(),
- maxNumberOfMemorySegments);
- }
- gate.setBufferPool(bufferPool);
- } catch (Throwable t) {
- if (bufferPool != null) {
- bufferPool.lazyDestroy();
- }
-
- ExceptionUtils.rethrowIOException(t);
- }
- }
-
/**
* Batch release intermediate result partitions.
*
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index 03ac822..7b87d32 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -129,4 +129,9 @@ public abstract class InputGate implements AutoCloseable {
this.moreAvailable = moreAvailable;
}
}
+
+ /**
+ * Setup gate, potentially heavy-weight, blocking operation comparing to just creation.
+ */
+ public abstract void setup() throws IOException;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 087d912..6c23698 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.apache.flink.util.function.SupplierWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -173,6 +174,8 @@ public class SingleInputGate extends InputGate {
private final Counter numBytesIn;
+ private final SupplierWithException<BufferPool, IOException> bufferPoolFactory;
+
public SingleInputGate(
String owningTaskName,
JobID jobId,
@@ -182,13 +185,15 @@ public class SingleInputGate extends InputGate {
int numberOfInputChannels,
TaskActions taskActions,
Counter numBytesIn,
- boolean isCreditBased) {
+ boolean isCreditBased,
+ SupplierWithException<BufferPool, IOException> bufferPoolFactory) {
this.owningTaskName = checkNotNull(owningTaskName);
this.jobId = checkNotNull(jobId);
this.consumedResultId = checkNotNull(consumedResultId);
this.consumedPartitionType = checkNotNull(consumedPartitionType);
+ this.bufferPoolFactory = checkNotNull(bufferPoolFactory);
checkArgument(consumedSubpartitionIndex >= 0);
this.consumedSubpartitionIndex = consumedSubpartitionIndex;
@@ -207,6 +212,18 @@ public class SingleInputGate extends InputGate {
this.isCreditBased = isCreditBased;
}
+ @Override
+ public void setup() throws IOException {
+ checkState(this.bufferPool == null, "Bug in input gate setup logic: Already registered buffer pool.");
+ if (isCreditBased) {
+ // assign exclusive buffers to input channels directly and use the rest for floating buffers
+ assignExclusiveSegments();
+ }
+
+ BufferPool bufferPool = bufferPoolFactory.get();
+ setBufferPool(bufferPool);
+ }
+
// ------------------------------------------------------------------------
// Properties
// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
index e3ed90e..cf2820d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
@@ -18,14 +18,17 @@
package org.apache.flink.runtime.io.network.partition.consumer;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.memory.MemorySegmentProvider;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.TaskEventPublisher;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferPoolFactory;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
@@ -33,12 +36,15 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.apache.flink.util.function.SupplierWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
+import java.io.IOException;
+
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -64,21 +70,27 @@ public class SingleInputGateFactory {
private final TaskEventPublisher taskEventPublisher;
@Nonnull
- private final MemorySegmentProvider memorySegmentProvider;
+ private final NetworkBufferPool networkBufferPool;
+
+ private final int networkBuffersPerChannel;
+
+ private final int floatingNetworkBuffersPerGate;
public SingleInputGateFactory(
@Nonnull NetworkEnvironmentConfiguration networkConfig,
@Nonnull ConnectionManager connectionManager,
@Nonnull ResultPartitionManager partitionManager,
@Nonnull TaskEventPublisher taskEventPublisher,
- @Nonnull MemorySegmentProvider memorySegmentProvider) {
+ @Nonnull NetworkBufferPool networkBufferPool) {
this.isCreditBased = networkConfig.isCreditBased();
this.partitionRequestInitialBackoff = networkConfig.partitionRequestInitialBackoff();
this.partitionRequestMaxBackoff = networkConfig.partitionRequestMaxBackoff();
+ this.networkBuffersPerChannel = networkConfig.networkBuffersPerChannel();
+ this.floatingNetworkBuffersPerGate = networkConfig.floatingNetworkBuffersPerGate();
this.connectionManager = connectionManager;
this.partitionManager = partitionManager;
this.taskEventPublisher = taskEventPublisher;
- this.memorySegmentProvider = memorySegmentProvider;
+ this.networkBufferPool = networkBufferPool;
}
/**
@@ -101,7 +113,8 @@ public class SingleInputGateFactory {
final SingleInputGate inputGate = new SingleInputGate(
owningTaskName, jobId, consumedResultId, consumedPartitionType, consumedSubpartitionIndex,
- icdd.length, taskActions, numBytesInCounter, isCreditBased);
+ icdd.length, taskActions, numBytesInCounter, isCreditBased,
+ createBufferPoolFactory(icdd.length, consumedPartitionType));
// Create the input channels. There is one input channel for each consumed partition.
final InputChannel[] inputChannels = new InputChannel[icdd.length];
@@ -131,7 +144,7 @@ public class SingleInputGateFactory {
partitionRequestInitialBackoff,
partitionRequestMaxBackoff,
metrics,
- memorySegmentProvider);
+ networkBufferPool);
numRemoteChannels++;
}
@@ -143,7 +156,7 @@ public class SingleInputGateFactory {
partitionRequestInitialBackoff,
partitionRequestMaxBackoff,
metrics,
- memorySegmentProvider);
+ networkBufferPool);
numUnknownChannels++;
}
@@ -163,4 +176,32 @@ public class SingleInputGateFactory {
return inputGate;
}
+
+ private SupplierWithException<BufferPool, IOException> createBufferPoolFactory(int size, ResultPartitionType type) {
+ return createBufferPoolFactory(
+ networkBufferPool, isCreditBased, networkBuffersPerChannel, floatingNetworkBuffersPerGate, size, type);
+ }
+
+ @VisibleForTesting
+ static SupplierWithException<BufferPool, IOException> createBufferPoolFactory(
+ BufferPoolFactory bufferPoolFactory,
+ boolean isCreditBased,
+ int networkBuffersPerChannel,
+ int floatingNetworkBuffersPerGate,
+ int size,
+ ResultPartitionType type) {
+
+ if (isCreditBased) {
+ int maxNumberOfMemorySegments = type.isBounded() ?
+ floatingNetworkBuffersPerGate : Integer.MAX_VALUE;
+
+ return () -> bufferPoolFactory.createBufferPool(0, maxNumberOfMemorySegments);
+ } else {
+ int maxNumberOfMemorySegments = type.isBounded() ?
+ size * networkBuffersPerChannel +
+ floatingNetworkBuffersPerGate : Integer.MAX_VALUE;
+
+ return () -> bufferPoolFactory.createBufferPool(size, maxNumberOfMemorySegments);
+ }
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 76e0390..9777f78 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -278,6 +278,10 @@ public class UnionInputGate extends InputGate {
}
@Override
+ public void setup() {
+ }
+
+ @Override
public void close() throws IOException {
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index ac446e2..1d95231 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -50,6 +50,7 @@ import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
@@ -216,9 +217,6 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
/** The cache for user-defined files that the invokable requires. */
private final FileCache fileCache;
- /** The gateway to the network stack, which handles inputs and produced results. */
- private final NetworkEnvironment network;
-
/** The service for kvState registration of this task. */
private final KvStateService kvStateService;
@@ -352,7 +350,6 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
this.blobService = Preconditions.checkNotNull(blobService);
this.libraryCache = Preconditions.checkNotNull(libraryCache);
this.fileCache = Preconditions.checkNotNull(fileCache);
- this.network = Preconditions.checkNotNull(networkEnvironment);
this.kvStateService = Preconditions.checkNotNull(kvStateService);
this.taskManagerConfig = Preconditions.checkNotNull(taskManagerConfig);
@@ -438,14 +435,6 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
return this.taskConfiguration;
}
- public SingleInputGate[] getAllInputGates() {
- return inputGates;
- }
-
- public ResultPartition[] getProducedPartitions() {
- return producedPartitions;
- }
-
public SingleInputGate getInputGateById(IntermediateDataSetID id) {
return inputGatesById.get(id);
}
@@ -607,7 +596,7 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
LOG.info("Registering task at network: {}.", this);
- network.registerTask(this);
+ setupPartionsAndGates(producedPartitions, inputGates);
for (ResultPartition partition : producedPartitions) {
taskEventDispatcher.registerPartition(partition.getPartitionId());
@@ -836,6 +825,19 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
}
}
+ @VisibleForTesting
+ public static void setupPartionsAndGates(
+ ResultPartitionWriter[] producedPartitions, InputGate[] inputGates) throws IOException {
+
+ for (ResultPartitionWriter partition : producedPartitions) {
+ partition.setup();
+ }
+
+ for (InputGate gate : inputGates) {
+ gate.setup();
+ }
+ }
+
/**
* Releases network resources before task exits. We should also fail the partition to release if the task
* has failed, is canceled, or is being canceled at the moment.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index 0fc1af7..d6e1d48 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -20,9 +20,11 @@ package org.apache.flink.runtime.io.network;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.memory.MemorySegmentProvider;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.taskmanager.Task;
@@ -40,10 +42,8 @@ import java.util.List;
import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createDummyConnectionManager;
import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition;
import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
import static org.powermock.api.mockito.PowerMockito.spy;
/**
@@ -64,7 +64,7 @@ public class NetworkEnvironmentTest {
public ExpectedException expectedException = ExpectedException.none();
/**
- * Verifies that {@link NetworkEnvironment#registerTask(Task)} sets up (un)bounded buffer pool
+ * Verifies that {@link Task#setupPartionsAndGates(ResultPartitionWriter[], InputGate[])}} sets up (un)bounded buffer pool
* instances for various types of input and output channels.
*/
@Test
@@ -81,18 +81,13 @@ public class NetworkEnvironmentTest {
final ResultPartition[] resultPartitions = new ResultPartition[] {rp1, rp2, rp3, rp4};
// input gates
- SingleInputGate ig1 = createSingleInputGate(ResultPartitionType.PIPELINED, 2);
- SingleInputGate ig2 = createSingleInputGate(ResultPartitionType.BLOCKING, 2);
- SingleInputGate ig3 = createSingleInputGate(ResultPartitionType.PIPELINED_BOUNDED, 2);
- SingleInputGate ig4 = createSingleInputGate(ResultPartitionType.PIPELINED_BOUNDED, 8);
+ SingleInputGate ig1 = createSingleInputGate(network, ResultPartitionType.PIPELINED, 2);
+ SingleInputGate ig2 = createSingleInputGate(network, ResultPartitionType.BLOCKING, 2);
+ SingleInputGate ig3 = createSingleInputGate(network, ResultPartitionType.PIPELINED_BOUNDED, 2);
+ SingleInputGate ig4 = createSingleInputGate(network, ResultPartitionType.PIPELINED_BOUNDED, 8);
final SingleInputGate[] inputGates = new SingleInputGate[] {ig1, ig2, ig3, ig4};
- // overall task to register
- Task task = mock(Task.class);
- when(task.getProducedPartitions()).thenReturn(resultPartitions);
- when(task.getAllInputGates()).thenReturn(inputGates);
-
- network.registerTask(task);
+ Task.setupPartionsAndGates(resultPartitions, inputGates);
// verify buffer pools for the result partitions
assertEquals(rp1.getNumberOfSubpartitions(), rp1.getBufferPool().getNumberOfRequiredMemorySegments());
@@ -133,7 +128,7 @@ public class NetworkEnvironmentTest {
}
/**
- * Verifies that {@link NetworkEnvironment#registerTask(Task)} sets up (un)bounded buffer pool
+ * Verifies that {@link Task#setupPartionsAndGates(ResultPartitionWriter[], InputGate[])}} sets up (un)bounded buffer pool
* instances for various types of input and output channels working with the bare minimum of
* required buffers.
*/
@@ -153,7 +148,7 @@ public class NetworkEnvironmentTest {
}
/**
- * Verifies that {@link NetworkEnvironment#registerTask(Task)} fails if the bare minimum of
+ * Verifies that {@link Task#setupPartionsAndGates(ResultPartitionWriter[], InputGate[])}} fails if the bare minimum of
* required buffers is not available (we are one buffer short).
*/
@Test
@@ -189,10 +184,10 @@ public class NetworkEnvironmentTest {
final ResultPartition[] resultPartitions = new ResultPartition[] {rp1, rp2, rp3, rp4};
// input gates
- SingleInputGate ig1 = createSingleInputGate(ResultPartitionType.PIPELINED, 2);
- SingleInputGate ig2 = createSingleInputGate(ResultPartitionType.BLOCKING, 2);
- SingleInputGate ig3 = createSingleInputGate(ResultPartitionType.PIPELINED_BOUNDED, 2);
- SingleInputGate ig4 = createSingleInputGate(ResultPartitionType.PIPELINED_BOUNDED, 4);
+ SingleInputGate ig1 = createSingleInputGate(network, ResultPartitionType.PIPELINED, 2);
+ SingleInputGate ig2 = createSingleInputGate(network, ResultPartitionType.BLOCKING, 2);
+ SingleInputGate ig3 = createSingleInputGate(network, ResultPartitionType.PIPELINED_BOUNDED, 2);
+ SingleInputGate ig4 = createSingleInputGate(network, ResultPartitionType.PIPELINED_BOUNDED, 4);
final SingleInputGate[] inputGates = new SingleInputGate[] {ig1, ig2, ig3, ig4};
// set up remote input channels for the exclusive buffers of the credit-based flow control
@@ -213,12 +208,7 @@ public class NetworkEnvironmentTest {
createRemoteInputChannel(ig3, 3, rp4, connManager, network.getNetworkBufferPool());
}
- // overall task to register
- Task task = mock(Task.class);
- when(task.getProducedPartitions()).thenReturn(resultPartitions);
- when(task.getAllInputGates()).thenReturn(inputGates);
-
- network.registerTask(task);
+ Task.setupPartionsAndGates(resultPartitions, inputGates);
// verify buffer pools for the result partitions
assertEquals(Integer.MAX_VALUE, rp1.getBufferPool().getMaxNumberOfMemorySegments());
@@ -260,8 +250,10 @@ public class NetworkEnvironmentTest {
/**
* Helper to create spy of a {@link SingleInputGate} for use by a {@link Task} inside
- * {@link NetworkEnvironment#registerTask(Task)}.
+ * {@link Task#setupPartionsAndGates(ResultPartitionWriter[], InputGate[])}}.
*
+ * @param network
+ * network enviroment to create buffer pool factory for {@link SingleInputGate}
* @param partitionType
* the consumed partition type
* @param numberOfChannels
@@ -269,11 +261,14 @@ public class NetworkEnvironmentTest {
*
* @return input gate with some fake settiFngs
*/
- private SingleInputGate createSingleInputGate(ResultPartitionType partitionType, int numberOfChannels) {
+ private SingleInputGate createSingleInputGate(
+ NetworkEnvironment network, ResultPartitionType partitionType, int numberOfChannels) {
+
return spy(new SingleInputGateBuilder()
.setNumberOfChannels(numberOfChannels)
.setResultPartitionType(partitionType)
.setIsCreditBased(enableCreditBasedFlowControl)
+ .setupBufferPoolFactory(network)
.build());
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
index 6ed5f21..14ebabc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
@@ -33,6 +34,7 @@ import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.apache.flink.util.function.SupplierWithException;
import org.junit.Test;
@@ -311,6 +313,9 @@ public class InputGateFairnessTest {
// ------------------------------------------------------------------------
private static class FairnessVerifyingInputGate extends SingleInputGate {
+ private static final SupplierWithException<BufferPool, IOException> STUB_BUFFER_POOL_FACTORY = () -> {
+ throw new UnsupportedOperationException();
+ };
private final ArrayDeque<InputChannel> channelsWithData;
@@ -327,7 +332,8 @@ public class InputGateFairnessTest {
boolean isCreditBased) {
super(owningTaskName, jobId, consumedResultId, ResultPartitionType.PIPELINED,
- consumedSubpartitionIndex, numberOfInputChannels, taskActions, new SimpleCounter(), isCreditBased);
+ consumedSubpartitionIndex, numberOfInputChannels, taskActions, new SimpleCounter(),
+ isCreditBased, STUB_BUFFER_POOL_FACTORY);
try {
Field f = SingleInputGate.class.getDeclaredField("inputChannelsWithData");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
index 0e25368..6a7c4b1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
@@ -56,7 +56,7 @@ public class ResultPartitionBuilder {
private boolean sendScheduleOrUpdateConsumersMessage = false;
- private NetworkBufferPool networkBufferPool = new NetworkBufferPool(1, 1,1);
+ private NetworkBufferPool networkBufferPool = new NetworkBufferPool(1, 1, 1);
private int networkBuffersPerChannel = 1;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateTestBase.java
index 4d254c0..99584c6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateTestBase.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.io.network.partition.consumer;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.junit.runner.RunWith;
@@ -74,17 +75,22 @@ public abstract class InputGateTestBase {
}
protected SingleInputGate createInputGate(int numberOfInputChannels) {
- return createInputGate(numberOfInputChannels, ResultPartitionType.PIPELINED);
+ return createInputGate(null, numberOfInputChannels, ResultPartitionType.PIPELINED);
}
protected SingleInputGate createInputGate(
- int numberOfInputChannels, ResultPartitionType partitionType) {
- SingleInputGate inputGate = new SingleInputGateBuilder()
+ NetworkEnvironment environment, int numberOfInputChannels, ResultPartitionType partitionType) {
+
+ SingleInputGateBuilder builder = new SingleInputGateBuilder()
.setNumberOfChannels(numberOfInputChannels)
.setResultPartitionType(partitionType)
- .setIsCreditBased(enableCreditBasedFlowControl)
- .build();
+ .setIsCreditBased(enableCreditBasedFlowControl);
+
+ if (environment != null) {
+ builder = builder.setupBufferPoolFactory(environment);
+ }
+ SingleInputGate inputGate = builder.build();
assertEquals(partitionType, inputGate.getConsumedPartitionType());
return inputGate;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
index 28394a3..6f27baf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
@@ -21,10 +21,16 @@ package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.apache.flink.util.function.SupplierWithException;
+
+import java.io.IOException;
/**
* Utility class to encapsulate the logic of building a {@link SingleInputGate} instance.
@@ -47,6 +53,10 @@ public class SingleInputGateBuilder {
private boolean isCreditBased = true;
+ private SupplierWithException<BufferPool, IOException> bufferPoolFactory = () -> {
+ throw new UnsupportedOperationException();
+ };
+
public SingleInputGateBuilder setResultPartitionType(ResultPartitionType partitionType) {
this.partitionType = partitionType;
return this;
@@ -67,6 +77,18 @@ public class SingleInputGateBuilder {
return this;
}
+ public SingleInputGateBuilder setupBufferPoolFactory(NetworkEnvironment environment) {
+ NetworkEnvironmentConfiguration config = environment.getConfiguration();
+ this.bufferPoolFactory = SingleInputGateFactory.createBufferPoolFactory(
+ environment.getNetworkBufferPool(),
+ config.isCreditBased(),
+ config.networkBuffersPerChannel(),
+ config.floatingNetworkBuffersPerGate(),
+ numberOfChannels,
+ partitionType);
+ return this;
+ }
+
public SingleInputGate build() {
return new SingleInputGate(
"Single Input Gate",
@@ -77,6 +99,7 @@ public class SingleInputGateBuilder {
numberOfChannels,
taskActions,
numBytesInCounter,
- isCreditBased);
+ isCreditBased,
+ bufferPoolFactory);
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index dd031f8..93c3375 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -391,17 +391,17 @@ public class SingleInputGateTest extends InputGateTestBase {
*/
@Test
public void testRequestBuffersWithRemoteInputChannel() throws Exception {
- final SingleInputGate inputGate = createInputGate(1, ResultPartitionType.PIPELINED_BOUNDED);
+ final NetworkEnvironment network = createNetworkEnvironment();
+ final SingleInputGate inputGate = createInputGate(network, 1, ResultPartitionType.PIPELINED_BOUNDED);
int buffersPerChannel = 2;
int extraNetworkBuffersPerGate = 8;
- final NetworkEnvironment network = createNetworkEnvironment();
try {
RemoteInputChannel remote =
InputChannelBuilder.newBuilder()
.setupFromNetworkEnvironment(network)
.buildRemoteAndSetToGate(inputGate);
- network.setupInputGate(inputGate);
+ inputGate.setup();
NetworkBufferPool bufferPool = network.getNetworkBufferPool();
if (enableCreditBasedFlowControl) {
@@ -427,16 +427,16 @@ public class SingleInputGateTest extends InputGateTestBase {
*/
@Test
public void testRequestBuffersWithUnknownInputChannel() throws Exception {
- final SingleInputGate inputGate = createInputGate(1, ResultPartitionType.PIPELINED_BOUNDED);
+ final NetworkEnvironment network = createNetworkEnvironment();
+ final SingleInputGate inputGate = createInputGate(network, 1, ResultPartitionType.PIPELINED_BOUNDED);
int buffersPerChannel = 2;
int extraNetworkBuffersPerGate = 8;
- final NetworkEnvironment network = createNetworkEnvironment();
try {
final ResultPartitionID resultPartitionId = new ResultPartitionID();
addUnknownInputChannel(network, inputGate, resultPartitionId, 0);
- network.setupInputGate(inputGate);
+ inputGate.setup();
NetworkBufferPool bufferPool = network.getNetworkBufferPool();
if (enableCreditBasedFlowControl) {
@@ -478,8 +478,8 @@ public class SingleInputGateTest extends InputGateTestBase {
*/
@Test
public void testUpdateUnknownInputChannel() throws Exception {
- final SingleInputGate inputGate = createInputGate(2);
final NetworkEnvironment network = createNetworkEnvironment();
+ final SingleInputGate inputGate = createInputGate(network, 2, ResultPartitionType.PIPELINED);
try {
final ResultPartitionID localResultPartitionId = new ResultPartitionID();
@@ -488,7 +488,7 @@ public class SingleInputGateTest extends InputGateTestBase {
final ResultPartitionID remoteResultPartitionId = new ResultPartitionID();
addUnknownInputChannel(network, inputGate, remoteResultPartitionId, 1);
- network.setupInputGate(inputGate);
+ inputGate.setup();
assertThat(inputGate.getInputChannels().get(remoteResultPartitionId.getPartitionId()),
is(instanceOf((UnknownInputChannel.class))));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 4c878db..75c7c89 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -33,6 +33,10 @@ import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionLocation;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
@@ -51,8 +55,10 @@ import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
@@ -262,7 +268,29 @@ public class TaskTest extends TestLogger {
}
@Test
- public void testExecutionFailsInNetworkRegistration() throws Exception {
+ public void testExecutionFailsInNetworkRegistrationForPartitions() throws Exception {
+ ResultPartitionDeploymentDescriptor dummyPartition = new ResultPartitionDeploymentDescriptor(
+ new IntermediateDataSetID(), new IntermediateResultPartitionID(),
+ ResultPartitionType.PIPELINED, 1, 1, true);
+ testExecutionFailsInNetworkRegistration(Collections.singleton(dummyPartition), Collections.emptyList());
+ }
+
+ @Test
+ public void testExecutionFailsInNetworkRegistrationForGates() throws Exception {
+ InputChannelDeploymentDescriptor dummyChannel =
+ new InputChannelDeploymentDescriptor(new ResultPartitionID(), ResultPartitionLocation.createLocal());
+ InputGateDeploymentDescriptor dummyGate = new InputGateDeploymentDescriptor(
+ new IntermediateDataSetID(), ResultPartitionType.PIPELINED, 0,
+ new InputChannelDeploymentDescriptor[] { dummyChannel });
+ testExecutionFailsInNetworkRegistration(Collections.emptyList(), Collections.singleton(dummyGate));
+ }
+
+ private void testExecutionFailsInNetworkRegistration(
+ Collection<ResultPartitionDeploymentDescriptor> resultPartitions,
+ Collection<InputGateDeploymentDescriptor> inputGates) throws Exception {
+
+ final String errorMessage = "Network buffer pool has already been destroyed.";
+
final ResultPartitionConsumableNotifier consumableNotifier = new NoOpResultPartitionConsumableNotifier();
final PartitionProducerStateChecker partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
@@ -271,6 +299,8 @@ public class TaskTest extends TestLogger {
.setTaskManagerActions(taskManagerActions)
.setConsumableNotifier(consumableNotifier)
.setPartitionProducerStateChecker(partitionProducerStateChecker)
+ .setResultPartitions(resultPartitions)
+ .setInputGates(inputGates)
.build();
// shut down the network to make the following task registration failure
@@ -282,10 +312,10 @@ public class TaskTest extends TestLogger {
// verify final state
assertEquals(ExecutionState.FAILED, task.getExecutionState());
assertTrue(task.isCanceledOrFailed());
- assertTrue(task.getFailureCause().getMessage().contains("NetworkEnvironment is shut down"));
+ assertTrue(task.getFailureCause().getMessage().contains(errorMessage));
taskManagerActions.validateListenerMessage(
- ExecutionState.FAILED, task, new IllegalStateException("NetworkEnvironment is shut down"));
+ ExecutionState.FAILED, task, new IllegalStateException(errorMessage));
}
@Test
@@ -935,6 +965,8 @@ public class TaskTest extends TestLogger {
private Configuration taskManagerConfig;
private ExecutionConfig executionConfig;
private Collection<PermanentBlobKey> requiredJarFileBlobKeys;
+ private Collection<ResultPartitionDeploymentDescriptor> resultPartitions = Collections.emptyList();
+ private Collection<InputGateDeploymentDescriptor> inputGates = Collections.emptyList();
{
invokable = TestInvokableCorrect.class;
@@ -1010,6 +1042,16 @@ public class TaskTest extends TestLogger {
return this;
}
+ public TaskBuilder setResultPartitions(Collection<ResultPartitionDeploymentDescriptor> resultPartitions) {
+ this.resultPartitions = resultPartitions;
+ return this;
+ }
+
+ public TaskBuilder setInputGates(Collection<InputGateDeploymentDescriptor> inputGates) {
+ this.inputGates = inputGates;
+ return this;
+ }
+
private Task build() throws Exception {
final JobID jobId = new JobID();
final JobVertexID jobVertexId = new JobVertexID();
@@ -1046,8 +1088,8 @@ public class TaskTest extends TestLogger {
new AllocationID(),
0,
0,
- Collections.emptyList(),
- Collections.emptyList(),
+ resultPartitions,
+ inputGates,
0,
mock(MemoryManager.class),
mock(IOManager.class),
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
index 2ea957d..4284074 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -204,6 +204,10 @@ public class BarrierBufferMassiveRandomTest {
}
@Override
+ public void setup() {
+ }
+
+ @Override
public void close() {
}
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
index 30941ab..b37dee4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
@@ -65,6 +65,10 @@ public class MockInputGate extends InputGate {
}
@Override
+ public void setup() {
+ }
+
+ @Override
public int getNumberOfInputChannels() {
return numberOfChannels;
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
index 4b542e0..af59220 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
@@ -264,7 +264,7 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
InputChannelTestUtils.newUnregisteredInputChannelMetrics(),
new SimpleCounter());
- environment.setupInputGate(gate);
+ gate.setup();
gates[channel] = gate;
}