You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2019/08/05 11:26:02 UTC

[flink] branch master updated: [FLINK-12576][Network, Metrics] Take LocalInputChannel into account when compute inputQueueLength (#8559)

This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 302fc1d  [FLINK-12576][Network, Metrics] Take LocalInputChannel into account when compute inputQueueLength (#8559)
302fc1d is described below

commit 302fc1d5de38bb6db99ddc45470efcc9ebd782bc
Author: Aitozi <10...@qq.com>
AuthorDate: Mon Aug 5 19:25:29 2019 +0800

    [FLINK-12576][Network, Metrics] Take LocalInputChannel into account when compute inputQueueLength (#8559)
    
    * [FLINK-12576] Take LocalInputChannel into account when computing inputQueueLength
    
    * remove default method
    
    * fix comments
    
    * fix up
    
    * fix up
    
    * u
---
 docs/monitoring/metrics.md                         |  2 +-
 docs/monitoring/metrics.zh.md                      |  2 +-
 .../BoundedBlockingSubpartitionReader.java         |  5 +++
 .../partition/NoOpResultSubpartitionView.java      |  5 +++
 .../partition/PipelinedSubpartitionView.java       |  5 +++
 .../network/partition/ResultSubpartitionView.java  |  2 +
 .../network/partition/consumer/InputChannel.java   |  8 ++++
 .../partition/consumer/LocalInputChannel.java      | 11 ++++++
 .../partition/consumer/RemoteInputChannel.java     |  1 +
 .../partition/consumer/SingleInputGate.java        |  4 +-
 .../network/netty/CancelPartitionRequestTest.java  |  5 +++
 .../partition/consumer/LocalInputChannelTest.java  |  1 +
 .../partition/consumer/SingleInputGateTest.java    | 44 ++++++++++++++++++++++
 13 files changed, 90 insertions(+), 5 deletions(-)

diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 211ccfa..9b5c504 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -1031,7 +1031,7 @@ Thus, in order to infer the metric identifier:
       <th rowspan="8">Task</th>
       <td rowspan="4">buffers</td>
       <td>inputQueueLength</td>
-      <td>The number of queued input buffers.</td>
+      <td>The number of queued input buffers. (ignores LocalInputChannels which are using blocking subpartitions)</td>
       <td>Gauge</td>
     </tr>
     <tr>
diff --git a/docs/monitoring/metrics.zh.md b/docs/monitoring/metrics.zh.md
index 44b4806..d262c44 100644
--- a/docs/monitoring/metrics.zh.md
+++ b/docs/monitoring/metrics.zh.md
@@ -1030,7 +1030,7 @@ Thus, in order to infer the metric identifier:
       <th rowspan="8">Task</th>
       <td rowspan="4">buffers</td>
       <td>inputQueueLength</td>
-      <td>The number of queued input buffers.</td>
+      <td>The number of queued input buffers. (ignores LocalInputChannels which are using blocking subpartitions)</td>
       <td>Gauge</td>
     </tr>
     <tr>
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
index 63e5e22..2da9534 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
@@ -167,6 +167,11 @@ final class BoundedBlockingSubpartitionReader implements ResultSubpartitionView
 	}
 
 	@Override
+	public int unsynchronizedGetNumberOfQueuedBuffers() {
+		return parent.unsynchronizedGetNumberOfQueuedBuffers();
+	}
+
+	@Override
 	public String toString() {
 		return String.format("Blocking Subpartition Reader: ID=%s, index=%d",
 				parent.parent.getPartitionId(),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/NoOpResultSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/NoOpResultSubpartitionView.java
index f3ba1e3..b961ab6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/NoOpResultSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/NoOpResultSubpartitionView.java
@@ -61,4 +61,9 @@ public class NoOpResultSubpartitionView implements ResultSubpartitionView {
 	public boolean isAvailable() {
 		return false;
 	}
+
+	@Override
+	public int unsynchronizedGetNumberOfQueuedBuffers() {
+		return 0;
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
index 9d08358..94ada2d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
@@ -91,6 +91,11 @@ class PipelinedSubpartitionView implements ResultSubpartitionView {
 	}
 
 	@Override
+	public int unsynchronizedGetNumberOfQueuedBuffers() {
+		return parent.unsynchronizedGetNumberOfQueuedBuffers();
+	}
+
+	@Override
 	public String toString() {
 		return String.format("PipelinedSubpartitionView(index: %d) of ResultPartition %s",
 				parent.index,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
index a755955..49ff4f8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
@@ -60,4 +60,6 @@ public interface ResultSubpartitionView {
 	boolean nextBufferIsEvent();
 
 	boolean isAvailable();
+
+	int unsynchronizedGetNumberOfQueuedBuffers();
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index c0a204b..a0d3cb5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -249,6 +249,14 @@ public abstract class InputChannel {
 	}
 
 	// ------------------------------------------------------------------------
+	// Metric related method
+	// ------------------------------------------------------------------------
+
+	public int unsynchronizedGetNumberOfQueuedBuffers() {
+		return 0;
+	}
+
+	// ------------------------------------------------------------------------
 
 	/**
 	 * A combination of a {@link Buffer} and a flag indicating availability of further buffers,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 3a54310..4c7b098 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -261,6 +261,17 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
 	}
 
 	@Override
+	public int unsynchronizedGetNumberOfQueuedBuffers() {
+		ResultSubpartitionView view = subpartitionView;
+
+		if (view != null) {
+			return view.unsynchronizedGetNumberOfQueuedBuffers();
+		}
+
+		return 0;
+	}
+
+	@Override
 	public String toString() {
 		return "LocalInputChannel [" + partitionId + "]";
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 58b36c3..8ffca16 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -430,6 +430,7 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
 		}
 	}
 
+	@Override
 	public int unsynchronizedGetNumberOfQueuedBuffers() {
 		return Math.max(0, receivedBuffers.size());
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index fd40c94..13b56df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -279,9 +279,7 @@ public class SingleInputGate extends InputGate {
 				int totalBuffers = 0;
 
 				for (InputChannel channel : inputChannels.values()) {
-					if (channel instanceof RemoteInputChannel) {
-						totalBuffers += ((RemoteInputChannel) channel).getNumberOfQueuedBuffers();
-					}
+					totalBuffers += channel.unsynchronizedGetNumberOfQueuedBuffers();
 				}
 
 				return  totalBuffers;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
index 58c02df..3640864 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
@@ -225,6 +225,11 @@ public class CancelPartitionRequestTest {
 		}
 
 		@Override
+		public int unsynchronizedGetNumberOfQueuedBuffers() {
+			return 0;
+		}
+
+		@Override
 		public Throwable getFailureCause() {
 			return null;
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index fd7cdd1..dc8b501 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -558,5 +558,6 @@ public class LocalInputChannelTest {
 
 			return null;
 		}
+
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 20eae98..057ac5e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
@@ -41,6 +42,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import org.apache.flink.runtime.io.network.util.TestBufferFactory;
 import org.apache.flink.runtime.io.network.util.TestTaskEvent;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
@@ -559,6 +561,48 @@ public class SingleInputGateTest extends InputGateTestBase {
 		}
 	}
 
+	@Test
+	public void testQueuedBuffers() throws Exception {
+		final NettyShuffleEnvironment network = createNettyShuffleEnvironment();
+
+		final ResultPartition resultPartition = new ResultPartitionBuilder()
+			.setResultPartitionManager(network.getResultPartitionManager())
+			.setupBufferPoolFactoryFromNettyShuffleEnvironment(network)
+			.build();
+
+		final SingleInputGate inputGate = createInputGate(network, 2, ResultPartitionType.PIPELINED);
+
+		final ResultPartitionID localResultPartitionId = resultPartition.getPartitionId();
+
+		final RemoteInputChannel remoteInputChannel = InputChannelBuilder.newBuilder()
+			.setChannelIndex(1)
+			.setupFromNettyShuffleEnvironment(network)
+			.setConnectionManager(new TestingConnectionManager())
+			.buildRemoteAndSetToGate(inputGate);
+
+		InputChannelBuilder.newBuilder()
+			.setChannelIndex(0)
+			.setPartitionId(localResultPartitionId)
+			.setupFromNettyShuffleEnvironment(network)
+			.setConnectionManager(new TestingConnectionManager())
+			.buildLocalAndSetToGate(inputGate);
+
+		try {
+			resultPartition.setup();
+			inputGate.setup();
+
+			remoteInputChannel.onBuffer(TestBufferFactory.createBuffer(1), 0, 0);
+			assertEquals(1, inputGate.getNumberOfQueuedBuffers());
+
+			resultPartition.addBufferConsumer(BufferBuilderTestUtils.createFilledBufferConsumer(1), 0);
+			assertEquals(2, inputGate.getNumberOfQueuedBuffers());
+		} finally {
+			resultPartition.release();
+			inputGate.close();
+			network.close();
+		}
+	}
+
 	/**
 	 * Tests that if the {@link PartitionNotFoundException} is set onto one {@link InputChannel},
 	 * then it would be thrown directly via {@link SingleInputGate#getNextBufferOrEvent()}. So we