You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/04/22 20:40:49 UTC
[13/17] flink git commit: [FLINK-9076] [network] Reduce minimum
number of floating buffers to 0
[FLINK-9076] [network] Reduce minimum number of floating buffers to 0
This closes #5874
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c491400b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c491400b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c491400b
Branch: refs/heads/master
Commit: c491400bcb354d242396302a5561920cb525dbae
Parents: 6581914
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Fri Apr 6 11:15:10 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Apr 22 16:28:35 2018 +0200
----------------------------------------------------------------------
.../flink/configuration/TaskManagerOptions.java | 2 +-
.../runtime/io/network/NetworkEnvironment.java | 5 +-
.../io/network/NetworkEnvironmentTest.java | 259 ++++++++++++++++---
.../partition/InputChannelTestUtils.java | 2 +-
.../StreamNetworkBenchmarkEnvironment.java | 34 ++-
.../StreamNetworkPointToPointBenchmark.java | 2 +-
.../StreamNetworkThroughputBenchmark.java | 14 +-
.../StreamNetworkThroughputBenchmarkTest.java | 45 ++++
...SuccessAfterNetworkBuffersFailureITCase.java | 2 +-
9 files changed, 318 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c491400b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index 2bd3091..b726e07 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -275,7 +275,7 @@ public class TaskManagerOptions {
public static final ConfigOption<Integer> NETWORK_BUFFERS_PER_CHANNEL =
key("taskmanager.network.memory.buffers-per-channel")
.defaultValue(2)
- .withDescription("Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel)." +
+ .withDescription("Maximum number of network buffers to use for each outgoing/incoming channel (subpartition/input channel). " +
"In credit-based flow control mode, this indicates how many credits are exclusive in each input channel. It should be" +
" configured at least 2 for good performance. 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is" +
" for parallel serialization.");
http://git-wip-us.apache.org/repos/asf/flink/blob/c491400b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 23ac911..0a9dc0f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -234,10 +234,9 @@ public class NetworkEnvironment {
maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ?
extraNetworkBuffersPerGate : Integer.MAX_VALUE;
- // Create a buffer pool for floating buffers and assign exclusive buffers to input channels directly
- bufferPool = networkBufferPool.createBufferPool(extraNetworkBuffersPerGate,
- maxNumberOfMemorySegments);
+ // assign exclusive buffers to input channels directly and use the rest for floating buffers
gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel);
+ bufferPool = networkBufferPool.createBufferPool(0, maxNumberOfMemorySegments);
} else {
maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ?
gate.getNumberOfInputChannels() * networkBuffersPerChannel +
http://git-wip-us.apache.org/repos/asf/flink/blob/c491400b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index 24878ef..317a214 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -19,39 +19,60 @@
package org.apache.flink.runtime.io.network;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.junit.Rule;
import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createDummyConnectionManager;
import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.spy;
/**
* Various tests for the {@link NetworkEnvironment} class.
*/
+@RunWith(Parameterized.class)
public class NetworkEnvironmentTest {
private static final int numBuffers = 1024;
private static final int memorySegmentSize = 128;
+ @Parameterized.Parameter
+ public boolean enableCreditBasedFlowControl;
+
+ @Parameterized.Parameters(name = "Credit-based = {0}")
+ public static List<Boolean> parameters() {
+ return Arrays.asList(Boolean.TRUE, Boolean.FALSE);
+ }
+
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
/**
* Verifies that {@link NetworkEnvironment#registerTask(Task)} sets up (un)bounded buffer pool
* instances for various types of input and output channels.
@@ -72,7 +93,7 @@ public class NetworkEnvironmentTest {
0,
2,
8,
- true);
+ enableCreditBasedFlowControl);
// result partitions
ResultPartition rp1 = createResultPartition(ResultPartitionType.PIPELINED, 2);
@@ -82,10 +103,11 @@ public class NetworkEnvironmentTest {
final ResultPartition[] resultPartitions = new ResultPartition[] {rp1, rp2, rp3, rp4};
// input gates
- SingleInputGate ig1 = createSingleInputGateMock(ResultPartitionType.PIPELINED, 2);
- SingleInputGate ig2 = createSingleInputGateMock(ResultPartitionType.BLOCKING, 2);
- SingleInputGate ig3 = createSingleInputGateMock(ResultPartitionType.PIPELINED_BOUNDED, 2);
- final SingleInputGate[] inputGates = new SingleInputGate[] {ig1, ig2, ig3};
+ SingleInputGate ig1 = createSingleInputGate(ResultPartitionType.PIPELINED, 2);
+ SingleInputGate ig2 = createSingleInputGate(ResultPartitionType.BLOCKING, 2);
+ SingleInputGate ig3 = createSingleInputGate(ResultPartitionType.PIPELINED_BOUNDED, 2);
+ SingleInputGate ig4 = createSingleInputGate(ResultPartitionType.PIPELINED_BOUNDED, 8);
+ final SingleInputGate[] inputGates = new SingleInputGate[] {ig1, ig2, ig3, ig4};
// overall task to register
Task task = mock(Task.class);
@@ -94,15 +116,177 @@ public class NetworkEnvironmentTest {
network.registerTask(task);
+ // verify buffer pools for the result partitions
+ assertEquals(rp1.getNumberOfSubpartitions(), rp1.getBufferPool().getNumberOfRequiredMemorySegments());
+ assertEquals(rp2.getNumberOfSubpartitions(), rp2.getBufferPool().getNumberOfRequiredMemorySegments());
+ assertEquals(rp3.getNumberOfSubpartitions(), rp3.getBufferPool().getNumberOfRequiredMemorySegments());
+ assertEquals(rp4.getNumberOfSubpartitions(), rp4.getBufferPool().getNumberOfRequiredMemorySegments());
+
assertEquals(Integer.MAX_VALUE, rp1.getBufferPool().getMaxNumberOfMemorySegments());
assertEquals(Integer.MAX_VALUE, rp2.getBufferPool().getMaxNumberOfMemorySegments());
assertEquals(2 * 2 + 8, rp3.getBufferPool().getMaxNumberOfMemorySegments());
assertEquals(8 * 2 + 8, rp4.getBufferPool().getMaxNumberOfMemorySegments());
- verify(ig1, times(1)).assignExclusiveSegments(network.getNetworkBufferPool(), 2);
- verify(ig2, times(1)).assignExclusiveSegments(network.getNetworkBufferPool(), 2);
- verify(ig3, times(1)).assignExclusiveSegments(network.getNetworkBufferPool(), 2);
+ // verify buffer pools for the input gates (NOTE: credit-based uses minimum required buffers
+ // for exclusive buffers not managed by the buffer pool)
+ assertEquals(enableCreditBasedFlowControl ? 0 : 2, ig1.getBufferPool().getNumberOfRequiredMemorySegments());
+ assertEquals(enableCreditBasedFlowControl ? 0 : 2, ig2.getBufferPool().getNumberOfRequiredMemorySegments());
+ assertEquals(enableCreditBasedFlowControl ? 0 : 2, ig3.getBufferPool().getNumberOfRequiredMemorySegments());
+ assertEquals(enableCreditBasedFlowControl ? 0 : 8, ig4.getBufferPool().getNumberOfRequiredMemorySegments());
+
+ assertEquals(Integer.MAX_VALUE, ig1.getBufferPool().getMaxNumberOfMemorySegments());
+ assertEquals(Integer.MAX_VALUE, ig2.getBufferPool().getMaxNumberOfMemorySegments());
+ assertEquals(enableCreditBasedFlowControl ? 8 : 2 * 2 + 8, ig3.getBufferPool().getMaxNumberOfMemorySegments());
+ assertEquals(enableCreditBasedFlowControl ? 8 : 8 * 2 + 8, ig4.getBufferPool().getMaxNumberOfMemorySegments());
+
+ int invokations = enableCreditBasedFlowControl ? 1 : 0;
+ verify(ig1, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool(), 2);
+ verify(ig2, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool(), 2);
+ verify(ig3, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool(), 2);
+ verify(ig4, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool(), 2);
+
+ for (ResultPartition rp : resultPartitions) {
+ rp.release();
+ }
+ for (SingleInputGate ig : inputGates) {
+ ig.releaseAllResources();
+ }
+ network.shutdown();
+ }
+
+ /**
+ * Verifies that {@link NetworkEnvironment#registerTask(Task)} sets up (un)bounded buffer pool
+ * instances for various types of input and output channels working with the bare minimum of
+ * required buffers.
+ */
+ @Test
+ public void testRegisterTaskWithLimitedBuffers() throws Exception {
+ final int bufferCount;
+ // outgoing: 1 buffer per channel (always)
+ if (!enableCreditBasedFlowControl) {
+ // incoming: 1 buffer per channel
+ bufferCount = 20;
+ } else {
+ // incoming: 2 exclusive buffers per channel
+ bufferCount = 10 + 10 * TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue();
+ }
+
+ testRegisterTaskWithLimitedBuffers(bufferCount);
+ }
+
+ /**
+ * Verifies that {@link NetworkEnvironment#registerTask(Task)} fails if the bare minimum of
+ * required buffers is not available (we are one buffer short).
+ */
+ @Test
+ public void testRegisterTaskWithInsufficientBuffers() throws Exception {
+ final int bufferCount;
+ // outgoing: 1 buffer per channel (always)
+ if (!enableCreditBasedFlowControl) {
+ // incoming: 1 buffer per channel
+ bufferCount = 19;
+ } else {
+ // incoming: 2 exclusive buffers per channel
+ bufferCount = 10 + 10 * TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() - 1;
+ }
+
+ expectedException.expect(IOException.class);
+ expectedException.expectMessage("Insufficient number of network buffers");
+ testRegisterTaskWithLimitedBuffers(bufferCount);
+ }
+
+ private void testRegisterTaskWithLimitedBuffers(int bufferPoolSize) throws Exception {
+ final NetworkEnvironment network = new NetworkEnvironment(
+ new NetworkBufferPool(bufferPoolSize, memorySegmentSize),
+ new LocalConnectionManager(),
+ new ResultPartitionManager(),
+ new TaskEventDispatcher(),
+ new KvStateRegistry(),
+ null,
+ null,
+ IOManager.IOMode.SYNC,
+ 0,
+ 0,
+ 2,
+ 8,
+ enableCreditBasedFlowControl);
+
+ final ConnectionManager connManager = createDummyConnectionManager();
+
+ // result partitions
+ ResultPartition rp1 = createResultPartition(ResultPartitionType.PIPELINED, 2);
+ ResultPartition rp2 = createResultPartition(ResultPartitionType.BLOCKING, 2);
+ ResultPartition rp3 = createResultPartition(ResultPartitionType.PIPELINED_BOUNDED, 2);
+ ResultPartition rp4 = createResultPartition(ResultPartitionType.PIPELINED_BOUNDED, 4);
+ final ResultPartition[] resultPartitions = new ResultPartition[] {rp1, rp2, rp3, rp4};
+
+ // input gates
+ SingleInputGate ig1 = createSingleInputGate(ResultPartitionType.PIPELINED, 2);
+ SingleInputGate ig2 = createSingleInputGate(ResultPartitionType.BLOCKING, 2);
+ SingleInputGate ig3 = createSingleInputGate(ResultPartitionType.PIPELINED_BOUNDED, 2);
+ SingleInputGate ig4 = createSingleInputGate(ResultPartitionType.PIPELINED_BOUNDED, 4);
+ final SingleInputGate[] inputGates = new SingleInputGate[] {ig1, ig2, ig3, ig4};
+
+ // set up remote input channels for the exclusive buffers of the credit-based flow control
+ // (note that this does not obey the partition types which is ok for the scope of the test)
+ if (enableCreditBasedFlowControl) {
+ createRemoteInputChannel(ig4, 0, rp1, connManager);
+ createRemoteInputChannel(ig4, 0, rp2, connManager);
+ createRemoteInputChannel(ig4, 0, rp3, connManager);
+ createRemoteInputChannel(ig4, 0, rp4, connManager);
+ createRemoteInputChannel(ig1, 1, rp1, connManager);
+ createRemoteInputChannel(ig1, 1, rp4, connManager);
+
+ createRemoteInputChannel(ig2, 1, rp2, connManager);
+ createRemoteInputChannel(ig2, 2, rp4, connManager);
+
+ createRemoteInputChannel(ig3, 1, rp3, connManager);
+ createRemoteInputChannel(ig3, 3, rp4, connManager);
+ }
+
+ // overall task to register
+ Task task = mock(Task.class);
+ when(task.getProducedPartitions()).thenReturn(resultPartitions);
+ when(task.getAllInputGates()).thenReturn(inputGates);
+
+ network.registerTask(task);
+
+ // verify buffer pools for the result partitions
+ assertEquals(Integer.MAX_VALUE, rp1.getBufferPool().getMaxNumberOfMemorySegments());
+ assertEquals(Integer.MAX_VALUE, rp2.getBufferPool().getMaxNumberOfMemorySegments());
+ assertEquals(2 * 2 + 8, rp3.getBufferPool().getMaxNumberOfMemorySegments());
+ assertEquals(4 * 2 + 8, rp4.getBufferPool().getMaxNumberOfMemorySegments());
+
+ for (ResultPartition rp : resultPartitions) {
+ assertEquals(rp.getNumberOfSubpartitions(), rp.getBufferPool().getNumberOfRequiredMemorySegments());
+ assertEquals(rp.getNumberOfSubpartitions(), rp.getBufferPool().getNumBuffers());
+ }
+
+ // verify buffer pools for the input gates (NOTE: credit-based uses minimum required buffers
+ // for exclusive buffers not managed by the buffer pool)
+ assertEquals(enableCreditBasedFlowControl ? 0 : 2, ig1.getBufferPool().getNumberOfRequiredMemorySegments());
+ assertEquals(enableCreditBasedFlowControl ? 0 : 2, ig2.getBufferPool().getNumberOfRequiredMemorySegments());
+ assertEquals(enableCreditBasedFlowControl ? 0 : 2, ig3.getBufferPool().getNumberOfRequiredMemorySegments());
+ assertEquals(enableCreditBasedFlowControl ? 0 : 4, ig4.getBufferPool().getNumberOfRequiredMemorySegments());
+
+ assertEquals(Integer.MAX_VALUE, ig1.getBufferPool().getMaxNumberOfMemorySegments());
+ assertEquals(Integer.MAX_VALUE, ig2.getBufferPool().getMaxNumberOfMemorySegments());
+ assertEquals(enableCreditBasedFlowControl ? 8 : 2 * 2 + 8, ig3.getBufferPool().getMaxNumberOfMemorySegments());
+ assertEquals(enableCreditBasedFlowControl ? 8 : 4 * 2 + 8, ig4.getBufferPool().getMaxNumberOfMemorySegments());
+
+ int invokations = enableCreditBasedFlowControl ? 1 : 0;
+ verify(ig1, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool(), 2);
+ verify(ig2, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool(), 2);
+ verify(ig3, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool(), 2);
+ verify(ig4, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool(), 2);
+
+ for (ResultPartition rp : resultPartitions) {
+ rp.release();
+ }
+ for (SingleInputGate ig : inputGates) {
+ ig.releaseAllResources();
+ }
network.shutdown();
}
@@ -135,7 +319,7 @@ public class NetworkEnvironmentTest {
}
/**
- * Helper to create a mock of a {@link SingleInputGate} for use by a {@link Task} inside
+ * Helper to create a spy of a {@link SingleInputGate} for use by a {@link Task} inside
* {@link NetworkEnvironment#registerTask(Task)}.
*
* @param partitionType
@@ -143,26 +327,35 @@ public class NetworkEnvironmentTest {
* @param channels
* the number of input channels
*
- * @return mock with minimal functionality necessary by {@link NetworkEnvironment#registerTask(Task)}
+ * @return input gate with some fake settings
*/
- private static SingleInputGate createSingleInputGateMock(
+ private static SingleInputGate createSingleInputGate(
final ResultPartitionType partitionType, final int channels) {
- SingleInputGate ig = mock(SingleInputGate.class);
- when(ig.getConsumedPartitionType()).thenReturn(partitionType);
- when(ig.getNumberOfInputChannels()).thenReturn(channels);
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(final InvocationOnMock invocation) throws Throwable {
- BufferPool bp = invocation.getArgumentAt(0, BufferPool.class);
- if (partitionType == ResultPartitionType.PIPELINED_BOUNDED) {
- assertEquals(8, bp.getMaxNumberOfMemorySegments());
- } else {
- assertEquals(Integer.MAX_VALUE, bp.getMaxNumberOfMemorySegments());
- }
- return null;
- }
- }).when(ig).setBufferPool(any(BufferPool.class));
-
- return ig;
+ return spy(new SingleInputGate(
+ "Test Task Name",
+ new JobID(),
+ new IntermediateDataSetID(),
+ partitionType,
+ 0,
+ channels,
+ mock(TaskActions.class),
+ UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()));
+ }
+
+ private static void createRemoteInputChannel(
+ SingleInputGate inputGate,
+ int channelIndex,
+ ResultPartition resultPartition,
+ ConnectionManager connManager) {
+ RemoteInputChannel channel = new RemoteInputChannel(
+ inputGate,
+ channelIndex,
+ resultPartition.getPartitionId(),
+ mock(ConnectionID.class),
+ connManager,
+ 0,
+ 0,
+ UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
+ inputGate.setInputChannel(resultPartition.getPartitionId().getPartitionId(), channel);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c491400b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
index 31effe1..f73ede7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
@@ -33,7 +33,7 @@ import static org.mockito.Mockito.when;
/**
* Some utility methods used for testing InputChannels and InputGates.
*/
-class InputChannelTestUtils {
+public class InputChannelTestUtils {
/**
* Creates a result partition manager that ignores all IDs, and simply returns the given
http://git-wip-us.apache.org/repos/asf/flink/blob/c491400b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
index b1613f2..3e6dcf6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
@@ -93,21 +93,45 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
protected ResultPartitionID[] partitionIds;
- public void setUp(int writers, int channels, boolean localMode) throws Exception {
+ /**
+ * Sets up the environment including buffer pools and netty threads.
+ *
+ * @param writers
+ * number of writers
+ * @param channels
+ * outgoing channels per writer
+ * @param localMode
+ * only local channels?
+ * @param senderBufferPoolSize
+ * buffer pool size for the sender (set to <tt>-1</tt> for default)
+ * @param receiverBufferPoolSize
+ * buffer pool size for the receiver (set to <tt>-1</tt> for default)
+ */
+ public void setUp(
+ int writers,
+ int channels,
+ boolean localMode,
+ int senderBufferPoolSize,
+ int receiverBufferPoolSize) throws Exception {
this.localMode = localMode;
this.channels = channels;
this.partitionIds = new ResultPartitionID[writers];
+ if (senderBufferPoolSize == -1) {
+ senderBufferPoolSize = Math.max(2048, writers * channels * 4);
+ }
+ if (receiverBufferPoolSize == -1) {
+ receiverBufferPoolSize = Math.max(2048, writers * channels * 4);
+ }
ioManager = new IOManagerAsync();
- int bufferPoolSize = Math.max(2048, writers * channels * 4);
- senderEnv = createNettyNetworkEnvironment(bufferPoolSize);
+ senderEnv = createNettyNetworkEnvironment(senderBufferPoolSize);
senderEnv.start();
- if (localMode) {
+ if (localMode && senderBufferPoolSize == receiverBufferPoolSize) {
receiverEnv = senderEnv;
}
else {
- receiverEnv = createNettyNetworkEnvironment(bufferPoolSize);
+ receiverEnv = createNettyNetworkEnvironment(receiverBufferPoolSize);
receiverEnv.start();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c491400b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
index 7aa218e..6b96c62 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
@@ -70,7 +70,7 @@ public class StreamNetworkPointToPointBenchmark {
*/
public void setUp(long flushTimeout) throws Exception {
environment = new StreamNetworkBenchmarkEnvironment<>();
- environment.setUp(1, 1, false);
+ environment.setUp(1, 1, false, -1, -1);
receiver = environment.createReceiver();
recordWriter = environment.createRecordWriter(0, flushTimeout);
http://git-wip-us.apache.org/repos/asf/flink/blob/c491400b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
index fe08993..1b0ef8a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
@@ -59,6 +59,10 @@ public class StreamNetworkThroughputBenchmark {
setUp(recordWriters, channels, flushTimeout, false);
}
+ public void setUp(int recordWriters, int channels, int flushTimeout, boolean localMode) throws Exception {
+ setUp(recordWriters, channels, flushTimeout, localMode, -1, -1); // honour localMode; -1 selects the default buffer pool sizes
+ }
+
/**
* Initializes the throughput benchmark with the given parameters.
*
@@ -68,9 +72,15 @@ public class StreamNetworkThroughputBenchmark {
* @param channels
* number of outgoing channels / receivers
*/
- public void setUp(int recordWriters, int channels, int flushTimeout, boolean localMode) throws Exception {
+ public void setUp(
+ int recordWriters,
+ int channels,
+ int flushTimeout,
+ boolean localMode,
+ int senderBufferPoolSize,
+ int receiverBufferPoolSize) throws Exception {
environment = new StreamNetworkBenchmarkEnvironment<>();
- environment.setUp(recordWriters, channels, localMode);
+ environment.setUp(recordWriters, channels, localMode, senderBufferPoolSize, receiverBufferPoolSize);
receiver = environment.createReceiver();
writerThreads = new LongRecordWriterThread[recordWriters];
for (int writer = 0; writer < recordWriters; writer++) {
http://git-wip-us.apache.org/repos/asf/flink/blob/c491400b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTest.java
index ba8fe27..dac8ee2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTest.java
@@ -18,12 +18,21 @@
package org.apache.flink.streaming.runtime.io.benchmark;
+import org.apache.flink.configuration.TaskManagerOptions;
+
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
/**
* Tests for various network benchmarks based on {@link StreamNetworkThroughputBenchmark}.
*/
public class StreamNetworkThroughputBenchmarkTest {
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
@Test
public void pointToPointBenchmark() throws Exception {
StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark();
@@ -61,6 +70,42 @@ public class StreamNetworkThroughputBenchmarkTest {
}
@Test
+ public void remoteModeInsufficientBuffersSender() throws Exception {
+ StreamNetworkThroughputBenchmark env = new StreamNetworkThroughputBenchmark();
+ int writers = 2;
+ int channels = 2;
+
+ expectedException.expect(IOException.class);
+ expectedException.expectMessage("Insufficient number of network buffers");
+
+ env.setUp(writers, channels, 100, false, writers * channels - 1, writers * channels * TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue());
+ }
+
+ @Test
+ public void remoteModeInsufficientBuffersReceiver() throws Exception {
+ StreamNetworkThroughputBenchmark env = new StreamNetworkThroughputBenchmark();
+ int writers = 2;
+ int channels = 2;
+
+ expectedException.expect(IOException.class);
+ expectedException.expectMessage("Insufficient number of network buffers");
+
+ env.setUp(writers, channels, 100, false, writers * channels, writers * channels * TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() - 1);
+ }
+
+ @Test
+ public void remoteModeMinimumBuffers() throws Exception {
+ StreamNetworkThroughputBenchmark env = new StreamNetworkThroughputBenchmark();
+ int writers = 2;
+ int channels = 2;
+
+ env.setUp(writers, channels, 100, false, writers * channels, writers * channels *
+ TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue());
+ env.executeBenchmark(10_000);
+ env.tearDown();
+ }
+
+ @Test
public void pointToMultiPointBenchmark() throws Exception {
StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark();
benchmark.setUp(1, 100, 100);
http://git-wip-us.apache.org/repos/asf/flink/blob/c491400b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
index dbd0f79..16159c1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
@@ -60,7 +60,7 @@ public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger {
private static Configuration getConfiguration() {
Configuration config = new Configuration();
config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80L);
- config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1024);
+ config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 800);
return config;
}