You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2019/08/05 11:26:02 UTC
[flink] branch master updated: [FLINK-12576][Network,
Metrics] Take LocalInputChannel into account when compute
inputQueueLength (#8559)
This is an automated email from the ASF dual-hosted git repository.
zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 302fc1d [FLINK-12576][Network, Metrics] Take LocalInputChannel into account when compute inputQueueLength (#8559)
302fc1d is described below
commit 302fc1d5de38bb6db99ddc45470efcc9ebd782bc
Author: Aitozi <10...@qq.com>
AuthorDate: Mon Aug 5 19:25:29 2019 +0800
[FLINK-12576][Network, Metrics] Take LocalInputChannel into account when compute inputQueueLength (#8559)
* [FLINK-12576] Take localInputChannel into account when computing inputQueueLength
* remove default method
* fix comments
* fix up
* fix up
* u
---
docs/monitoring/metrics.md | 2 +-
docs/monitoring/metrics.zh.md | 2 +-
.../BoundedBlockingSubpartitionReader.java | 5 +++
.../partition/NoOpResultSubpartitionView.java | 5 +++
.../partition/PipelinedSubpartitionView.java | 5 +++
.../network/partition/ResultSubpartitionView.java | 2 +
.../network/partition/consumer/InputChannel.java | 8 ++++
.../partition/consumer/LocalInputChannel.java | 11 ++++++
.../partition/consumer/RemoteInputChannel.java | 1 +
.../partition/consumer/SingleInputGate.java | 4 +-
.../network/netty/CancelPartitionRequestTest.java | 5 +++
.../partition/consumer/LocalInputChannelTest.java | 1 +
.../partition/consumer/SingleInputGateTest.java | 44 ++++++++++++++++++++++
13 files changed, 90 insertions(+), 5 deletions(-)
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 211ccfa..9b5c504 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -1031,7 +1031,7 @@ Thus, in order to infer the metric identifier:
<th rowspan="8">Task</th>
<td rowspan="4">buffers</td>
<td>inputQueueLength</td>
- <td>The number of queued input buffers.</td>
+ <td>The number of queued input buffers. (ignores LocalInputChannels which are using blocking subpartitions)</td>
<td>Gauge</td>
</tr>
<tr>
diff --git a/docs/monitoring/metrics.zh.md b/docs/monitoring/metrics.zh.md
index 44b4806..d262c44 100644
--- a/docs/monitoring/metrics.zh.md
+++ b/docs/monitoring/metrics.zh.md
@@ -1030,7 +1030,7 @@ Thus, in order to infer the metric identifier:
<th rowspan="8">Task</th>
<td rowspan="4">buffers</td>
<td>inputQueueLength</td>
- <td>The number of queued input buffers.</td>
+ <td>The number of queued input buffers. (ignores LocalInputChannels which are using blocking subpartitions)</td>
<td>Gauge</td>
</tr>
<tr>
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
index 63e5e22..2da9534 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
@@ -167,6 +167,11 @@ final class BoundedBlockingSubpartitionReader implements ResultSubpartitionView
}
@Override
+ public int unsynchronizedGetNumberOfQueuedBuffers() {
+ return parent.unsynchronizedGetNumberOfQueuedBuffers();
+ }
+
+ @Override
public String toString() {
return String.format("Blocking Subpartition Reader: ID=%s, index=%d",
parent.parent.getPartitionId(),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/NoOpResultSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/NoOpResultSubpartitionView.java
index f3ba1e3..b961ab6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/NoOpResultSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/NoOpResultSubpartitionView.java
@@ -61,4 +61,9 @@ public class NoOpResultSubpartitionView implements ResultSubpartitionView {
public boolean isAvailable() {
return false;
}
+
+ @Override
+ public int unsynchronizedGetNumberOfQueuedBuffers() {
+ return 0;
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
index 9d08358..94ada2d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
@@ -91,6 +91,11 @@ class PipelinedSubpartitionView implements ResultSubpartitionView {
}
@Override
+ public int unsynchronizedGetNumberOfQueuedBuffers() {
+ return parent.unsynchronizedGetNumberOfQueuedBuffers();
+ }
+
+ @Override
public String toString() {
return String.format("PipelinedSubpartitionView(index: %d) of ResultPartition %s",
parent.index,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
index a755955..49ff4f8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
@@ -60,4 +60,6 @@ public interface ResultSubpartitionView {
boolean nextBufferIsEvent();
boolean isAvailable();
+
+ int unsynchronizedGetNumberOfQueuedBuffers();
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index c0a204b..a0d3cb5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -249,6 +249,14 @@ public abstract class InputChannel {
}
// ------------------------------------------------------------------------
+ // Metric related method
+ // ------------------------------------------------------------------------
+
+ public int unsynchronizedGetNumberOfQueuedBuffers() {
+ return 0;
+ }
+
+ // ------------------------------------------------------------------------
/**
* A combination of a {@link Buffer} and a flag indicating availability of further buffers,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 3a54310..4c7b098 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -261,6 +261,17 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
}
@Override
+ public int unsynchronizedGetNumberOfQueuedBuffers() {
+ ResultSubpartitionView view = subpartitionView;
+
+ if (view != null) {
+ return view.unsynchronizedGetNumberOfQueuedBuffers();
+ }
+
+ return 0;
+ }
+
+ @Override
public String toString() {
return "LocalInputChannel [" + partitionId + "]";
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 58b36c3..8ffca16 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -430,6 +430,7 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
}
}
+ @Override
public int unsynchronizedGetNumberOfQueuedBuffers() {
return Math.max(0, receivedBuffers.size());
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index fd40c94..13b56df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -279,9 +279,7 @@ public class SingleInputGate extends InputGate {
int totalBuffers = 0;
for (InputChannel channel : inputChannels.values()) {
- if (channel instanceof RemoteInputChannel) {
- totalBuffers += ((RemoteInputChannel) channel).getNumberOfQueuedBuffers();
- }
+ totalBuffers += channel.unsynchronizedGetNumberOfQueuedBuffers();
}
return totalBuffers;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
index 58c02df..3640864 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
@@ -225,6 +225,11 @@ public class CancelPartitionRequestTest {
}
@Override
+ public int unsynchronizedGetNumberOfQueuedBuffers() {
+ return 0;
+ }
+
+ @Override
public Throwable getFailureCause() {
return null;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index fd7cdd1..dc8b501 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -558,5 +558,6 @@ public class LocalInputChannelTest {
return null;
}
+
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 20eae98..057ac5e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
@@ -41,6 +42,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.io.network.util.TestTaskEvent;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
@@ -559,6 +561,48 @@ public class SingleInputGateTest extends InputGateTestBase {
}
}
+ @Test
+ public void testQueuedBuffers() throws Exception {
+ final NettyShuffleEnvironment network = createNettyShuffleEnvironment();
+
+ final ResultPartition resultPartition = new ResultPartitionBuilder()
+ .setResultPartitionManager(network.getResultPartitionManager())
+ .setupBufferPoolFactoryFromNettyShuffleEnvironment(network)
+ .build();
+
+ final SingleInputGate inputGate = createInputGate(network, 2, ResultPartitionType.PIPELINED);
+
+ final ResultPartitionID localResultPartitionId = resultPartition.getPartitionId();
+
+ final RemoteInputChannel remoteInputChannel = InputChannelBuilder.newBuilder()
+ .setChannelIndex(1)
+ .setupFromNettyShuffleEnvironment(network)
+ .setConnectionManager(new TestingConnectionManager())
+ .buildRemoteAndSetToGate(inputGate);
+
+ InputChannelBuilder.newBuilder()
+ .setChannelIndex(0)
+ .setPartitionId(localResultPartitionId)
+ .setupFromNettyShuffleEnvironment(network)
+ .setConnectionManager(new TestingConnectionManager())
+ .buildLocalAndSetToGate(inputGate);
+
+ try {
+ resultPartition.setup();
+ inputGate.setup();
+
+ remoteInputChannel.onBuffer(TestBufferFactory.createBuffer(1), 0, 0);
+ assertEquals(1, inputGate.getNumberOfQueuedBuffers());
+
+ resultPartition.addBufferConsumer(BufferBuilderTestUtils.createFilledBufferConsumer(1), 0);
+ assertEquals(2, inputGate.getNumberOfQueuedBuffers());
+ } finally {
+ resultPartition.release();
+ inputGate.close();
+ network.close();
+ }
+ }
+
/**
* Tests that if the {@link PartitionNotFoundException} is set onto one {@link InputChannel},
* then it would be thrown directly via {@link SingleInputGate#getNextBufferOrEvent()}. So we