You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/07/10 07:15:16 UTC
[flink] 01/02: [hotfix][test] Clean up NettyShuffleEnvironmentTest
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4ed73a51174353dfff98e485798c5fc8dd9bd20b
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Tue Jul 9 15:10:07 2019 +0200
[hotfix][test] Clean up NettyShuffleEnvironmentTest
---
.../io/network/NettyShuffleEnvironmentTest.java | 60 +++++++++++-----------
1 file changed, 29 insertions(+), 31 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
index ba70a47..51f1c04 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
@@ -85,15 +85,6 @@ public class NettyShuffleEnvironmentTest extends TestLogger {
/**
* Verifies that {@link Task#setupPartitionsAndGates(ResultPartitionWriter[], InputGate[])}} sets up (un)bounded buffer pool
- * instances for various types of input and output channels.
- */
- @Test
- public void testRegisterTaskUsesBoundedBuffers() throws Exception {
- testRegisterTaskWithLimitedBuffers(NettyShuffleEnvironmentBuilder.DEFAULT_NUM_NETWORK_BUFFERS, false);
- }
-
- /**
- * Verifies that {@link Task#setupPartitionsAndGates(ResultPartitionWriter[], InputGate[])}} sets up (un)bounded buffer pool
* instances for various types of input and output channels working with the bare minimum of
* required buffers.
*/
@@ -109,7 +100,7 @@ public class NettyShuffleEnvironmentTest extends TestLogger {
bufferCount = 10 + 10 * NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue();
}
- testRegisterTaskWithLimitedBuffers(bufferCount, true);
+ testRegisterTaskWithLimitedBuffers(bufferCount);
}
/**
@@ -130,10 +121,10 @@ public class NettyShuffleEnvironmentTest extends TestLogger {
expectedException.expect(IOException.class);
expectedException.expectMessage("Insufficient number of network buffers");
- testRegisterTaskWithLimitedBuffers(bufferCount, true);
+ testRegisterTaskWithLimitedBuffers(bufferCount);
}
- private void testRegisterTaskWithLimitedBuffers(int bufferPoolSize, boolean assertNumBuffers) throws Exception {
+ private void testRegisterTaskWithLimitedBuffers(int bufferPoolSize) throws Exception {
final NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
.setNumNetworkBuffers(bufferPoolSize)
.setIsCreditBased(enableCreditBasedFlowControl)
@@ -141,18 +132,27 @@ public class NettyShuffleEnvironmentTest extends TestLogger {
final ConnectionManager connManager = createDummyConnectionManager();
+ int channels = 2;
+ int rp4Channels = 4;
+ int floatingBuffers = network.getConfiguration().floatingNetworkBuffersPerGate();
+ int exclusiveBuffers = network.getConfiguration().networkBuffersPerChannel();
+
+ int expectedBuffers = channels * exclusiveBuffers + floatingBuffers;
+ int expectedRp4Buffers = rp4Channels * exclusiveBuffers + floatingBuffers;
+
// result partitions
- ResultPartition rp1 = createPartition(network, ResultPartitionType.PIPELINED, 2);
- ResultPartition rp2 = createPartition(network, fileChannelManager, ResultPartitionType.BLOCKING, 2);
- ResultPartition rp3 = createPartition(network, ResultPartitionType.PIPELINED_BOUNDED, 2);
- ResultPartition rp4 = createPartition(network, ResultPartitionType.PIPELINED_BOUNDED, 4);
+ ResultPartition rp1 = createPartition(network, ResultPartitionType.PIPELINED, channels);
+ ResultPartition rp2 = createPartition(network, fileChannelManager, ResultPartitionType.BLOCKING, channels);
+ ResultPartition rp3 = createPartition(network, ResultPartitionType.PIPELINED_BOUNDED, channels);
+ ResultPartition rp4 = createPartition(network, ResultPartitionType.PIPELINED_BOUNDED, rp4Channels);
+
final ResultPartition[] resultPartitions = new ResultPartition[] {rp1, rp2, rp3, rp4};
// input gates
- SingleInputGate ig1 = createSingleInputGate(network, ResultPartitionType.PIPELINED, 2);
- SingleInputGate ig2 = createSingleInputGate(network, ResultPartitionType.BLOCKING, 2);
- SingleInputGate ig3 = createSingleInputGate(network, ResultPartitionType.PIPELINED_BOUNDED, 2);
- SingleInputGate ig4 = createSingleInputGate(network, ResultPartitionType.PIPELINED_BOUNDED, 4);
+ SingleInputGate ig1 = createSingleInputGate(network, ResultPartitionType.PIPELINED, channels);
+ SingleInputGate ig2 = createSingleInputGate(network, ResultPartitionType.BLOCKING, channels);
+ SingleInputGate ig3 = createSingleInputGate(network, ResultPartitionType.PIPELINED_BOUNDED, channels);
+ SingleInputGate ig4 = createSingleInputGate(network, ResultPartitionType.PIPELINED_BOUNDED, rp4Channels);
final SingleInputGate[] inputGates = new SingleInputGate[] {ig1, ig2, ig3, ig4};
createRemoteInputChannel(ig4, 0, rp1, connManager, network.getNetworkBufferPool());
@@ -174,27 +174,25 @@ public class NettyShuffleEnvironmentTest extends TestLogger {
// verify buffer pools for the result partitions
assertEquals(Integer.MAX_VALUE, rp1.getBufferPool().getMaxNumberOfMemorySegments());
assertEquals(Integer.MAX_VALUE, rp2.getBufferPool().getMaxNumberOfMemorySegments());
- assertEquals(2 * 2 + 8, rp3.getBufferPool().getMaxNumberOfMemorySegments());
- assertEquals(4 * 2 + 8, rp4.getBufferPool().getMaxNumberOfMemorySegments());
+ assertEquals(expectedBuffers, rp3.getBufferPool().getMaxNumberOfMemorySegments());
+ assertEquals(expectedRp4Buffers, rp4.getBufferPool().getMaxNumberOfMemorySegments());
for (ResultPartition rp : resultPartitions) {
assertEquals(rp.getNumberOfSubpartitions(), rp.getBufferPool().getNumberOfRequiredMemorySegments());
- if (assertNumBuffers) {
- assertEquals(rp.getNumberOfSubpartitions(), rp.getBufferPool().getNumBuffers());
- }
+ assertEquals(rp.getNumberOfSubpartitions(), rp.getBufferPool().getNumBuffers());
}
// verify buffer pools for the input gates (NOTE: credit-based uses minimum required buffers
// for exclusive buffers not managed by the buffer pool)
- assertEquals(enableCreditBasedFlowControl ? 0 : 2, ig1.getBufferPool().getNumberOfRequiredMemorySegments());
- assertEquals(enableCreditBasedFlowControl ? 0 : 2, ig2.getBufferPool().getNumberOfRequiredMemorySegments());
- assertEquals(enableCreditBasedFlowControl ? 0 : 2, ig3.getBufferPool().getNumberOfRequiredMemorySegments());
- assertEquals(enableCreditBasedFlowControl ? 0 : 4, ig4.getBufferPool().getNumberOfRequiredMemorySegments());
+ assertEquals(enableCreditBasedFlowControl ? 0 : channels, ig1.getBufferPool().getNumberOfRequiredMemorySegments());
+ assertEquals(enableCreditBasedFlowControl ? 0 : channels, ig2.getBufferPool().getNumberOfRequiredMemorySegments());
+ assertEquals(enableCreditBasedFlowControl ? 0 : channels, ig3.getBufferPool().getNumberOfRequiredMemorySegments());
+ assertEquals(enableCreditBasedFlowControl ? 0 : rp4Channels, ig4.getBufferPool().getNumberOfRequiredMemorySegments());
assertEquals(Integer.MAX_VALUE, ig1.getBufferPool().getMaxNumberOfMemorySegments());
assertEquals(Integer.MAX_VALUE, ig2.getBufferPool().getMaxNumberOfMemorySegments());
- assertEquals(enableCreditBasedFlowControl ? 8 : 2 * 2 + 8, ig3.getBufferPool().getMaxNumberOfMemorySegments());
- assertEquals(enableCreditBasedFlowControl ? 8 : 4 * 2 + 8, ig4.getBufferPool().getMaxNumberOfMemorySegments());
+ assertEquals(enableCreditBasedFlowControl ? floatingBuffers : expectedBuffers, ig3.getBufferPool().getMaxNumberOfMemorySegments());
+ assertEquals(enableCreditBasedFlowControl ? floatingBuffers : expectedRp4Buffers, ig4.getBufferPool().getMaxNumberOfMemorySegments());
int invokations = enableCreditBasedFlowControl ? 1 : 0;
verify(ig1, times(invokations)).assignExclusiveSegments();