You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/07/10 07:15:16 UTC

[flink] 01/02: [hotfix][test] Clean up NettyShuffleEnvironmentTest

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4ed73a51174353dfff98e485798c5fc8dd9bd20b
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Tue Jul 9 15:10:07 2019 +0200

    [hotfix][test] Clean up NettyShuffleEnvironmentTest
---
 .../io/network/NettyShuffleEnvironmentTest.java    | 60 +++++++++++-----------
 1 file changed, 29 insertions(+), 31 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
index ba70a47..51f1c04 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
@@ -85,15 +85,6 @@ public class NettyShuffleEnvironmentTest extends TestLogger {
 
 	/**
 	 * Verifies that {@link Task#setupPartitionsAndGates(ResultPartitionWriter[], InputGate[])}} sets up (un)bounded buffer pool
-	 * instances for various types of input and output channels.
-	 */
-	@Test
-	public void testRegisterTaskUsesBoundedBuffers() throws Exception {
-		testRegisterTaskWithLimitedBuffers(NettyShuffleEnvironmentBuilder.DEFAULT_NUM_NETWORK_BUFFERS, false);
-	}
-
-	/**
-	 * Verifies that {@link Task#setupPartitionsAndGates(ResultPartitionWriter[], InputGate[])}} sets up (un)bounded buffer pool
 	 * instances for various types of input and output channels working with the bare minimum of
 	 * required buffers.
 	 */
@@ -109,7 +100,7 @@ public class NettyShuffleEnvironmentTest extends TestLogger {
 			bufferCount = 10 + 10 * NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue();
 		}
 
-		testRegisterTaskWithLimitedBuffers(bufferCount, true);
+		testRegisterTaskWithLimitedBuffers(bufferCount);
 	}
 
 	/**
@@ -130,10 +121,10 @@ public class NettyShuffleEnvironmentTest extends TestLogger {
 
 		expectedException.expect(IOException.class);
 		expectedException.expectMessage("Insufficient number of network buffers");
-		testRegisterTaskWithLimitedBuffers(bufferCount, true);
+		testRegisterTaskWithLimitedBuffers(bufferCount);
 	}
 
-	private void testRegisterTaskWithLimitedBuffers(int bufferPoolSize, boolean assertNumBuffers) throws Exception {
+	private void testRegisterTaskWithLimitedBuffers(int bufferPoolSize) throws Exception {
 		final NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
 			.setNumNetworkBuffers(bufferPoolSize)
 			.setIsCreditBased(enableCreditBasedFlowControl)
@@ -141,18 +132,27 @@ public class NettyShuffleEnvironmentTest extends TestLogger {
 
 		final ConnectionManager connManager = createDummyConnectionManager();
 
+		int channels = 2;
+		int rp4Channels = 4;
+		int floatingBuffers = network.getConfiguration().floatingNetworkBuffersPerGate();
+		int exclusiveBuffers = network.getConfiguration().networkBuffersPerChannel();
+
+		int expectedBuffers = channels * exclusiveBuffers + floatingBuffers;
+		int expectedRp4Buffers = rp4Channels * exclusiveBuffers + floatingBuffers;
+
 		// result partitions
-		ResultPartition rp1 = createPartition(network, ResultPartitionType.PIPELINED, 2);
-		ResultPartition rp2 = createPartition(network, fileChannelManager, ResultPartitionType.BLOCKING, 2);
-		ResultPartition rp3 = createPartition(network, ResultPartitionType.PIPELINED_BOUNDED, 2);
-		ResultPartition rp4 = createPartition(network, ResultPartitionType.PIPELINED_BOUNDED, 4);
+		ResultPartition rp1 = createPartition(network, ResultPartitionType.PIPELINED, channels);
+		ResultPartition rp2 = createPartition(network, fileChannelManager, ResultPartitionType.BLOCKING, channels);
+		ResultPartition rp3 = createPartition(network, ResultPartitionType.PIPELINED_BOUNDED, channels);
+		ResultPartition rp4 = createPartition(network, ResultPartitionType.PIPELINED_BOUNDED, rp4Channels);
+
 		final ResultPartition[] resultPartitions = new ResultPartition[] {rp1, rp2, rp3, rp4};
 
 		// input gates
-		SingleInputGate ig1 = createSingleInputGate(network, ResultPartitionType.PIPELINED, 2);
-		SingleInputGate ig2 = createSingleInputGate(network, ResultPartitionType.BLOCKING, 2);
-		SingleInputGate ig3 = createSingleInputGate(network, ResultPartitionType.PIPELINED_BOUNDED, 2);
-		SingleInputGate ig4 = createSingleInputGate(network, ResultPartitionType.PIPELINED_BOUNDED, 4);
+		SingleInputGate ig1 = createSingleInputGate(network, ResultPartitionType.PIPELINED, channels);
+		SingleInputGate ig2 = createSingleInputGate(network, ResultPartitionType.BLOCKING, channels);
+		SingleInputGate ig3 = createSingleInputGate(network, ResultPartitionType.PIPELINED_BOUNDED, channels);
+		SingleInputGate ig4 = createSingleInputGate(network, ResultPartitionType.PIPELINED_BOUNDED, rp4Channels);
 		final SingleInputGate[] inputGates = new SingleInputGate[] {ig1, ig2, ig3, ig4};
 
 		createRemoteInputChannel(ig4, 0, rp1, connManager, network.getNetworkBufferPool());
@@ -174,27 +174,25 @@ public class NettyShuffleEnvironmentTest extends TestLogger {
 		// verify buffer pools for the result partitions
 		assertEquals(Integer.MAX_VALUE, rp1.getBufferPool().getMaxNumberOfMemorySegments());
 		assertEquals(Integer.MAX_VALUE, rp2.getBufferPool().getMaxNumberOfMemorySegments());
-		assertEquals(2 * 2 + 8, rp3.getBufferPool().getMaxNumberOfMemorySegments());
-		assertEquals(4 * 2 + 8, rp4.getBufferPool().getMaxNumberOfMemorySegments());
+		assertEquals(expectedBuffers, rp3.getBufferPool().getMaxNumberOfMemorySegments());
+		assertEquals(expectedRp4Buffers, rp4.getBufferPool().getMaxNumberOfMemorySegments());
 
 		for (ResultPartition rp : resultPartitions) {
 			assertEquals(rp.getNumberOfSubpartitions(), rp.getBufferPool().getNumberOfRequiredMemorySegments());
-			if (assertNumBuffers) {
-				assertEquals(rp.getNumberOfSubpartitions(), rp.getBufferPool().getNumBuffers());
-			}
+			assertEquals(rp.getNumberOfSubpartitions(), rp.getBufferPool().getNumBuffers());
 		}
 
 		// verify buffer pools for the input gates (NOTE: credit-based uses minimum required buffers
 		// for exclusive buffers not managed by the buffer pool)
-		assertEquals(enableCreditBasedFlowControl ? 0 : 2, ig1.getBufferPool().getNumberOfRequiredMemorySegments());
-		assertEquals(enableCreditBasedFlowControl ? 0 : 2, ig2.getBufferPool().getNumberOfRequiredMemorySegments());
-		assertEquals(enableCreditBasedFlowControl ? 0 : 2, ig3.getBufferPool().getNumberOfRequiredMemorySegments());
-		assertEquals(enableCreditBasedFlowControl ? 0 : 4, ig4.getBufferPool().getNumberOfRequiredMemorySegments());
+		assertEquals(enableCreditBasedFlowControl ? 0 : channels, ig1.getBufferPool().getNumberOfRequiredMemorySegments());
+		assertEquals(enableCreditBasedFlowControl ? 0 : channels, ig2.getBufferPool().getNumberOfRequiredMemorySegments());
+		assertEquals(enableCreditBasedFlowControl ? 0 : channels, ig3.getBufferPool().getNumberOfRequiredMemorySegments());
+		assertEquals(enableCreditBasedFlowControl ? 0 : rp4Channels, ig4.getBufferPool().getNumberOfRequiredMemorySegments());
 
 		assertEquals(Integer.MAX_VALUE, ig1.getBufferPool().getMaxNumberOfMemorySegments());
 		assertEquals(Integer.MAX_VALUE, ig2.getBufferPool().getMaxNumberOfMemorySegments());
-		assertEquals(enableCreditBasedFlowControl ? 8 : 2 * 2 + 8, ig3.getBufferPool().getMaxNumberOfMemorySegments());
-		assertEquals(enableCreditBasedFlowControl ? 8 : 4 * 2 + 8, ig4.getBufferPool().getMaxNumberOfMemorySegments());
+		assertEquals(enableCreditBasedFlowControl ? floatingBuffers : expectedBuffers, ig3.getBufferPool().getMaxNumberOfMemorySegments());
+		assertEquals(enableCreditBasedFlowControl ? floatingBuffers : expectedRp4Buffers, ig4.getBufferPool().getMaxNumberOfMemorySegments());
 
 		int invokations = enableCreditBasedFlowControl ? 1 : 0;
 		verify(ig1, times(invokations)).assignExclusiveSegments();