You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/07/09 12:46:13 UTC
[flink] 02/05: [hotfix][test] Move CloseableRegistry as field in
InputBuffersMetricsTest
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 479b6890bf1e1e8f4634903975b4983c96913dc2
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Fri Jul 5 16:33:10 2019 +0200
[hotfix][test] Move CloseableRegistry as field in InputBuffersMetricsTest
---
.../consumer/InputBuffersMetricsTest.java | 113 +++++++++++----------
1 file changed, 59 insertions(+), 54 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
index a602648..8d868cc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
@@ -30,6 +30,8 @@ import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.util.TestLogger;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
@@ -43,6 +45,18 @@ import static org.junit.Assert.assertEquals;
*/
public class InputBuffersMetricsTest extends TestLogger {
+ private CloseableRegistry closeableRegistry;
+
+ @Before
+ public void setup() {
+ closeableRegistry = new CloseableRegistry();
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ closeableRegistry.close();
+ }
+
@Test
public void testCalculateTotalBuffersSize() throws IOException {
int numberOfRemoteChannels = 2;
@@ -55,11 +69,13 @@ public class InputBuffersMetricsTest extends TestLogger {
.setNetworkBuffersPerChannel(numberOfBufferPerChannel)
.setFloatingNetworkBuffersPerGate(numberOfBuffersPerGate)
.build();
+ closeableRegistry.registerCloseable(network::close);
SingleInputGate inputGate1 = buildInputGate(
network,
numberOfRemoteChannels,
numberOfLocalChannels).f0;
+ closeableRegistry.registerCloseable(inputGate1::close);
SingleInputGate[] inputGates = new SingleInputGate[]{inputGate1};
FloatingBuffersUsageGauge floatingBuffersUsageGauge = new FloatingBuffersUsageGauge(inputGates);
@@ -69,15 +85,12 @@ public class InputBuffersMetricsTest extends TestLogger {
exclusiveBuffersUsageGauge,
inputGates);
- try (CloseableRegistry closeableRegistry = new CloseableRegistry()) {
+ closeableRegistry.registerCloseable(network::close);
+ closeableRegistry.registerCloseable(inputGate1::close);
- closeableRegistry.registerCloseable(network::close);
- closeableRegistry.registerCloseable(inputGate1::close);
-
- assertEquals(numberOfBuffersPerGate, floatingBuffersUsageGauge.calculateTotalBuffers(inputGate1));
- assertEquals(numberOfRemoteChannels * numberOfBufferPerChannel, exclusiveBuffersUsageGauge.calculateTotalBuffers(inputGate1));
- assertEquals(numberOfRemoteChannels * numberOfBufferPerChannel + numberOfBuffersPerGate, inputBufferPoolUsageGauge.calculateTotalBuffers(inputGate1));
- }
+ assertEquals(numberOfBuffersPerGate, floatingBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+ assertEquals(numberOfRemoteChannels * numberOfBufferPerChannel, exclusiveBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+ assertEquals(numberOfRemoteChannels * numberOfBufferPerChannel + numberOfBuffersPerGate, inputBufferPoolUsageGauge.calculateTotalBuffers(inputGate1));
}
@Test
@@ -96,6 +109,7 @@ public class InputBuffersMetricsTest extends TestLogger {
.setNetworkBuffersPerChannel(buffersPerChannel)
.setFloatingNetworkBuffersPerGate(extraNetworkBuffersPerGate)
.build();
+ closeableRegistry.registerCloseable(network::close);
Tuple2<SingleInputGate, List<RemoteInputChannel>> tuple1 = buildInputGate(
network,
@@ -108,6 +122,8 @@ public class InputBuffersMetricsTest extends TestLogger {
SingleInputGate inputGate1 = tuple1.f0;
SingleInputGate inputGate2 = tuple2.f0;
+ closeableRegistry.registerCloseable(inputGate1::close);
+ closeableRegistry.registerCloseable(inputGate2::close);
List<RemoteInputChannel> remoteInputChannels = tuple1.f1;
@@ -119,29 +135,22 @@ public class InputBuffersMetricsTest extends TestLogger {
exclusiveBuffersUsageGauge,
inputGates);
- try (CloseableRegistry closeableRegistry = new CloseableRegistry()) {
- assertEquals(0.0, exclusiveBuffersUsageGauge.getValue(), 0.0);
- assertEquals(0.0, inputBuffersUsageGauge.getValue(), 0.0);
-
- int totalBuffers = extraNetworkBuffersPerGate * inputGates.length + buffersPerChannel * totalNumberOfRemoteChannels;
-
- int channelIndex = 1;
- for (RemoteInputChannel channel : remoteInputChannels) {
- drainAndValidate(
- buffersPerChannel,
- buffersPerChannel * channelIndex++,
- channel,
- closeableRegistry,
- totalBuffers,
- buffersPerChannel * totalNumberOfRemoteChannels,
- exclusiveBuffersUsageGauge,
- inputBuffersUsageGauge,
- inputGate1);
- }
- } finally {
- inputGate1.close();
- inputGate2.close();
- network.close();
+ assertEquals(0.0, exclusiveBuffersUsageGauge.getValue(), 0.0);
+ assertEquals(0.0, inputBuffersUsageGauge.getValue(), 0.0);
+
+ int totalBuffers = extraNetworkBuffersPerGate * inputGates.length + buffersPerChannel * totalNumberOfRemoteChannels;
+
+ int channelIndex = 1;
+ for (RemoteInputChannel channel : remoteInputChannels) {
+ drainAndValidate(
+ buffersPerChannel,
+ buffersPerChannel * channelIndex++,
+ channel,
+ totalBuffers,
+ buffersPerChannel * totalNumberOfRemoteChannels,
+ exclusiveBuffersUsageGauge,
+ inputBuffersUsageGauge,
+ inputGate1);
}
}
@@ -162,6 +171,7 @@ public class InputBuffersMetricsTest extends TestLogger {
.setNetworkBuffersPerChannel(buffersPerChannel)
.setFloatingNetworkBuffersPerGate(extraNetworkBuffersPerGate)
.build();
+ closeableRegistry.registerCloseable(network::close);
Tuple2<SingleInputGate, List<RemoteInputChannel>> tuple1 = buildInputGate(
network,
@@ -173,6 +183,8 @@ public class InputBuffersMetricsTest extends TestLogger {
numberOfLocalChannelsGate2).f0;
SingleInputGate inputGate1 = tuple1.f0;
+ closeableRegistry.registerCloseable(inputGate1::close);
+ closeableRegistry.registerCloseable(inputGate2::close);
RemoteInputChannel remoteInputChannel1 = tuple1.f1.get(0);
@@ -184,54 +196,47 @@ public class InputBuffersMetricsTest extends TestLogger {
exclusiveBuffersUsageGauge,
inputGates);
- try (CloseableRegistry closeableRegistry = new CloseableRegistry()) {
- assertEquals(0.0, floatingBuffersUsageGauge.getValue(), 0.0);
- assertEquals(0.0, inputBuffersUsageGauge.getValue(), 0.0);
+ assertEquals(0.0, floatingBuffersUsageGauge.getValue(), 0.0);
+ assertEquals(0.0, inputBuffersUsageGauge.getValue(), 0.0);
- // drain gate1's exclusive buffers
- drainBuffer(buffersPerChannel, remoteInputChannel1, closeableRegistry);
+ // drain gate1's exclusive buffers
+ drainBuffer(buffersPerChannel, remoteInputChannel1);
- int totalBuffers = extraNetworkBuffersPerGate * inputGates.length + buffersPerChannel * totalNumberOfRemoteChannels;
+ int totalBuffers = extraNetworkBuffersPerGate * inputGates.length + buffersPerChannel * totalNumberOfRemoteChannels;
- remoteInputChannel1.requestSubpartition(0);
+ remoteInputChannel1.requestSubpartition(0);
- int backlog = 3;
- int totalRequestedBuffers = buffersPerChannel + backlog;
+ int backlog = 3;
+ int totalRequestedBuffers = buffersPerChannel + backlog;
- remoteInputChannel1.onSenderBacklog(backlog);
+ remoteInputChannel1.onSenderBacklog(backlog);
- assertEquals(totalRequestedBuffers, remoteInputChannel1.unsynchronizedGetFloatingBuffersAvailable());
+ assertEquals(totalRequestedBuffers, remoteInputChannel1.unsynchronizedGetFloatingBuffersAvailable());
- drainBuffer(totalRequestedBuffers, remoteInputChannel1, closeableRegistry);
+ drainBuffer(totalRequestedBuffers, remoteInputChannel1);
- assertEquals(0, remoteInputChannel1.unsynchronizedGetFloatingBuffersAvailable());
- assertEquals((double) (buffersPerChannel + totalRequestedBuffers) / totalBuffers,
- inputBuffersUsageGauge.getValue(), 0.0001);
- } finally {
- inputGate1.close();
- inputGate2.close();
- network.close();
- }
+ assertEquals(0, remoteInputChannel1.unsynchronizedGetFloatingBuffersAvailable());
+ assertEquals((double) (buffersPerChannel + totalRequestedBuffers) / totalBuffers,
+ inputBuffersUsageGauge.getValue(), 0.0001);
}
private void drainAndValidate(
int numBuffersToRequest,
int totalRequestedBuffers,
RemoteInputChannel channel,
- CloseableRegistry closeableRegistry,
int totalBuffers,
int totalExclusiveBuffers,
ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge,
CreditBasedInputBuffersUsageGauge inputBuffersUsageGauge,
SingleInputGate inputGate) throws IOException {
- drainBuffer(numBuffersToRequest, channel, closeableRegistry);
+ drainBuffer(numBuffersToRequest, channel);
assertEquals(totalRequestedBuffers, exclusiveBuffersUsageGauge.calculateUsedBuffers(inputGate));
assertEquals((double) totalRequestedBuffers / totalExclusiveBuffers, exclusiveBuffersUsageGauge.getValue(), 0.0001);
assertEquals((double) totalRequestedBuffers / totalBuffers, inputBuffersUsageGauge.getValue(), 0.0001);
}
- private void drainBuffer(int boundary, RemoteInputChannel channel, CloseableRegistry closeableRegistry) throws IOException {
+ private void drainBuffer(int boundary, RemoteInputChannel channel) throws IOException {
for (int i = 0; i < boundary; i++) {
Buffer buffer = channel.requestBuffer();
if (buffer != null) {