You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/07/03 09:47:46 UTC

[flink] branch master updated: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 36a938a  [FLINK-12284][Network,Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode
36a938a is described below

commit 36a938a45d5db46bb9ec4234fbdde6758a422143
Author: Aitozi <10...@qq.com>
AuthorDate: Thu May 16 00:07:55 2019 +0800

    [FLINK-12284][Network,Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode
---
 docs/monitoring/metrics.md                         |  10 +
 docs/monitoring/metrics.zh.md                      |  10 +
 .../io/network/NettyShuffleEnvironment.java        |   3 +-
 ...geGauge.java => AbstractBuffersUsageGauge.java} |  29 +-
 .../metrics/CreditBasedInputBuffersUsageGauge.java |  51 ++++
 .../metrics/ExclusiveBuffersUsageGauge.java        |  57 ++++
 .../network/metrics/FloatingBuffersUsageGauge.java |  61 +++++
 .../network/metrics/InputBufferPoolUsageGauge.java |  35 ++-
 .../network/metrics/NettyShuffleMetricFactory.java |  24 +-
 .../partition/consumer/RemoteInputChannel.java     |   8 +
 .../consumer/InputBuffersMetricsTest.java          | 292 +++++++++++++++++++++
 11 files changed, 545 insertions(+), 35 deletions(-)

diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 86dc121..211ccfa 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -1045,6 +1045,16 @@ Thus, in order to infer the metric identifier:
       <td>Gauge</td>
     </tr>
     <tr>
+      <td>inputFloatingBuffersUsage</td>
+      <td>An estimate of the floating input buffers usage, dediciated for credit-based mode.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>inputExclusiveBuffersUsage</td>
+      <td>An estimate of the exclusive input buffers usage, dediciated for credit-based mode.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
       <td>outPoolUsage</td>
       <td>An estimate of the output buffers usage.</td>
       <td>Gauge</td>
diff --git a/docs/monitoring/metrics.zh.md b/docs/monitoring/metrics.zh.md
index aaed606..44b4806 100644
--- a/docs/monitoring/metrics.zh.md
+++ b/docs/monitoring/metrics.zh.md
@@ -1044,6 +1044,16 @@ Thus, in order to infer the metric identifier:
       <td>Gauge</td>
     </tr>
     <tr>
+      <td>inputFloatingBuffersUsage</td>
+      <td>An estimate of the floating input buffers usage, dediciated for credit-based mode.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>inputExclusiveBuffersUsage</td>
+      <td>An estimate of the exclusive input buffers usage, dediciated for credit-based mode.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
       <td>outPoolUsage</td>
       <td>An estimate of the output buffers usage.</td>
       <td>Gauge</td>
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
index 17fb2cc..bbe0833 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
@@ -226,7 +226,7 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti
 				inputGates[counter++] = inputGate;
 			}
 
-			registerInputMetrics(config.isNetworkDetailedMetrics(), networkInputGroup, inputGates);
+			registerInputMetrics(config.isNetworkDetailedMetrics(), config.isCreditBased(), networkInputGroup, inputGates);
 			return Arrays.asList(inputGates);
 		}
 	}
