You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/07/03 09:47:46 UTC
[flink] branch master updated: [FLINK-12284][Network,
Metrics]Fix the incorrect inputBufferUsage metric in credit-based
network mode
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 36a938a [FLINK-12284][Network,Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode
36a938a is described below
commit 36a938a45d5db46bb9ec4234fbdde6758a422143
Author: Aitozi <10...@qq.com>
AuthorDate: Thu May 16 00:07:55 2019 +0800
[FLINK-12284][Network,Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode
---
docs/monitoring/metrics.md | 10 +
docs/monitoring/metrics.zh.md | 10 +
.../io/network/NettyShuffleEnvironment.java | 3 +-
...geGauge.java => AbstractBuffersUsageGauge.java} | 29 +-
.../metrics/CreditBasedInputBuffersUsageGauge.java | 51 ++++
.../metrics/ExclusiveBuffersUsageGauge.java | 57 ++++
.../network/metrics/FloatingBuffersUsageGauge.java | 61 +++++
.../network/metrics/InputBufferPoolUsageGauge.java | 35 ++-
.../network/metrics/NettyShuffleMetricFactory.java | 24 +-
.../partition/consumer/RemoteInputChannel.java | 8 +
.../consumer/InputBuffersMetricsTest.java | 292 +++++++++++++++++++++
11 files changed, 545 insertions(+), 35 deletions(-)
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 86dc121..211ccfa 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -1045,6 +1045,16 @@ Thus, in order to infer the metric identifier:
<td>Gauge</td>
</tr>
<tr>
+ <td>inputFloatingBuffersUsage</td>
+ <td>An estimate of the floating input buffers usage, only reported in credit-based mode.</td>
+ <td>Gauge</td>
+ </tr>
+ <tr>
+ <td>inputExclusiveBuffersUsage</td>
+ <td>An estimate of the exclusive input buffers usage, only reported in credit-based mode.</td>
+ <td>Gauge</td>
+ </tr>
+ <tr>
<td>outPoolUsage</td>
<td>An estimate of the output buffers usage.</td>
<td>Gauge</td>
diff --git a/docs/monitoring/metrics.zh.md b/docs/monitoring/metrics.zh.md
index aaed606..44b4806 100644
--- a/docs/monitoring/metrics.zh.md
+++ b/docs/monitoring/metrics.zh.md
@@ -1044,6 +1044,16 @@ Thus, in order to infer the metric identifier:
<td>Gauge</td>
</tr>
<tr>
+ <td>inputFloatingBuffersUsage</td>
+ <td>An estimate of the floating input buffers usage, only reported in credit-based mode.</td>
+ <td>Gauge</td>
+ </tr>
+ <tr>
+ <td>inputExclusiveBuffersUsage</td>
+ <td>An estimate of the exclusive input buffers usage, only reported in credit-based mode.</td>
+ <td>Gauge</td>
+ </tr>
+ <tr>
<td>outPoolUsage</td>
<td>An estimate of the output buffers usage.</td>
<td>Gauge</td>
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
index 17fb2cc..bbe0833 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
@@ -226,7 +226,7 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti
inputGates[counter++] = inputGate;
}
- registerInputMetrics(config.isNetworkDetailedMetrics(), networkInputGroup, inputGates);
+ registerInputMetrics(config.isNetworkDetailedMetrics(), config.isCreditBased(), networkInputGroup, inputGates);
return Arrays.asList(inputGates);
}
}
@@ -246,6 +246,7 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti
InputGate[] inputGates) {
NettyShuffleMetricFactory.registerLegacyNetworkMetrics(
config.isNetworkDetailedMetrics(),
+ config.isCreditBased(),
metricGroup,
producedPartitions,
inputGates);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputBufferPoolUsageGauge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/AbstractBuffersUsageGauge.java
similarity index 62%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputBufferPoolUsageGauge.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/AbstractBuffersUsageGauge.java
index c7a6d4e..187d936 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputBufferPoolUsageGauge.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/AbstractBuffersUsageGauge.java
@@ -18,36 +18,39 @@
package org.apache.flink.runtime.io.network.metrics;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Gauge;
-import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
/**
- * Gauge metric measuring the input buffer pool usage gauge for {@link SingleInputGate}s.
+ * Abstract gauge implementation for calculating the buffer usage percent.
*/
-public class InputBufferPoolUsageGauge implements Gauge<Float> {
+public abstract class AbstractBuffersUsageGauge implements Gauge<Float> {
- private final SingleInputGate[] inputGates;
+ protected final SingleInputGate[] inputGates;
- public InputBufferPoolUsageGauge(SingleInputGate[] inputGates) {
+ @VisibleForTesting
+ public abstract int calculateUsedBuffers(SingleInputGate inputGate);
+
+ @VisibleForTesting
+ public abstract int calculateTotalBuffers(SingleInputGate inputGate);
+
+ AbstractBuffersUsageGauge(SingleInputGate[] inputGates) {
this.inputGates = inputGates;
}
@Override
public Float getValue() {
int usedBuffers = 0;
- int bufferPoolSize = 0;
+ int totalBuffers = 0;
for (SingleInputGate inputGate : inputGates) {
- BufferPool bufferPool = inputGate.getBufferPool();
- if (bufferPool != null) {
- usedBuffers += bufferPool.bestEffortGetNumOfUsedBuffers();
- bufferPoolSize += bufferPool.getNumBuffers();
- }
+ usedBuffers += calculateUsedBuffers(inputGate);
+ totalBuffers += calculateTotalBuffers(inputGate);
}
- if (bufferPoolSize != 0) {
- return ((float) usedBuffers) / bufferPoolSize;
+ if (totalBuffers != 0) {
+ return ((float) usedBuffers) / totalBuffers;
} else {
return 0.0f;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/CreditBasedInputBuffersUsageGauge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/CreditBasedInputBuffersUsageGauge.java
new file mode 100644
index 0000000..5102809
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/CreditBasedInputBuffersUsageGauge.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.metrics;
+
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Gauge metric measuring the overall input buffers usage for {@link SingleInputGate}s in
+ * credit-based mode.
+ *
+ * <p>The used and total counts of a gate are the sums of the values reported for that gate by a
+ * {@link FloatingBuffersUsageGauge} and an {@link ExclusiveBuffersUsageGauge}.
+ */
+public class CreditBasedInputBuffersUsageGauge extends AbstractBuffersUsageGauge {
+
+	/** Supplies the floating buffer counts. */
+	private final FloatingBuffersUsageGauge floatingBuffersUsageGauge;
+
+	/** Supplies the exclusive buffer counts. */
+	private final ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge;
+
+	/**
+	 * Creates a gauge combining a floating and an exclusive buffers usage gauge.
+	 *
+	 * @param floatingBuffersUsageGauge gauge for the floating buffers; must not be null
+	 * @param exclusiveBuffersUsageGauge gauge for the exclusive buffers; must not be null
+	 * @param inputGates the input gates whose usage is aggregated; must not be null
+	 */
+	public CreditBasedInputBuffersUsageGauge(
+			FloatingBuffersUsageGauge floatingBuffersUsageGauge,
+			ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge,
+			SingleInputGate[] inputGates) {
+		super(checkNotNull(inputGates));
+		this.floatingBuffersUsageGauge = checkNotNull(floatingBuffersUsageGauge);
+		this.exclusiveBuffersUsageGauge = checkNotNull(exclusiveBuffersUsageGauge);
+	}
+
+	@Override
+	public int calculateUsedBuffers(SingleInputGate inputGate) {
+		final int usedFloating = floatingBuffersUsageGauge.calculateUsedBuffers(inputGate);
+		final int usedExclusive = exclusiveBuffersUsageGauge.calculateUsedBuffers(inputGate);
+		return usedFloating + usedExclusive;
+	}
+
+	@Override
+	public int calculateTotalBuffers(SingleInputGate inputGate) {
+		final int totalFloating = floatingBuffersUsageGauge.calculateTotalBuffers(inputGate);
+		final int totalExclusive = exclusiveBuffersUsageGauge.calculateTotalBuffers(inputGate);
+		return totalFloating + totalExclusive;
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/ExclusiveBuffersUsageGauge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/ExclusiveBuffersUsageGauge.java
new file mode 100644
index 0000000..6fcb1f5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/ExclusiveBuffersUsageGauge.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.metrics;
+
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Gauge metric measuring the exclusive buffers usage for {@link SingleInputGate}s.
+ *
+ * <p>Only {@link RemoteInputChannel}s contribute to this gauge; all other channel types are
+ * ignored for both the used and the total count.
+ */
+public class ExclusiveBuffersUsageGauge extends AbstractBuffersUsageGauge {
+
+	public ExclusiveBuffersUsageGauge(SingleInputGate[] inputGates) {
+		super(checkNotNull(inputGates));
+	}
+
+	/**
+	 * Sums the exclusive buffers in use over all remote channels of the given gate.
+	 */
+	@Override
+	public int calculateUsedBuffers(SingleInputGate inputGate) {
+		int used = 0;
+		for (InputChannel channel : inputGate.getInputChannels().values()) {
+			if (!(channel instanceof RemoteInputChannel)) {
+				continue;
+			}
+			used += ((RemoteInputChannel) channel).unsynchronizedGetExclusiveBuffersUsed();
+		}
+		return used;
+	}
+
+	/**
+	 * Sums the initial credit of all remote channels of the given gate, which this gauge uses as
+	 * the number of exclusive buffers assigned to each channel.
+	 */
+	@Override
+	public int calculateTotalBuffers(SingleInputGate inputGate) {
+		int total = 0;
+		for (InputChannel channel : inputGate.getInputChannels().values()) {
+			if (!(channel instanceof RemoteInputChannel)) {
+				continue;
+			}
+			total += ((RemoteInputChannel) channel).getInitialCredit();
+		}
+		return total;
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/FloatingBuffersUsageGauge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/FloatingBuffersUsageGauge.java
new file mode 100644
index 0000000..d1c8dea
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/FloatingBuffersUsageGauge.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.metrics;
+
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Gauge metric measuring the floating buffers usage for {@link SingleInputGate}s.
+ *
+ * <p>The total is the size of each gate's {@link BufferPool}. Buffers taken from the pool that
+ * a {@link RemoteInputChannel} still reports as available floating buffers are not counted as
+ * used.
+ */
+public class FloatingBuffersUsageGauge extends AbstractBuffersUsageGauge {
+
+	public FloatingBuffersUsageGauge(SingleInputGate[] inputGates) {
+		super(checkNotNull(inputGates));
+	}
+
+	/**
+	 * Returns the buffers used from the gate's buffer pool minus the floating buffers still
+	 * available in its remote channels, clamped at zero, or 0 if the gate has no buffer pool.
+	 */
+	@Override
+	public int calculateUsedBuffers(SingleInputGate inputGate) {
+		BufferPool bufferPool = inputGate.getBufferPool();
+		if (bufferPool == null) {
+			return 0;
+		}
+		// Both counts come from best-effort / unsynchronized accessors, so the difference is only
+		// an estimate; clamp it so a negative number of used buffers is never reported.
+		int requestedFloatingBuffers = bufferPool.bestEffortGetNumOfUsedBuffers();
+		int availableFloatingBuffers = 0;
+		for (InputChannel ic : inputGate.getInputChannels().values()) {
+			if (ic instanceof RemoteInputChannel) {
+				availableFloatingBuffers += ((RemoteInputChannel) ic).unsynchronizedGetFloatingBuffersAvailable();
+			}
+		}
+		return Math.max(0, requestedFloatingBuffers - availableFloatingBuffers);
+	}
+
+	/**
+	 * Returns the number of buffers in the gate's buffer pool, or 0 if the gate has no buffer pool.
+	 */
+	@Override
+	public int calculateTotalBuffers(SingleInputGate inputGate) {
+		BufferPool bufferPool = inputGate.getBufferPool();
+		// Use the null-checked local instead of calling getBufferPool() a second time.
+		return bufferPool != null ? bufferPool.getNumBuffers() : 0;
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputBufferPoolUsageGauge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputBufferPoolUsageGauge.java
index c7a6d4e..b5df3a3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputBufferPoolUsageGauge.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputBufferPoolUsageGauge.java
@@ -18,38 +18,35 @@
package org.apache.flink.runtime.io.network.metrics;
-import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* Gauge metric measuring the input buffer pool usage gauge for {@link SingleInputGate}s.
*/
-public class InputBufferPoolUsageGauge implements Gauge<Float> {
-
- private final SingleInputGate[] inputGates;
+public class InputBufferPoolUsageGauge extends AbstractBuffersUsageGauge {
public InputBufferPoolUsageGauge(SingleInputGate[] inputGates) {
- this.inputGates = inputGates;
+ super(checkNotNull(inputGates));
}
@Override
- public Float getValue() {
- int usedBuffers = 0;
- int bufferPoolSize = 0;
-
- for (SingleInputGate inputGate : inputGates) {
- BufferPool bufferPool = inputGate.getBufferPool();
- if (bufferPool != null) {
- usedBuffers += bufferPool.bestEffortGetNumOfUsedBuffers();
- bufferPoolSize += bufferPool.getNumBuffers();
- }
+ public int calculateUsedBuffers(SingleInputGate inputGate) {
+ BufferPool bufferPool = inputGate.getBufferPool();
+ if (bufferPool != null) {
+ return bufferPool.bestEffortGetNumOfUsedBuffers();
}
+ return 0;
+ }
- if (bufferPoolSize != 0) {
- return ((float) usedBuffers) / bufferPoolSize;
- } else {
- return 0.0f;
+ @Override
+ public int calculateTotalBuffers(SingleInputGate inputGate) {
+ BufferPool bufferPool = inputGate.getBufferPool();
+ if (bufferPool != null) {
+ return bufferPool.getNumBuffers();
}
+ return 0;
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/NettyShuffleMetricFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/NettyShuffleMetricFactory.java
index 6bcb027..4c8f1c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/NettyShuffleMetricFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/NettyShuffleMetricFactory.java
@@ -66,6 +66,8 @@ public class NettyShuffleMetricFactory {
private static final String METRIC_INPUT_QUEUE_LENGTH = "inputQueueLength";
private static final String METRIC_INPUT_POOL_USAGE = "inPoolUsage";
+ private static final String METRIC_INPUT_FLOATING_BUFFERS_USAGE = "inputFloatingBuffersUsage";
+ private static final String METRIC_INPUT_EXCLUSIVE_BUFFERS_USAGE = "inputExclusiveBuffersUsage";
private NettyShuffleMetricFactory() {
}
@@ -107,6 +109,7 @@ public class NettyShuffleMetricFactory {
@Deprecated
public static void registerLegacyNetworkMetrics(
boolean isDetailedMetrics,
+ boolean isCreditBased,
MetricGroup metricGroup,
ResultPartitionWriter[] producedPartitions,
InputGate[] inputGates) {
@@ -126,7 +129,7 @@ public class NettyShuffleMetricFactory {
registerOutputMetrics(isDetailedMetrics, outputGroup, buffersGroup, resultPartitions);
SingleInputGate[] singleInputGates = Arrays.copyOf(inputGates, inputGates.length, SingleInputGate[].class);
- registerInputMetrics(isDetailedMetrics, inputGroup, buffersGroup, singleInputGates);
+ registerInputMetrics(isDetailedMetrics, isCreditBased, inputGroup, buffersGroup, singleInputGates);
}
public static void registerOutputMetrics(
@@ -154,10 +157,12 @@ public class NettyShuffleMetricFactory {
public static void registerInputMetrics(
boolean isDetailedMetrics,
+ boolean isCreditBased,
MetricGroup inputGroup,
SingleInputGate[] inputGates) {
registerInputMetrics(
isDetailedMetrics,
+ isCreditBased,
inputGroup,
inputGroup.addGroup(METRIC_GROUP_BUFFERS),
inputGates);
@@ -165,13 +170,28 @@ public class NettyShuffleMetricFactory {
private static void registerInputMetrics(
boolean isDetailedMetrics,
+ boolean isCreditBased,
MetricGroup inputGroup,
MetricGroup buffersGroup,
SingleInputGate[] inputGates) {
if (isDetailedMetrics) {
InputGateMetrics.registerQueueLengthMetrics(inputGroup, inputGates);
}
+
buffersGroup.gauge(METRIC_INPUT_QUEUE_LENGTH, new InputBuffersGauge(inputGates));
- buffersGroup.gauge(METRIC_INPUT_POOL_USAGE, new InputBufferPoolUsageGauge(inputGates));
+
+ if (isCreditBased) {
+ FloatingBuffersUsageGauge floatingBuffersUsageGauge = new FloatingBuffersUsageGauge(inputGates);
+ ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = new ExclusiveBuffersUsageGauge(inputGates);
+ CreditBasedInputBuffersUsageGauge creditBasedInputBuffersUsageGauge = new CreditBasedInputBuffersUsageGauge(
+ floatingBuffersUsageGauge,
+ exclusiveBuffersUsageGauge,
+ inputGates);
+ buffersGroup.gauge(METRIC_INPUT_EXCLUSIVE_BUFFERS_USAGE, exclusiveBuffersUsageGauge);
+ buffersGroup.gauge(METRIC_INPUT_FLOATING_BUFFERS_USAGE, floatingBuffersUsageGauge);
+ buffersGroup.gauge(METRIC_INPUT_POOL_USAGE, creditBasedInputBuffersUsageGauge);
+ } else {
+ buffersGroup.gauge(METRIC_INPUT_POOL_USAGE, new InputBufferPoolUsageGauge(inputGates));
+ }
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 778aa1d..58b36c3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -434,6 +434,14 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
return Math.max(0, receivedBuffers.size());
}
+	public int unsynchronizedGetExclusiveBuffersUsed() {
+		return Math.max(initialCredit - bufferQueue.exclusiveBuffers.size(), 0);
+	}
+
+	public int unsynchronizedGetFloatingBuffersAvailable() {
+		return Math.max(bufferQueue.floatingBuffers.size(), 0);
+	}
+
public InputChannelID getInputChannelId() {
return id;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
new file mode 100644
index 0000000..a602648
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.metrics.CreditBasedInputBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the floating, exclusive and credit-based input buffers usage gauges.
+ */
+public class InputBuffersMetricsTest extends TestLogger {
+
+	@Test
+	public void testCalculateTotalBuffersSize() throws IOException {
+		int numberOfRemoteChannels = 2;
+		int numberOfLocalChannels = 0;
+
+		int numberOfBufferPerChannel = 2;
+		int numberOfBuffersPerGate = 8;
+
+		NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
+			.setNetworkBuffersPerChannel(numberOfBufferPerChannel)
+			.setFloatingNetworkBuffersPerGate(numberOfBuffersPerGate)
+			.build();
+
+		SingleInputGate inputGate1 = buildInputGate(
+			network,
+			numberOfRemoteChannels,
+			numberOfLocalChannels).f0;
+
+		SingleInputGate[] inputGates = new SingleInputGate[]{inputGate1};
+		FloatingBuffersUsageGauge floatingBuffersUsageGauge = new FloatingBuffersUsageGauge(inputGates);
+		ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = new ExclusiveBuffersUsageGauge(inputGates);
+		CreditBasedInputBuffersUsageGauge inputBufferPoolUsageGauge = new CreditBasedInputBuffersUsageGauge(
+			floatingBuffersUsageGauge,
+			exclusiveBuffersUsageGauge,
+			inputGates);
+
+		try (CloseableRegistry closeableRegistry = new CloseableRegistry()) {
+
+			closeableRegistry.registerCloseable(network::close);
+			closeableRegistry.registerCloseable(inputGate1::close);
+
+			assertEquals(numberOfBuffersPerGate, floatingBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+			assertEquals(numberOfRemoteChannels * numberOfBufferPerChannel, exclusiveBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+			assertEquals(numberOfRemoteChannels * numberOfBufferPerChannel + numberOfBuffersPerGate, inputBufferPoolUsageGauge.calculateTotalBuffers(inputGate1));
+		}
+	}
+
+	@Test
+	public void testExclusiveBuffersUsage() throws IOException {
+		int numberOfRemoteChannelsGate1 = 2;
+		int numberOfLocalChannelsGate1 = 0;
+		int numberOfRemoteChannelsGate2 = 1;
+		int numberOfLocalChannelsGate2 = 1;
+
+		int totalNumberOfRemoteChannels = numberOfRemoteChannelsGate1 + numberOfRemoteChannelsGate2;
+
+		int buffersPerChannel = 2;
+		int extraNetworkBuffersPerGate = 8;
+
+		NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
+			.setNetworkBuffersPerChannel(buffersPerChannel)
+			.setFloatingNetworkBuffersPerGate(extraNetworkBuffersPerGate)
+			.build();
+
+		Tuple2<SingleInputGate, List<RemoteInputChannel>> tuple1 = buildInputGate(
+			network,
+			numberOfRemoteChannelsGate1,
+			numberOfLocalChannelsGate1);
+		Tuple2<SingleInputGate, List<RemoteInputChannel>> tuple2 = buildInputGate(
+			network,
+			numberOfRemoteChannelsGate2,
+			numberOfLocalChannelsGate2);
+
+		SingleInputGate inputGate1 = tuple1.f0;
+		SingleInputGate inputGate2 = tuple2.f0;
+
+		List<RemoteInputChannel> remoteInputChannels = tuple1.f1;
+
+		SingleInputGate[] inputGates = new SingleInputGate[]{inputGate1, inputGate2};
+		FloatingBuffersUsageGauge floatingBuffersUsageGauge = new FloatingBuffersUsageGauge(inputGates);
+		ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = new ExclusiveBuffersUsageGauge(inputGates);
+		CreditBasedInputBuffersUsageGauge inputBuffersUsageGauge = new CreditBasedInputBuffersUsageGauge(
+			floatingBuffersUsageGauge,
+			exclusiveBuffersUsageGauge,
+			inputGates);
+
+		try (CloseableRegistry closeableRegistry = new CloseableRegistry()) {
+			assertEquals(0.0, exclusiveBuffersUsageGauge.getValue(), 0.0);
+			assertEquals(0.0, inputBuffersUsageGauge.getValue(), 0.0);
+
+			int totalBuffers = extraNetworkBuffersPerGate * inputGates.length + buffersPerChannel * totalNumberOfRemoteChannels;
+
+			int channelIndex = 1;
+			for (RemoteInputChannel channel : remoteInputChannels) {
+				drainAndValidate(
+					buffersPerChannel,
+					buffersPerChannel * channelIndex++,
+					channel,
+					closeableRegistry,
+					totalBuffers,
+					buffersPerChannel * totalNumberOfRemoteChannels,
+					exclusiveBuffersUsageGauge,
+					inputBuffersUsageGauge,
+					inputGate1);
+			}
+		} finally {
+			inputGate1.close();
+			inputGate2.close();
+			network.close();
+		}
+	}
+
+	@Test
+	public void testFloatingBuffersUsage() throws IOException, InterruptedException {
+
+		int numberOfRemoteChannelsGate1 = 2;
+		int numberOfLocalChannelsGate1 = 0;
+		int numberOfRemoteChannelsGate2 = 1;
+		int numberOfLocalChannelsGate2 = 1;
+
+		int totalNumberOfRemoteChannels = numberOfRemoteChannelsGate1 + numberOfRemoteChannelsGate2;
+
+		int buffersPerChannel = 2;
+		int extraNetworkBuffersPerGate = 8;
+
+		NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
+			.setNetworkBuffersPerChannel(buffersPerChannel)
+			.setFloatingNetworkBuffersPerGate(extraNetworkBuffersPerGate)
+			.build();
+
+		Tuple2<SingleInputGate, List<RemoteInputChannel>> tuple1 = buildInputGate(
+			network,
+			numberOfRemoteChannelsGate1,
+			numberOfLocalChannelsGate1);
+		SingleInputGate inputGate2 = buildInputGate(
+			network,
+			numberOfRemoteChannelsGate2,
+			numberOfLocalChannelsGate2).f0;
+
+		SingleInputGate inputGate1 = tuple1.f0;
+
+		RemoteInputChannel remoteInputChannel1 = tuple1.f1.get(0);
+
+		SingleInputGate[] inputGates = new SingleInputGate[]{inputGate1, inputGate2};
+		FloatingBuffersUsageGauge floatingBuffersUsageGauge = new FloatingBuffersUsageGauge(inputGates);
+		ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = new ExclusiveBuffersUsageGauge(inputGates);
+		CreditBasedInputBuffersUsageGauge inputBuffersUsageGauge = new CreditBasedInputBuffersUsageGauge(
+			floatingBuffersUsageGauge,
+			exclusiveBuffersUsageGauge,
+			inputGates);
+
+		try (CloseableRegistry closeableRegistry = new CloseableRegistry()) {
+			assertEquals(0.0, floatingBuffersUsageGauge.getValue(), 0.0);
+			assertEquals(0.0, inputBuffersUsageGauge.getValue(), 0.0);
+
+			// drain the exclusive buffers of gate1's first remote channel
+			drainBuffer(buffersPerChannel, remoteInputChannel1, closeableRegistry);
+
+			int totalBuffers = extraNetworkBuffersPerGate * inputGates.length + buffersPerChannel * totalNumberOfRemoteChannels;
+
+			remoteInputChannel1.requestSubpartition(0);
+
+			int backlog = 3;
+			int totalRequestedBuffers = buffersPerChannel + backlog;
+
+			remoteInputChannel1.onSenderBacklog(backlog);
+
+			assertEquals(totalRequestedBuffers, remoteInputChannel1.unsynchronizedGetFloatingBuffersAvailable());
+
+			drainBuffer(totalRequestedBuffers, remoteInputChannel1, closeableRegistry);
+
+			assertEquals(0, remoteInputChannel1.unsynchronizedGetFloatingBuffersAvailable());
+			assertEquals((double) (buffersPerChannel + totalRequestedBuffers) / totalBuffers,
+				inputBuffersUsageGauge.getValue(), 0.0001);
+		} finally {
+			inputGate1.close();
+			inputGate2.close();
+			network.close();
+		}
+	}
+
+	private void drainAndValidate(
+			int numBuffersToRequest,
+			int totalRequestedBuffers,
+			RemoteInputChannel channel,
+			CloseableRegistry closeableRegistry,
+			int totalBuffers,
+			int totalExclusiveBuffers,
+			ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge,
+			CreditBasedInputBuffersUsageGauge inputBuffersUsageGauge,
+			SingleInputGate inputGate) throws IOException {
+
+		drainBuffer(numBuffersToRequest, channel, closeableRegistry);
+		assertEquals(totalRequestedBuffers, exclusiveBuffersUsageGauge.calculateUsedBuffers(inputGate));
+		assertEquals((double) totalRequestedBuffers / totalExclusiveBuffers, exclusiveBuffersUsageGauge.getValue(), 0.0001);
+		assertEquals((double) totalRequestedBuffers / totalBuffers, inputBuffersUsageGauge.getValue(), 0.0001);
+	}
+
+	private void drainBuffer(int boundary, RemoteInputChannel channel, CloseableRegistry closeableRegistry) throws IOException {
+		for (int i = 0; i < boundary; i++) {
+			Buffer buffer = channel.requestBuffer();
+			if (buffer != null) {
+				closeableRegistry.registerCloseable(buffer::recycleBuffer);
+			} else {
+				break;
+			}
+		}
+	}
+
+	private Tuple2<SingleInputGate, List<RemoteInputChannel>> buildInputGate(
+			NettyShuffleEnvironment network,
+			int numberOfRemoteChannels,
+			int numberOfLocalChannels) throws IOException {
+
+		SingleInputGate inputGate = new SingleInputGateBuilder()
+			.setNumberOfChannels(numberOfRemoteChannels + numberOfLocalChannels)
+			.setResultPartitionType(ResultPartitionType.PIPELINED_BOUNDED)
+			.setupBufferPoolFactory(network)
+			.build();
+
+		Tuple2<SingleInputGate, List<RemoteInputChannel>> res = Tuple2.of(inputGate, new ArrayList<>());
+
+		int channelIdx = 0; // shared by remote and local channels so every channel gets a unique index
+		for (int i = 0; i < numberOfRemoteChannels; i++) {
+			res.f1.add(buildRemoteChannel(channelIdx, inputGate, network));
+			channelIdx++;
+		}
+
+		for (int i = 0; i < numberOfLocalChannels; i++) {
+			buildLocalChannel(channelIdx++, inputGate, network);
+		}
+		inputGate.setup();
+		return res;
+	}
+
+	private RemoteInputChannel buildRemoteChannel(
+			int channelIndex,
+			SingleInputGate inputGate,
+			NettyShuffleEnvironment network) {
+		return new InputChannelBuilder()
+			.setChannelIndex(channelIndex)
+			.setupFromNettyShuffleEnvironment(network)
+			.setConnectionManager(new TestingConnectionManager())
+			.buildRemoteAndSetToGate(inputGate);
+	}
+
+	private void buildLocalChannel(
+			int channelIndex,
+			SingleInputGate inputGate,
+			NettyShuffleEnvironment network) {
+		new InputChannelBuilder()
+			.setChannelIndex(channelIndex)
+			.setupFromNettyShuffleEnvironment(network)
+			.setConnectionManager(new TestingConnectionManager())
+			.buildLocalAndSetToGate(inputGate);
+	}
+}