You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/07/09 12:46:13 UTC

[flink] 02/05: [hotfix][test] Move CloseableRegistry as field in InputBuffersMetricsTest

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 479b6890bf1e1e8f4634903975b4983c96913dc2
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Fri Jul 5 16:33:10 2019 +0200

    [hotfix][test] Move CloseableRegistry as field in InputBuffersMetricsTest
---
 .../consumer/InputBuffersMetricsTest.java          | 113 +++++++++++----------
 1 file changed, 59 insertions(+), 54 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
index a602648..8d868cc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
@@ -30,6 +30,8 @@ import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -43,6 +45,18 @@ import static org.junit.Assert.assertEquals;
  */
 public class InputBuffersMetricsTest extends TestLogger {
 
+	private CloseableRegistry closeableRegistry;
+
+	@Before
+	public void setup() {
+		closeableRegistry = new CloseableRegistry();
+	}
+
+	@After
+	public void tearDown() throws IOException {
+		closeableRegistry.close();
+	}
+
 	@Test
 	public void testCalculateTotalBuffersSize() throws IOException {
 		int numberOfRemoteChannels = 2;
@@ -55,11 +69,13 @@ public class InputBuffersMetricsTest extends TestLogger {
 			.setNetworkBuffersPerChannel(numberOfBufferPerChannel)
 			.setFloatingNetworkBuffersPerGate(numberOfBuffersPerGate)
 			.build();
+		closeableRegistry.registerCloseable(network::close);
 
 		SingleInputGate inputGate1 = buildInputGate(
 			network,
 			numberOfRemoteChannels,
 			numberOfLocalChannels).f0;
+		closeableRegistry.registerCloseable(inputGate1::close);
 
 		SingleInputGate[] inputGates = new SingleInputGate[]{inputGate1};
 		FloatingBuffersUsageGauge floatingBuffersUsageGauge = new FloatingBuffersUsageGauge(inputGates);
@@ -69,15 +85,12 @@ public class InputBuffersMetricsTest extends TestLogger {
 			exclusiveBuffersUsageGauge,
 			inputGates);
 
-		try (CloseableRegistry closeableRegistry = new CloseableRegistry()) {
+		closeableRegistry.registerCloseable(network::close);
+		closeableRegistry.registerCloseable(inputGate1::close);
 
-			closeableRegistry.registerCloseable(network::close);
-			closeableRegistry.registerCloseable(inputGate1::close);
-
-			assertEquals(numberOfBuffersPerGate, floatingBuffersUsageGauge.calculateTotalBuffers(inputGate1));
-			assertEquals(numberOfRemoteChannels * numberOfBufferPerChannel, exclusiveBuffersUsageGauge.calculateTotalBuffers(inputGate1));
-			assertEquals(numberOfRemoteChannels * numberOfBufferPerChannel + numberOfBuffersPerGate, inputBufferPoolUsageGauge.calculateTotalBuffers(inputGate1));
-		}
+		assertEquals(numberOfBuffersPerGate, floatingBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+		assertEquals(numberOfRemoteChannels * numberOfBufferPerChannel, exclusiveBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+		assertEquals(numberOfRemoteChannels * numberOfBufferPerChannel + numberOfBuffersPerGate, inputBufferPoolUsageGauge.calculateTotalBuffers(inputGate1));
 	}
 
 	@Test
@@ -96,6 +109,7 @@ public class InputBuffersMetricsTest extends TestLogger {
 			.setNetworkBuffersPerChannel(buffersPerChannel)
 			.setFloatingNetworkBuffersPerGate(extraNetworkBuffersPerGate)
 			.build();
+		closeableRegistry.registerCloseable(network::close);
 
 		Tuple2<SingleInputGate, List<RemoteInputChannel>> tuple1 = buildInputGate(
 			network,
@@ -108,6 +122,8 @@ public class InputBuffersMetricsTest extends TestLogger {
 
 		SingleInputGate inputGate1 = tuple1.f0;
 		SingleInputGate inputGate2 = tuple2.f0;
+		closeableRegistry.registerCloseable(inputGate1::close);
+		closeableRegistry.registerCloseable(inputGate2::close);
 
 		List<RemoteInputChannel> remoteInputChannels = tuple1.f1;
 
@@ -119,29 +135,22 @@ public class InputBuffersMetricsTest extends TestLogger {
 			exclusiveBuffersUsageGauge,
 			inputGates);
 
-		try (CloseableRegistry closeableRegistry = new CloseableRegistry()) {
-			assertEquals(0.0, exclusiveBuffersUsageGauge.getValue(), 0.0);
-			assertEquals(0.0, inputBuffersUsageGauge.getValue(), 0.0);
-
-			int totalBuffers = extraNetworkBuffersPerGate * inputGates.length + buffersPerChannel * totalNumberOfRemoteChannels;
-
-			int channelIndex = 1;
-			for (RemoteInputChannel channel : remoteInputChannels) {
-				drainAndValidate(
-					buffersPerChannel,
-					buffersPerChannel * channelIndex++,
-					channel,
-					closeableRegistry,
-					totalBuffers,
-					buffersPerChannel * totalNumberOfRemoteChannels,
-					exclusiveBuffersUsageGauge,
-					inputBuffersUsageGauge,
-					inputGate1);
-			}
-		} finally {
-			inputGate1.close();
-			inputGate2.close();
-			network.close();
+		assertEquals(0.0, exclusiveBuffersUsageGauge.getValue(), 0.0);
+		assertEquals(0.0, inputBuffersUsageGauge.getValue(), 0.0);
+
+		int totalBuffers = extraNetworkBuffersPerGate * inputGates.length + buffersPerChannel * totalNumberOfRemoteChannels;
+
+		int channelIndex = 1;
+		for (RemoteInputChannel channel : remoteInputChannels) {
+			drainAndValidate(
+				buffersPerChannel,
+				buffersPerChannel * channelIndex++,
+				channel,
+				totalBuffers,
+				buffersPerChannel * totalNumberOfRemoteChannels,
+				exclusiveBuffersUsageGauge,
+				inputBuffersUsageGauge,
+				inputGate1);
 		}
 	}
 
@@ -162,6 +171,7 @@ public class InputBuffersMetricsTest extends TestLogger {
 			.setNetworkBuffersPerChannel(buffersPerChannel)
 			.setFloatingNetworkBuffersPerGate(extraNetworkBuffersPerGate)
 			.build();
+		closeableRegistry.registerCloseable(network::close);
 
 		Tuple2<SingleInputGate, List<RemoteInputChannel>> tuple1 = buildInputGate(
 			network,
@@ -173,6 +183,8 @@ public class InputBuffersMetricsTest extends TestLogger {
 			numberOfLocalChannelsGate2).f0;
 
 		SingleInputGate inputGate1 = tuple1.f0;
+		closeableRegistry.registerCloseable(inputGate1::close);
+		closeableRegistry.registerCloseable(inputGate2::close);
 
 		RemoteInputChannel remoteInputChannel1 = tuple1.f1.get(0);
 
@@ -184,54 +196,47 @@ public class InputBuffersMetricsTest extends TestLogger {
 			exclusiveBuffersUsageGauge,
 			inputGates);
 
-		try (CloseableRegistry closeableRegistry = new CloseableRegistry()) {
-			assertEquals(0.0, floatingBuffersUsageGauge.getValue(), 0.0);
-			assertEquals(0.0, inputBuffersUsageGauge.getValue(), 0.0);
+		assertEquals(0.0, floatingBuffersUsageGauge.getValue(), 0.0);
+		assertEquals(0.0, inputBuffersUsageGauge.getValue(), 0.0);
 
-			// drain gate1's exclusive buffers
-			drainBuffer(buffersPerChannel, remoteInputChannel1, closeableRegistry);
+		// drain gate1's exclusive buffers
+		drainBuffer(buffersPerChannel, remoteInputChannel1);
 
-			int totalBuffers = extraNetworkBuffersPerGate * inputGates.length + buffersPerChannel * totalNumberOfRemoteChannels;
+		int totalBuffers = extraNetworkBuffersPerGate * inputGates.length + buffersPerChannel * totalNumberOfRemoteChannels;
 
-			remoteInputChannel1.requestSubpartition(0);
+		remoteInputChannel1.requestSubpartition(0);
 
-			int backlog = 3;
-			int totalRequestedBuffers = buffersPerChannel + backlog;
+		int backlog = 3;
+		int totalRequestedBuffers = buffersPerChannel + backlog;
 
-			remoteInputChannel1.onSenderBacklog(backlog);
+		remoteInputChannel1.onSenderBacklog(backlog);
 
-			assertEquals(totalRequestedBuffers, remoteInputChannel1.unsynchronizedGetFloatingBuffersAvailable());
+		assertEquals(totalRequestedBuffers, remoteInputChannel1.unsynchronizedGetFloatingBuffersAvailable());
 
-			drainBuffer(totalRequestedBuffers, remoteInputChannel1, closeableRegistry);
+		drainBuffer(totalRequestedBuffers, remoteInputChannel1);
 
-			assertEquals(0, remoteInputChannel1.unsynchronizedGetFloatingBuffersAvailable());
-			assertEquals((double) (buffersPerChannel + totalRequestedBuffers) / totalBuffers,
-				inputBuffersUsageGauge.getValue(), 0.0001);
-		} finally {
-			inputGate1.close();
-			inputGate2.close();
-			network.close();
-		}
+		assertEquals(0, remoteInputChannel1.unsynchronizedGetFloatingBuffersAvailable());
+		assertEquals((double) (buffersPerChannel + totalRequestedBuffers) / totalBuffers,
+			inputBuffersUsageGauge.getValue(), 0.0001);
 	}
 
 	private void drainAndValidate(
 		int numBuffersToRequest,
 		int totalRequestedBuffers,
 		RemoteInputChannel channel,
-		CloseableRegistry closeableRegistry,
 		int totalBuffers,
 		int totalExclusiveBuffers,
 		ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge,
 		CreditBasedInputBuffersUsageGauge inputBuffersUsageGauge,
 		SingleInputGate inputGate) throws IOException {
 
-		drainBuffer(numBuffersToRequest, channel, closeableRegistry);
+		drainBuffer(numBuffersToRequest, channel);
 		assertEquals(totalRequestedBuffers, exclusiveBuffersUsageGauge.calculateUsedBuffers(inputGate));
 		assertEquals((double) totalRequestedBuffers / totalExclusiveBuffers, exclusiveBuffersUsageGauge.getValue(), 0.0001);
 		assertEquals((double) totalRequestedBuffers / totalBuffers, inputBuffersUsageGauge.getValue(), 0.0001);
 	}
 
-	private void drainBuffer(int boundary, RemoteInputChannel channel, CloseableRegistry closeableRegistry) throws IOException {
+	private void drainBuffer(int boundary, RemoteInputChannel channel) throws IOException {
 		for (int i = 0; i < boundary; i++) {
 			Buffer buffer = channel.requestBuffer();
 			if (buffer != null) {