@@ -246,6 +246,7 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti
 			InputGate[] inputGates) {
 		NettyShuffleMetricFactory.registerLegacyNetworkMetrics(
 			config.isNetworkDetailedMetrics(),
+			config.isCreditBased(),
 			metricGroup,
 			producedPartitions,
 			inputGates);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputBufferPoolUsageGauge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/AbstractBuffersUsageGauge.java
similarity index 62%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputBufferPoolUsageGauge.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/AbstractBuffersUsageGauge.java
index c7a6d4e..187d936 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputBufferPoolUsageGauge.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/AbstractBuffersUsageGauge.java
@@ -18,36 +18,39 @@
 
 package org.apache.flink.runtime.io.network.metrics;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.metrics.Gauge;
-import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 
 /**
- * Gauge metric measuring the input buffer pool usage gauge for {@link SingleInputGate}s.
+ * Abstract gauge implementation for calculating the buffer usage percent.
  */
-public class InputBufferPoolUsageGauge implements Gauge<Float> {
+public abstract class AbstractBuffersUsageGauge implements Gauge<Float> {
 
-	private final SingleInputGate[] inputGates;
+	protected final SingleInputGate[] inputGates;
 
-	public InputBufferPoolUsageGauge(SingleInputGate[] inputGates) {
+	@VisibleForTesting
+	public abstract int calculateUsedBuffers(SingleInputGate inputGate);
+
+	@VisibleForTesting
+	public abstract int calculateTotalBuffers(SingleInputGate inputGate);
+
+	AbstractBuffersUsageGauge(SingleInputGate[] inputGates) {
 		this.inputGates = inputGates;
 	}
 
 	@Override
 	public Float getValue() {
 		int usedBuffers = 0;
-		int bufferPoolSize = 0;
+		int totalBuffers = 0;
 
 		for (SingleInputGate inputGate : inputGates) {
-			BufferPool bufferPool = inputGate.getBufferPool();
-			if (bufferPool != null) {
-				usedBuffers += bufferPool.bestEffortGetNumOfUsedBuffers();
-				bufferPoolSize += bufferPool.getNumBuffers();
-			}
+			usedBuffers += calculateUsedBuffers(inputGate);
+			totalBuffers += calculateTotalBuffers(inputGate);
 		}
 
-		if (bufferPoolSize != 0) {
-			return ((float) usedBuffers) / bufferPoolSize;
+		if (totalBuffers != 0) {
+			return ((float) usedBuffers) / totalBuffers;
 		} else {
 			return 0.0f;
 		}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/CreditBasedInputBuffersUsageGauge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/CreditBasedInputBuffersUsageGauge.java
new file mode 100644
index 0000000..5102809
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/CreditBasedInputBuffersUsageGauge.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.metrics;
+
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Gauge metric measuring the input buffers usage for {@link SingleInputGate}s under credit based mode.
+ */
+public class CreditBasedInputBuffersUsageGauge extends AbstractBuffersUsageGauge {
+
+	private final FloatingBuffersUsageGauge floatingBuffersUsageGauge;
+	private final ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge;
+
+	public CreditBasedInputBuffersUsageGauge(
+		FloatingBuffersUsageGauge floatingBuffersUsageGauge,
+		ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge,
+		SingleInputGate[] inputGates) {
+		super(checkNotNull(inputGates));
+		this.floatingBuffersUsageGauge = checkNotNull(floatingBuffersUsageGauge);
+		this.exclusiveBuffersUsageGauge = checkNotNull(exclusiveBuffersUsageGauge);
+	}
+
+	@Override
+	public int calculateUsedBuffers(SingleInputGate inputGate) {
+		return floatingBuffersUsageGauge.calculateUsedBuffers(inputGate) + exclusiveBuffersUsageGauge.calculateUsedBuffers(inputGate);
+	}
+
+	@Override
+	public int calculateTotalBuffers(SingleInputGate inputGate) {
+		return floatingBuffersUsageGauge.calculateTotalBuffers(inputGate) + exclusiveBuffersUsageGauge.calculateTotalBuffers(inputGate);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/ExclusiveBuffersUsageGauge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/ExclusiveBuffersUsageGauge.java
new file mode 100644
index 0000000..6fcb1f5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/ExclusiveBuffersUsageGauge.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.metrics;
+
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Gauge metric measuring the exclusive buffers usage gauge for {@link SingleInputGate}s.
+ */
+public class ExclusiveBuffersUsageGauge extends AbstractBuffersUsageGauge {
+
+	public ExclusiveBuffersUsageGauge(SingleInputGate[] inputGates) {
+		super(checkNotNull(inputGates));
+	}
+
+	@Override
+	public int calculateUsedBuffers(SingleInputGate inputGate) {
+		int usedBuffers = 0;
+		for (InputChannel ic : inputGate.getInputChannels().values()) {
+			if (ic instanceof RemoteInputChannel) {
+				usedBuffers += ((RemoteInputChannel) ic).unsynchronizedGetExclusiveBuffersUsed();
+			}
+		}
+		return usedBuffers;
+	}
+
+	@Override
+	public int calculateTotalBuffers(SingleInputGate inputGate) {
+		int totalExclusiveBuffers = 0;
+		for (InputChannel ic : inputGate.getInputChannels().values()) {
+			if (ic instanceof RemoteInputChannel) {
+				totalExclusiveBuffers += ((RemoteInputChannel) ic).getInitialCredit();
+			}
+		}
+		return totalExclusiveBuffers;
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/FloatingBuffersUsageGauge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/FloatingBuffersUsageGauge.java
new file mode 100644
index 0000000..d1c8dea
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/FloatingBuffersUsageGauge.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.metrics;
+
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Gauge metric measuring the floating buffers usage gauge for {@link SingleInputGate}s.
+ */
+public class FloatingBuffersUsageGauge extends AbstractBuffersUsageGauge {
+
+	public FloatingBuffersUsageGauge(SingleInputGate[] inputGates) {
+		super(checkNotNull(inputGates));
+	}
+
+	@Override
+	public int calculateUsedBuffers(SingleInputGate inputGate) {
+		int availableFloatingBuffers = 0;
+		BufferPool bufferPool = inputGate.getBufferPool();
+		if (bufferPool != null) {
+			int requestedFloatingBuffers = bufferPool.bestEffortGetNumOfUsedBuffers();
+			for (InputChannel ic : inputGate.getInputChannels().values()) {
+				if (ic instanceof RemoteInputChannel) {
+					availableFloatingBuffers += ((RemoteInputChannel) ic).unsynchronizedGetFloatingBuffersAvailable();
+				}
+			}
+			return Math.max(0, requestedFloatingBuffers - availableFloatingBuffers);
+		}
+		return 0;
+	}
+
+	@Override
+	public int calculateTotalBuffers(SingleInputGate inputGate) {
+		BufferPool bufferPool = inputGate.getBufferPool();
+		if (bufferPool != null) {
+			return inputGate.getBufferPool().getNumBuffers();
+		}
+		return 0;
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputBufferPoolUsageGauge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputBufferPoolUsageGauge.java
index c7a6d4e..b5df3a3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputBufferPoolUsageGauge.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputBufferPoolUsageGauge.java
@@ -18,38 +18,35 @@
 
 package org.apache.flink.runtime.io.network.metrics;
 
-import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Gauge metric measuring the input buffer pool usage gauge for {@link SingleInputGate}s.
  */
-public class InputBufferPoolUsageGauge implements Gauge<Float> {
-
-	private final SingleInputGate[] inputGates;
+public class InputBufferPoolUsageGauge extends AbstractBuffersUsageGauge {
 
 	public InputBufferPoolUsageGauge(SingleInputGate[] inputGates) {
-		this.inputGates = inputGates;
+		super(checkNotNull(inputGates));
 	}
 
 	@Override
-	public Float getValue() {
-		int usedBuffers = 0;
-		int bufferPoolSize = 0;
-
-		for (SingleInputGate inputGate : inputGates) {
-			BufferPool bufferPool = inputGate.getBufferPool();
-			if (bufferPool != null) {
-				usedBuffers += bufferPool.bestEffortGetNumOfUsedBuffers();
-				bufferPoolSize += bufferPool.getNumBuffers();
-			}
+	public int calculateUsedBuffers(SingleInputGate inputGate) {
+		BufferPool bufferPool = inputGate.getBufferPool();
+		if (bufferPool != null) {
+			return bufferPool.bestEffortGetNumOfUsedBuffers();
 		}
+		return 0;
+	}
 
-		if (bufferPoolSize != 0) {
-			return ((float) usedBuffers) / bufferPoolSize;
-		} else {
-			return 0.0f;
+	@Override
+	public int calculateTotalBuffers(SingleInputGate inputGate) {
+		BufferPool bufferPool = inputGate.getBufferPool();
+		if (bufferPool != null) {
+			return bufferPool.getNumBuffers();
 		}
+		return 0;
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/NettyShuffleMetricFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/NettyShuffleMetricFactory.java
index 6bcb027..4c8f1c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/NettyShuffleMetricFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/NettyShuffleMetricFactory.java
@@ -66,6 +66,8 @@ public class NettyShuffleMetricFactory {
 
 	private static final String METRIC_INPUT_QUEUE_LENGTH = "inputQueueLength";
 	private static final String METRIC_INPUT_POOL_USAGE = "inPoolUsage";
+	private static final String METRIC_INPUT_FLOATING_BUFFERS_USAGE = "inputFloatingBuffersUsage";
+	private static final String METRIC_INPUT_EXCLUSIVE_BUFFERS_USAGE = "inputExclusiveBuffersUsage";
 
 	private NettyShuffleMetricFactory() {
 	}
@@ -107,6 +109,7 @@ public class NettyShuffleMetricFactory {
 	@Deprecated
 	public static void registerLegacyNetworkMetrics(
 			boolean isDetailedMetrics,
+			boolean isCreditBased,
 			MetricGroup metricGroup,
 			ResultPartitionWriter[] producedPartitions,
 			InputGate[] inputGates) {
@@ -126,7 +129,7 @@ public class NettyShuffleMetricFactory {
 		registerOutputMetrics(isDetailedMetrics, outputGroup, buffersGroup, resultPartitions);
 
 		SingleInputGate[] singleInputGates = Arrays.copyOf(inputGates, inputGates.length, SingleInputGate[].class);
-		registerInputMetrics(isDetailedMetrics, inputGroup, buffersGroup, singleInputGates);
+		registerInputMetrics(isDetailedMetrics, isCreditBased, inputGroup, buffersGroup, singleInputGates);
 	}
 
 	public static void registerOutputMetrics(
@@ -154,10 +157,12 @@ public class NettyShuffleMetricFactory {
 
 	public static void registerInputMetrics(
 			boolean isDetailedMetrics,
+			boolean isCreditBased,
 			MetricGroup inputGroup,
 			SingleInputGate[] inputGates) {
 		registerInputMetrics(
 			isDetailedMetrics,
+			isCreditBased,
 			inputGroup,
 			inputGroup.addGroup(METRIC_GROUP_BUFFERS),
 			inputGates);
@@ -165,13 +170,28 @@ public class NettyShuffleMetricFactory {
 
 	private static void registerInputMetrics(
 			boolean isDetailedMetrics,
+			boolean isCreditBased,
 			MetricGroup inputGroup,
 			MetricGroup buffersGroup,
 			SingleInputGate[] inputGates) {
 		if (isDetailedMetrics) {
 			InputGateMetrics.registerQueueLengthMetrics(inputGroup, inputGates);
 		}
+
 		buffersGroup.gauge(METRIC_INPUT_QUEUE_LENGTH, new InputBuffersGauge(inputGates));
-		buffersGroup.gauge(METRIC_INPUT_POOL_USAGE, new InputBufferPoolUsageGauge(inputGates));
+
+		if (isCreditBased) {
+			FloatingBuffersUsageGauge floatingBuffersUsageGauge = new FloatingBuffersUsageGauge(inputGates);
+			ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = new ExclusiveBuffersUsageGauge(inputGates);
+			CreditBasedInputBuffersUsageGauge creditBasedInputBuffersUsageGauge = new CreditBasedInputBuffersUsageGauge(
+				floatingBuffersUsageGauge,
+				exclusiveBuffersUsageGauge,
+				inputGates);
+			buffersGroup.gauge(METRIC_INPUT_EXCLUSIVE_BUFFERS_USAGE, exclusiveBuffersUsageGauge);
+			buffersGroup.gauge(METRIC_INPUT_FLOATING_BUFFERS_USAGE, floatingBuffersUsageGauge);
+			buffersGroup.gauge(METRIC_INPUT_POOL_USAGE, creditBasedInputBuffersUsageGauge);
+		} else {
+			buffersGroup.gauge(METRIC_INPUT_POOL_USAGE, new InputBufferPoolUsageGauge(inputGates));
+		}
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 778aa1d..58b36c3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -434,6 +434,14 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
 		return Math.max(0, receivedBuffers.size());
 	}
 
+	public int unsynchronizedGetExclusiveBuffersUsed() {
+		return Math.max(0, initialCredit - bufferQueue.exclusiveBuffers.size());
+	}
+
+	public int unsynchronizedGetFloatingBuffersAvailable() {
+		return Math.max(0, bufferQueue.floatingBuffers.size());
+	}
+
 	public InputChannelID getInputChannelId() {
 		return id;
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
new file mode 100644
index 0000000..a602648
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.metrics.CreditBasedInputBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the metrics for input buffers usage.
+ */
+public class InputBuffersMetricsTest extends TestLogger {
+
+	@Test
+	public void testCalculateTotalBuffersSize() throws IOException {
+		int numberOfRemoteChannels = 2;
+		int numberOfLocalChannels = 0;
+
+		int numberOfBufferPerChannel = 2;
+		int numberOfBuffersPerGate = 8;
+
+		NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
+			.setNetworkBuffersPerChannel(numberOfBufferPerChannel)
+			.setFloatingNetworkBuffersPerGate(numberOfBuffersPerGate)
+			.build();
+
+		SingleInputGate inputGate1 = buildInputGate(
+			network,
+			numberOfRemoteChannels,
+			numberOfLocalChannels).f0;
+
+		SingleInputGate[] inputGates = new SingleInputGate[]{inputGate1};
+		FloatingBuffersUsageGauge floatingBuffersUsageGauge = new FloatingBuffersUsageGauge(inputGates);
+		ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = new ExclusiveBuffersUsageGauge(inputGates);
+		CreditBasedInputBuffersUsageGauge inputBufferPoolUsageGauge = new CreditBasedInputBuffersUsageGauge(
+			floatingBuffersUsageGauge,
+			exclusiveBuffersUsageGauge,
+			inputGates);
+
+		try (CloseableRegistry closeableRegistry = new CloseableRegistry()) {
+
+			closeableRegistry.registerCloseable(network::close);
+			closeableRegistry.registerCloseable(inputGate1::close);
+
+			assertEquals(numberOfBuffersPerGate, floatingBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+			assertEquals(numberOfRemoteChannels * numberOfBufferPerChannel, exclusiveBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+			assertEquals(numberOfRemoteChannels * numberOfBufferPerChannel + numberOfBuffersPerGate, inputBufferPoolUsageGauge.calculateTotalBuffers(inputGate1));
+		}
+	}
+
+	@Test
+	public void testExclusiveBuffersUsage() throws IOException {
+		int numberOfRemoteChannelsGate1 = 2;
+		int numberOfLocalChannelsGate1 = 0;
+		int numberOfRemoteChannelsGate2 = 1;
+		int numberOfLocalChannelsGate2 = 1;
+
+		int totalNumberOfRemoteChannels = numberOfRemoteChannelsGate1 + numberOfRemoteChannelsGate2;
+
+		int buffersPerChannel = 2;
+		int extraNetworkBuffersPerGate = 8;
+
+		NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
+			.setNetworkBuffersPerChannel(buffersPerChannel)
+			.setFloatingNetworkBuffersPerGate(extraNetworkBuffersPerGate)
+			.build();
+
+		Tuple2<SingleInputGate, List<RemoteInputChannel>> tuple1 = buildInputGate(
+			network,
+			numberOfRemoteChannelsGate1,
+			numberOfLocalChannelsGate1);
+		Tuple2<SingleInputGate, List<RemoteInputChannel>> tuple2 = buildInputGate(
+			network,
+			numberOfRemoteChannelsGate2,
+			numberOfLocalChannelsGate2);
+
+		SingleInputGate inputGate1 = tuple1.f0;
+		SingleInputGate inputGate2 = tuple2.f0;
+
+		List<RemoteInputChannel> remoteInputChannels = tuple1.f1;
+
+		SingleInputGate[] inputGates = new SingleInputGate[]{tuple1.f0, tuple2.f0};
+		FloatingBuffersUsageGauge floatingBuffersUsageGauge = new FloatingBuffersUsageGauge(inputGates);
+		ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = new ExclusiveBuffersUsageGauge(inputGates);
+		CreditBasedInputBuffersUsageGauge inputBuffersUsageGauge = new CreditBasedInputBuffersUsageGauge(
+			floatingBuffersUsageGauge,
+			exclusiveBuffersUsageGauge,
+			inputGates);
+
+		try (CloseableRegistry closeableRegistry = new CloseableRegistry()) {
+			assertEquals(0.0, exclusiveBuffersUsageGauge.getValue(), 0.0);
+			assertEquals(0.0, inputBuffersUsageGauge.getValue(), 0.0);
+
+			int totalBuffers = extraNetworkBuffersPerGate * inputGates.length + buffersPerChannel * totalNumberOfRemoteChannels;
+
+			int channelIndex = 1;
+			for (RemoteInputChannel channel : remoteInputChannels) {
+				drainAndValidate(
+					buffersPerChannel,
+					buffersPerChannel * channelIndex++,
+					channel,
+					closeableRegistry,
+					totalBuffers,
+					buffersPerChannel * totalNumberOfRemoteChannels,
+					exclusiveBuffersUsageGauge,
+					inputBuffersUsageGauge,
+					inputGate1);
+			}
+		} finally {
+			inputGate1.close();
+			inputGate2.close();
+			network.close();
+		}
+	}
+
+	@Test
+	public void testFloatingBuffersUsage() throws IOException, InterruptedException {
+
+		int numberOfRemoteChannelsGate1 = 2;
+		int numberOfLocalChannelsGate1 = 0;
+		int numberOfRemoteChannelsGate2 = 1;
+		int numberOfLocalChannelsGate2 = 1;
+
+		int totalNumberOfRemoteChannels = numberOfRemoteChannelsGate1 + numberOfRemoteChannelsGate2;
+
+		int buffersPerChannel = 2;
+		int extraNetworkBuffersPerGate = 8;
+
+		NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
+			.setNetworkBuffersPerChannel(buffersPerChannel)
+			.setFloatingNetworkBuffersPerGate(extraNetworkBuffersPerGate)
+			.build();
+
+		Tuple2<SingleInputGate, List<RemoteInputChannel>> tuple1 = buildInputGate(
+			network,
+			numberOfRemoteChannelsGate1,
+			numberOfLocalChannelsGate1);
+		SingleInputGate inputGate2 = buildInputGate(
+			network,
+			numberOfRemoteChannelsGate2,
+			numberOfLocalChannelsGate2).f0;
+
+		SingleInputGate inputGate1 = tuple1.f0;
+
+		RemoteInputChannel remoteInputChannel1 = tuple1.f1.get(0);
+
+		SingleInputGate[] inputGates = new SingleInputGate[]{tuple1.f0, inputGate2};
+		FloatingBuffersUsageGauge floatingBuffersUsageGauge = new FloatingBuffersUsageGauge(inputGates);
+		ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = new ExclusiveBuffersUsageGauge(inputGates);
+		CreditBasedInputBuffersUsageGauge inputBuffersUsageGauge = new CreditBasedInputBuffersUsageGauge(
+			floatingBuffersUsageGauge,
+			exclusiveBuffersUsageGauge,
+			inputGates);
+
+		try (CloseableRegistry closeableRegistry = new CloseableRegistry()) {
+			assertEquals(0.0, floatingBuffersUsageGauge.getValue(), 0.0);
+			assertEquals(0.0, inputBuffersUsageGauge.getValue(), 0.0);
+
+			// drain gate1's exclusive buffers
+			drainBuffer(buffersPerChannel, remoteInputChannel1, closeableRegistry);
+
+			int totalBuffers = extraNetworkBuffersPerGate * inputGates.length + buffersPerChannel * totalNumberOfRemoteChannels;
+
+			remoteInputChannel1.requestSubpartition(0);
+
+			int backlog = 3;
+			int totalRequestedBuffers = buffersPerChannel + backlog;
+
+			remoteInputChannel1.onSenderBacklog(backlog);
+
+			assertEquals(totalRequestedBuffers, remoteInputChannel1.unsynchronizedGetFloatingBuffersAvailable());
+
+			drainBuffer(totalRequestedBuffers, remoteInputChannel1, closeableRegistry);
+
+			assertEquals(0, remoteInputChannel1.unsynchronizedGetFloatingBuffersAvailable());
+			assertEquals((double) (buffersPerChannel + totalRequestedBuffers) / totalBuffers,
+				inputBuffersUsageGauge.getValue(), 0.0001);
+		} finally {
+			inputGate1.close();
+			inputGate2.close();
+			network.close();
+		}
+	}
+
+	private void drainAndValidate(
+		int numBuffersToRequest,
+		int totalRequestedBuffers,
+		RemoteInputChannel channel,
+		CloseableRegistry closeableRegistry,
+		int totalBuffers,
+		int totalExclusiveBuffers,
+		ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge,
+		CreditBasedInputBuffersUsageGauge inputBuffersUsageGauge,
+		SingleInputGate inputGate) throws IOException {
+
+		drainBuffer(numBuffersToRequest, channel, closeableRegistry);
+		assertEquals(totalRequestedBuffers, exclusiveBuffersUsageGauge.calculateUsedBuffers(inputGate));
+		assertEquals((double) totalRequestedBuffers / totalExclusiveBuffers, exclusiveBuffersUsageGauge.getValue(), 0.0001);
+		assertEquals((double) totalRequestedBuffers / totalBuffers, inputBuffersUsageGauge.getValue(), 0.0001);
+	}
+
+	private void drainBuffer(int boundary, RemoteInputChannel channel, CloseableRegistry closeableRegistry) throws IOException {
+		for (int i = 0; i < boundary; i++) {
+			Buffer buffer = channel.requestBuffer();
+			if (buffer != null) {
+				closeableRegistry.registerCloseable(buffer::recycleBuffer);
+			} else {
+				break;
+			}
+		}
+	}
+
+	private Tuple2<SingleInputGate, List<RemoteInputChannel>> buildInputGate(
+		NettyShuffleEnvironment network,
+		int numberOfRemoteChannels,
+		int numberOfLocalChannels) throws IOException {
+
+		SingleInputGate inputGate = new SingleInputGateBuilder()
+			.setNumberOfChannels(numberOfRemoteChannels + numberOfLocalChannels)
+			.setResultPartitionType(ResultPartitionType.PIPELINED_BOUNDED)
+			.setupBufferPoolFactory(network)
+			.build();
+
+		Tuple2<SingleInputGate, List<RemoteInputChannel>> res = Tuple2.of(inputGate, new ArrayList<>());
+
+		int channelIdx = 0;
+		for (int i = 0; i < numberOfRemoteChannels; i++) {
+			res.f1.add(buildRemoteChannel(channelIdx, inputGate, network));
+			channelIdx++;
+		}
+
+		for (int i = 0; i < numberOfLocalChannels; i++) {
+			buildLocalChannel(channelIdx, inputGate, network);
+		}
+		inputGate.setup();
+		return res;
+	}
+
+	private RemoteInputChannel buildRemoteChannel(
+		int channelIndex,
+		SingleInputGate inputGate,
+		NettyShuffleEnvironment network) {
+		return new InputChannelBuilder()
+			.setChannelIndex(channelIndex)
+			.setupFromNettyShuffleEnvironment(network)
+			.setConnectionManager(new TestingConnectionManager())
+			.buildRemoteAndSetToGate(inputGate);
+	}
+
+	private void buildLocalChannel(
+		int channelIndex,
+		SingleInputGate inputGate,
+		NettyShuffleEnvironment network) {
+		new InputChannelBuilder()
+			.setChannelIndex(channelIndex)
+			.setupFromNettyShuffleEnvironment(network)
+			.setConnectionManager(new TestingConnectionManager())
+			.buildLocalAndSetToGate(inputGate);
+	}
+}