You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@flink.apache.org by nk...@apache.org on 2018/08/15 07:02:17 UTC
[flink] branch release-1.6 updated: [FLINK-10022][network][metrics] add metrics for input/output buffers
This is an automated email from the ASF dual-hosted git repository.
nkruber pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.6 by this push:
new 9b8202b [FLINK-10022][network][metrics] add metrics for input/output buffers
9b8202b is described below
commit 9b8202bc246ec7f03a723061f0580695a8eaae03
Author: Nico Kruber <ni...@gmail.com>
AuthorDate: Tue Aug 14 21:50:58 2018 +0200
[FLINK-10022][network][metrics] add metrics for input/output buffers
This closes #6551.
---
docs/monitoring/metrics.md | 31 ++++++++++++++++++++++
.../io/network/api/writer/RecordWriter.java | 4 +++
.../network/partition/consumer/InputChannel.java | 6 ++++-
.../partition/consumer/LocalInputChannel.java | 3 ++-
.../partition/consumer/RemoteInputChannel.java | 3 ++-
.../partition/consumer/UnknownInputChannel.java | 2 +-
.../apache/flink/runtime/metrics/MetricNames.java | 8 ++++++
.../runtime/metrics/groups/TaskIOMetricGroup.java | 26 ++++++++++++++++++
.../partition/consumer/InputChannelTest.java | 2 +-
.../partition/consumer/TestInputChannel.java | 2 +-
.../metrics/groups/TaskIOMetricGroupTest.java | 6 +++++
11 files changed, 87 insertions(+), 6 deletions(-)
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 55f626e..8be5878 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -1244,6 +1244,27 @@ Thus, in order to infer the metric identifier:
<td>Meter</td>
</tr>
<tr>
+ <th rowspan="6"><strong>Task</strong></th>
+ <td>numBuffersInLocal</td>
+ <td>The total number of network buffers this task has read from a local source.</td>
+ <td>Counter</td>
+ </tr>
+ <tr>
+ <td>numBuffersInLocalPerSecond</td>
+ <td>The number of network buffers this task reads from a local source per second.</td>
+ <td>Meter</td>
+ </tr>
+ <tr>
+ <td>numBuffersInRemote</td>
+ <td>The total number of network buffers this task has read from a remote source.</td>
+ <td>Counter</td>
+ </tr>
+ <tr>
+ <td>numBuffersInRemotePerSecond</td>
+ <td>The number of network buffers this task reads from a remote source per second.</td>
+ <td>Meter</td>
+ </tr>
+ <tr>
<td>numBytesOut</td>
<td>The total number of bytes this task has emitted.</td>
<td>Counter</td>
@@ -1254,6 +1275,16 @@ Thus, in order to infer the metric identifier:
<td>Meter</td>
</tr>
<tr>
+ <td>numBuffersOut</td>
+ <td>The total number of network buffers this task has emitted.</td>
+ <td>Counter</td>
+ </tr>
+ <tr>
+ <td>numBuffersOutPerSecond</td>
+ <td>The number of network buffers this task emits per second.</td>
+ <td>Meter</td>
+ </tr>
+ <tr>
<th rowspan="6"><strong>Task/Operator</strong></th>
<td>numRecordsIn</td>
<td>The total number of records this operator/task has received.</td>
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index e3a8e49..970795c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -71,6 +71,8 @@ public class RecordWriter<T extends IOReadableWritable> {
private Counter numBytesOut = new SimpleCounter();
+ private Counter numBuffersOut = new SimpleCounter();
+
public RecordWriter(ResultPartitionWriter writer) {
this(writer, new RoundRobinChannelSelector<T>());
}
@@ -184,6 +186,7 @@ public class RecordWriter<T extends IOReadableWritable> {
*/
public void setMetricGroup(TaskIOMetricGroup metrics) {
numBytesOut = metrics.getNumBytesOutCounter();
+ numBuffersOut = metrics.getNumBuffersOutCounter();
}
/**
@@ -200,6 +203,7 @@ public class RecordWriter<T extends IOReadableWritable> {
bufferBuilders[targetChannel] = Optional.empty();
numBytesOut.inc(bufferBuilder.finish());
+ numBuffersOut.inc();
serializer.clear();
return true;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index 3ce5866..2a7cedf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -64,6 +64,8 @@ public abstract class InputChannel {
protected final Counter numBytesIn;
+ protected final Counter numBuffersIn;
+
/** The current backoff (in ms) */
private int currentBackoff;
@@ -73,7 +75,8 @@ public abstract class InputChannel {
ResultPartitionID partitionId,
int initialBackoff,
int maxBackoff,
- Counter numBytesIn) {
+ Counter numBytesIn,
+ Counter numBuffersIn) {
checkArgument(channelIndex >= 0);
@@ -91,6 +94,7 @@ public abstract class InputChannel {
this.currentBackoff = initial == 0 ? -1 : 0;
this.numBytesIn = numBytesIn;
+ this.numBuffersIn = numBuffersIn;
}
// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index f9c75ad..4b3a8ff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -84,7 +84,7 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
int maxBackoff,
TaskIOMetricGroup metrics) {
- super(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, metrics.getNumBytesInLocalCounter());
+ super(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, metrics.getNumBytesInLocalCounter(), metrics.getNumBuffersInLocalCounter());
this.partitionManager = checkNotNull(partitionManager);
this.taskEventDispatcher = checkNotNull(taskEventDispatcher);
@@ -194,6 +194,7 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
}
numBytesIn.inc(next.buffer().getSizeUnsafe());
+ numBuffersIn.inc();
return Optional.of(new BufferAndAvailability(next.buffer(), next.isMoreAvailable(), next.buffersInBacklog()));
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index b94f48a..28f3020 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -123,7 +123,7 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
int maxBackoff,
TaskIOMetricGroup metrics) {
- super(inputGate, channelIndex, partitionId, initialBackOff, maxBackoff, metrics.getNumBytesInRemoteCounter());
+ super(inputGate, channelIndex, partitionId, initialBackOff, maxBackoff, metrics.getNumBytesInRemoteCounter(), metrics.getNumBuffersInRemoteCounter());
this.connectionId = checkNotNull(connectionId);
this.connectionManager = checkNotNull(connectionManager);
@@ -199,6 +199,7 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
}
numBytesIn.inc(next.getSizeUnsafe());
+ numBuffersIn.inc();
return Optional.of(new BufferAndAvailability(next, remaining > 0, getSenderBacklog()));
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
index 1101f66..20a7aed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
@@ -61,7 +61,7 @@ class UnknownInputChannel extends InputChannel {
int maxBackoff,
TaskIOMetricGroup metrics) {
- super(gate, channelIndex, partitionId, initialBackoff, maxBackoff, null);
+ super(gate, channelIndex, partitionId, initialBackoff, maxBackoff, null, null);
this.partitionManager = checkNotNull(partitionManager);
this.taskEventDispatcher = checkNotNull(taskEventDispatcher);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
index d15a0f1..c106c39 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
@@ -40,6 +40,14 @@ public class MetricNames {
public static final String IO_NUM_BYTES_IN_REMOTE_RATE = IO_NUM_BYTES_IN_REMOTE + SUFFIX_RATE;
public static final String IO_NUM_BYTES_OUT_RATE = IO_NUM_BYTES_OUT + SUFFIX_RATE;
+ public static final String IO_NUM_BUFFERS_IN = "numBuffersIn";
+ public static final String IO_NUM_BUFFERS_IN_LOCAL = IO_NUM_BUFFERS_IN + "Local";
+ public static final String IO_NUM_BUFFERS_IN_REMOTE = IO_NUM_BUFFERS_IN + "Remote";
+ public static final String IO_NUM_BUFFERS_OUT = "numBuffersOut";
+ public static final String IO_NUM_BUFFERS_IN_LOCAL_RATE = IO_NUM_BUFFERS_IN_LOCAL + SUFFIX_RATE;
+ public static final String IO_NUM_BUFFERS_IN_REMOTE_RATE = IO_NUM_BUFFERS_IN_REMOTE + SUFFIX_RATE;
+ public static final String IO_NUM_BUFFERS_OUT_RATE = IO_NUM_BUFFERS_OUT + SUFFIX_RATE;
+
public static final String IO_CURRENT_INPUT_WATERMARK = "currentInputWatermark";
public static final String IO_CURRENT_INPUT_1_WATERMARK = "currentInput1Watermark";
public static final String IO_CURRENT_INPUT_2_WATERMARK = "currentInput2Watermark";
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
index e12ecd7..79c5e8f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
@@ -44,12 +44,18 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
private final Counter numBytesInRemote;
private final SumCounter numRecordsIn;
private final SumCounter numRecordsOut;
+ private final Counter numBuffersOut;
+ private final Counter numBuffersInLocal;
+ private final Counter numBuffersInRemote;
private final Meter numBytesInRateLocal;
private final Meter numBytesInRateRemote;
private final Meter numBytesOutRate;
private final Meter numRecordsInRate;
private final Meter numRecordsOutRate;
+ private final Meter numBuffersOutRate;
+ private final Meter numBuffersInRateLocal;
+ private final Meter numBuffersInRateRemote;
public TaskIOMetricGroup(TaskMetricGroup parent) {
super(parent);
@@ -60,10 +66,18 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
this.numBytesOutRate = meter(MetricNames.IO_NUM_BYTES_OUT_RATE, new MeterView(numBytesOut, 60));
this.numBytesInRateLocal = meter(MetricNames.IO_NUM_BYTES_IN_LOCAL_RATE, new MeterView(numBytesInLocal, 60));
this.numBytesInRateRemote = meter(MetricNames.IO_NUM_BYTES_IN_REMOTE_RATE, new MeterView(numBytesInRemote, 60));
+
this.numRecordsIn = counter(MetricNames.IO_NUM_RECORDS_IN, new SumCounter());
this.numRecordsOut = counter(MetricNames.IO_NUM_RECORDS_OUT, new SumCounter());
this.numRecordsInRate = meter(MetricNames.IO_NUM_RECORDS_IN_RATE, new MeterView(numRecordsIn, 60));
this.numRecordsOutRate = meter(MetricNames.IO_NUM_RECORDS_OUT_RATE, new MeterView(numRecordsOut, 60));
+
+ this.numBuffersOut = counter(MetricNames.IO_NUM_BUFFERS_OUT);
+ this.numBuffersInLocal = counter(MetricNames.IO_NUM_BUFFERS_IN_LOCAL);
+ this.numBuffersInRemote = counter(MetricNames.IO_NUM_BUFFERS_IN_REMOTE);
+ this.numBuffersOutRate = meter(MetricNames.IO_NUM_BUFFERS_OUT_RATE, new MeterView(numBuffersOut, 60));
+ this.numBuffersInRateLocal = meter(MetricNames.IO_NUM_BUFFERS_IN_LOCAL_RATE, new MeterView(numBuffersInLocal, 60));
+ this.numBuffersInRateRemote = meter(MetricNames.IO_NUM_BUFFERS_IN_REMOTE_RATE, new MeterView(numBuffersInRemote, 60));
}
public IOMetrics createSnapshot() {
@@ -93,6 +107,18 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
return numRecordsOut;
}
+ public Counter getNumBuffersOutCounter() {
+ return numBuffersOut;
+ }
+
+ public Counter getNumBuffersInLocalCounter() {
+ return numBuffersInLocal;
+ }
+
+ public Counter getNumBuffersInRemoteCounter() {
+ return numBuffersInRemote;
+ }
+
public Meter getNumBytesInLocalRateMeter() {
return numBytesInRateLocal;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
index abadddf..d757aa9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
@@ -120,7 +120,7 @@ public class InputChannelTest {
int initialBackoff,
int maxBackoff) {
- super(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, new SimpleCounter());
+ super(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, new SimpleCounter(), new SimpleCounter());
}
@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
index 80e07f5..ac3f0ff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
@@ -48,7 +48,7 @@ public class TestInputChannel extends InputChannel {
private boolean isReleased = false;
TestInputChannel(SingleInputGate inputGate, int channelIndex) {
- super(inputGate, channelIndex, new ResultPartitionID(), 0, 0, new SimpleCounter());
+ super(inputGate, channelIndex, new ResultPartitionID(), 0, 0, new SimpleCounter(), new SimpleCounter());
}
public TestInputChannel read(Buffer buffer) throws IOException, InterruptedException {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
index f23b2f5..b02be74 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
@@ -54,6 +54,9 @@ public class TaskIOMetricGroupTest {
taskIO.getNumBytesInLocalCounter().inc(100L);
taskIO.getNumBytesInRemoteCounter().inc(150L);
taskIO.getNumBytesOutCounter().inc(250L);
+ taskIO.getNumBuffersInLocalCounter().inc(1L);
+ taskIO.getNumBuffersInRemoteCounter().inc(2L);
+ taskIO.getNumBuffersOutCounter().inc(3L);
IOMetrics io = taskIO.createSnapshot();
assertEquals(32L, io.getNumRecordsIn());
@@ -61,5 +64,8 @@ public class TaskIOMetricGroupTest {
assertEquals(100L, io.getNumBytesInLocal());
assertEquals(150L, io.getNumBytesInRemote());
assertEquals(250L, io.getNumBytesOut());
+ assertEquals(1L, taskIO.getNumBuffersInLocalCounter().getCount());
+ assertEquals(2L, taskIO.getNumBuffersInRemoteCounter().getCount());
+ assertEquals(3L, taskIO.getNumBuffersOutCounter().getCount());
}
}