You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by nk...@apache.org on 2018/08/15 07:06:17 UTC

[flink] branch master updated: [FLINK-10022][network][metrics] add metrics for input/output buffers

This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a40a659  [FLINK-10022][network][metrics] add metrics for input/output buffers
a40a659 is described below

commit a40a659259882bef3e0f3c5b4356a731ab52a394
Author: Nico Kruber <ni...@gmail.com>
AuthorDate: Tue Aug 14 21:50:58 2018 +0200

    [FLINK-10022][network][metrics] add metrics for input/output buffers
    
    This closes #6551.
---
 docs/monitoring/metrics.md                         | 31 ++++++++++++++++++++++
 .../io/network/api/writer/RecordWriter.java        |  4 +++
 .../network/partition/consumer/InputChannel.java   |  6 ++++-
 .../partition/consumer/LocalInputChannel.java      |  3 ++-
 .../partition/consumer/RemoteInputChannel.java     |  3 ++-
 .../partition/consumer/UnknownInputChannel.java    |  2 +-
 .../apache/flink/runtime/metrics/MetricNames.java  |  8 ++++++
 .../runtime/metrics/groups/TaskIOMetricGroup.java  | 26 ++++++++++++++++++
 .../partition/consumer/InputChannelTest.java       |  2 +-
 .../partition/consumer/TestInputChannel.java       |  2 +-
 .../metrics/groups/TaskIOMetricGroupTest.java      |  6 +++++
 11 files changed, 87 insertions(+), 6 deletions(-)

diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 0542737..e46549a 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -1244,6 +1244,27 @@ Thus, in order to infer the metric identifier:
       <td>Meter</td>
     </tr>
     <tr>
+      <th rowspan="6"><strong>Task</strong></th>
+      <td>numBuffersInLocal</td>
+      <td>The total number of network buffers this task has read from a local source.</td>
+      <td>Counter</td>
+    </tr>
+    <tr>
+      <td>numBuffersInLocalPerSecond</td>
+      <td>The number of network buffers this task reads from a local source per second.</td>
+      <td>Meter</td>
+    </tr>
+    <tr>
+      <td>numBuffersInRemote</td>
+      <td>The total number of network buffers this task has read from a remote source.</td>
+      <td>Counter</td>
+    </tr>
+    <tr>
+      <td>numBuffersInRemotePerSecond</td>
+      <td>The number of network buffers this task reads from a remote source per second.</td>
+      <td>Meter</td>
+    </tr>
+    <tr>
       <td>numBytesOut</td>
       <td>The total number of bytes this task has emitted.</td>
       <td>Counter</td>
@@ -1254,6 +1275,16 @@ Thus, in order to infer the metric identifier:
       <td>Meter</td>
     </tr>
     <tr>
+      <td>numBuffersOut</td>
+      <td>The total number of network buffers this task has emitted.</td>
+      <td>Counter</td>
+    </tr>
+    <tr>
+      <td>numBuffersOutPerSecond</td>
+      <td>The number of network buffers this task emits per second.</td>
+      <td>Meter</td>
+    </tr>
+    <tr>
       <th rowspan="6"><strong>Task/Operator</strong></th>
       <td>numRecordsIn</td>
       <td>The total number of records this operator/task has received.</td>
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index e3a8e49..970795c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -71,6 +71,8 @@ public class RecordWriter<T extends IOReadableWritable> {
 
 	private Counter numBytesOut = new SimpleCounter();
 
+	private Counter numBuffersOut = new SimpleCounter();
+
 	public RecordWriter(ResultPartitionWriter writer) {
 		this(writer, new RoundRobinChannelSelector<T>());
 	}
@@ -184,6 +186,7 @@ public class RecordWriter<T extends IOReadableWritable> {
      */
 	public void setMetricGroup(TaskIOMetricGroup metrics) {
 		numBytesOut = metrics.getNumBytesOutCounter();
+		numBuffersOut = metrics.getNumBuffersOutCounter();
 	}
 
 	/**
@@ -200,6 +203,7 @@ public class RecordWriter<T extends IOReadableWritable> {
 		bufferBuilders[targetChannel] = Optional.empty();
 
 		numBytesOut.inc(bufferBuilder.finish());
+		numBuffersOut.inc();
 		serializer.clear();
 		return true;
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index 3ce5866..2a7cedf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -64,6 +64,8 @@ public abstract class InputChannel {
 
 	protected final Counter numBytesIn;
 
+	protected final Counter numBuffersIn;
+
 	/** The current backoff (in ms) */
 	private int currentBackoff;
 
@@ -73,7 +75,8 @@ public abstract class InputChannel {
 			ResultPartitionID partitionId,
 			int initialBackoff,
 			int maxBackoff,
-			Counter numBytesIn) {
+			Counter numBytesIn,
+			Counter numBuffersIn) {
 
 		checkArgument(channelIndex >= 0);
 
@@ -91,6 +94,7 @@ public abstract class InputChannel {
 		this.currentBackoff = initial == 0 ? -1 : 0;
 
 		this.numBytesIn = numBytesIn;
+		this.numBuffersIn = numBuffersIn;
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index f9c75ad..4b3a8ff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -84,7 +84,7 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
 		int maxBackoff,
 		TaskIOMetricGroup metrics) {
 
-		super(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, metrics.getNumBytesInLocalCounter());
+		super(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, metrics.getNumBytesInLocalCounter(), metrics.getNumBuffersInLocalCounter());
 
 		this.partitionManager = checkNotNull(partitionManager);
 		this.taskEventDispatcher = checkNotNull(taskEventDispatcher);
@@ -194,6 +194,7 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
 		}
 
 		numBytesIn.inc(next.buffer().getSizeUnsafe());
+		numBuffersIn.inc();
 		return Optional.of(new BufferAndAvailability(next.buffer(), next.isMoreAvailable(), next.buffersInBacklog()));
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index b94f48a..28f3020 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -123,7 +123,7 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
 		int maxBackoff,
 		TaskIOMetricGroup metrics) {
 
-		super(inputGate, channelIndex, partitionId, initialBackOff, maxBackoff, metrics.getNumBytesInRemoteCounter());
+		super(inputGate, channelIndex, partitionId, initialBackOff, maxBackoff, metrics.getNumBytesInRemoteCounter(), metrics.getNumBuffersInRemoteCounter());
 
 		this.connectionId = checkNotNull(connectionId);
 		this.connectionManager = checkNotNull(connectionManager);
@@ -199,6 +199,7 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
 		}
 
 		numBytesIn.inc(next.getSizeUnsafe());
+		numBuffersIn.inc();
 		return Optional.of(new BufferAndAvailability(next, remaining > 0, getSenderBacklog()));
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
index 1101f66..20a7aed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
@@ -61,7 +61,7 @@ class UnknownInputChannel extends InputChannel {
 			int maxBackoff,
 			TaskIOMetricGroup metrics) {
 
-		super(gate, channelIndex, partitionId, initialBackoff, maxBackoff, null);
+		super(gate, channelIndex, partitionId, initialBackoff, maxBackoff, null, null);
 
 		this.partitionManager = checkNotNull(partitionManager);
 		this.taskEventDispatcher = checkNotNull(taskEventDispatcher);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
index d15a0f1..c106c39 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
@@ -40,6 +40,14 @@ public class MetricNames {
 	public static final String IO_NUM_BYTES_IN_REMOTE_RATE = IO_NUM_BYTES_IN_REMOTE + SUFFIX_RATE;
 	public static final String IO_NUM_BYTES_OUT_RATE = IO_NUM_BYTES_OUT + SUFFIX_RATE;
 
+	public static final String IO_NUM_BUFFERS_IN = "numBuffersIn";
+	public static final String IO_NUM_BUFFERS_IN_LOCAL = IO_NUM_BUFFERS_IN + "Local";
+	public static final String IO_NUM_BUFFERS_IN_REMOTE = IO_NUM_BUFFERS_IN + "Remote";
+	public static final String IO_NUM_BUFFERS_OUT = "numBuffersOut";
+	public static final String IO_NUM_BUFFERS_IN_LOCAL_RATE = IO_NUM_BUFFERS_IN_LOCAL + SUFFIX_RATE;
+	public static final String IO_NUM_BUFFERS_IN_REMOTE_RATE = IO_NUM_BUFFERS_IN_REMOTE + SUFFIX_RATE;
+	public static final String IO_NUM_BUFFERS_OUT_RATE = IO_NUM_BUFFERS_OUT + SUFFIX_RATE;
+
 	public static final String IO_CURRENT_INPUT_WATERMARK = "currentInputWatermark";
 	public static final String IO_CURRENT_INPUT_1_WATERMARK = "currentInput1Watermark";
 	public static final String IO_CURRENT_INPUT_2_WATERMARK = "currentInput2Watermark";
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
index e12ecd7..79c5e8f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
@@ -44,12 +44,18 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
 	private final Counter numBytesInRemote;
 	private final SumCounter numRecordsIn;
 	private final SumCounter numRecordsOut;
+	private final Counter numBuffersOut;
+	private final Counter numBuffersInLocal;
+	private final Counter numBuffersInRemote;
 
 	private final Meter numBytesInRateLocal;
 	private final Meter numBytesInRateRemote;
 	private final Meter numBytesOutRate;
 	private final Meter numRecordsInRate;
 	private final Meter numRecordsOutRate;
+	private final Meter numBuffersOutRate;
+	private final Meter numBuffersInRateLocal;
+	private final Meter numBuffersInRateRemote;
 
 	public TaskIOMetricGroup(TaskMetricGroup parent) {
 		super(parent);
@@ -60,10 +66,18 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
 		this.numBytesOutRate = meter(MetricNames.IO_NUM_BYTES_OUT_RATE, new MeterView(numBytesOut, 60));
 		this.numBytesInRateLocal = meter(MetricNames.IO_NUM_BYTES_IN_LOCAL_RATE, new MeterView(numBytesInLocal, 60));
 		this.numBytesInRateRemote = meter(MetricNames.IO_NUM_BYTES_IN_REMOTE_RATE, new MeterView(numBytesInRemote, 60));
+
 		this.numRecordsIn = counter(MetricNames.IO_NUM_RECORDS_IN, new SumCounter());
 		this.numRecordsOut = counter(MetricNames.IO_NUM_RECORDS_OUT, new SumCounter());
 		this.numRecordsInRate = meter(MetricNames.IO_NUM_RECORDS_IN_RATE, new MeterView(numRecordsIn, 60));
 		this.numRecordsOutRate = meter(MetricNames.IO_NUM_RECORDS_OUT_RATE, new MeterView(numRecordsOut, 60));
+
+		this.numBuffersOut = counter(MetricNames.IO_NUM_BUFFERS_OUT);
+		this.numBuffersInLocal = counter(MetricNames.IO_NUM_BUFFERS_IN_LOCAL);
+		this.numBuffersInRemote = counter(MetricNames.IO_NUM_BUFFERS_IN_REMOTE);
+		this.numBuffersOutRate = meter(MetricNames.IO_NUM_BUFFERS_OUT_RATE, new MeterView(numBuffersOut, 60));
+		this.numBuffersInRateLocal = meter(MetricNames.IO_NUM_BUFFERS_IN_LOCAL_RATE, new MeterView(numBuffersInLocal, 60));
+		this.numBuffersInRateRemote = meter(MetricNames.IO_NUM_BUFFERS_IN_REMOTE_RATE, new MeterView(numBuffersInRemote, 60));
 	}
 
 	public IOMetrics createSnapshot() {
@@ -93,6 +107,18 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
 		return numRecordsOut;
 	}
 
+	public Counter getNumBuffersOutCounter() {
+		return numBuffersOut;
+	}
+
+	public Counter getNumBuffersInLocalCounter() {
+		return numBuffersInLocal;
+	}
+
+	public Counter getNumBuffersInRemoteCounter() {
+		return numBuffersInRemote;
+	}
+
 	public Meter getNumBytesInLocalRateMeter() {
 		return numBytesInRateLocal;
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
index abadddf..d757aa9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
@@ -120,7 +120,7 @@ public class InputChannelTest {
 			int initialBackoff,
 			int maxBackoff) {
 
-			super(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, new SimpleCounter());
+			super(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, new SimpleCounter(), new SimpleCounter());
 		}
 
 		@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
index 80e07f5..ac3f0ff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
@@ -48,7 +48,7 @@ public class TestInputChannel extends InputChannel {
 	private boolean isReleased = false;
 
 	TestInputChannel(SingleInputGate inputGate, int channelIndex) {
-		super(inputGate, channelIndex, new ResultPartitionID(), 0, 0, new SimpleCounter());
+		super(inputGate, channelIndex, new ResultPartitionID(), 0, 0, new SimpleCounter(), new SimpleCounter());
 	}
 
 	public TestInputChannel read(Buffer buffer) throws IOException, InterruptedException {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
index f23b2f5..b02be74 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
@@ -54,6 +54,9 @@ public class TaskIOMetricGroupTest {
 		taskIO.getNumBytesInLocalCounter().inc(100L);
 		taskIO.getNumBytesInRemoteCounter().inc(150L);
 		taskIO.getNumBytesOutCounter().inc(250L);
+		taskIO.getNumBuffersInLocalCounter().inc(1L);
+		taskIO.getNumBuffersInRemoteCounter().inc(2L);
+		taskIO.getNumBuffersOutCounter().inc(3L);
 
 		IOMetrics io = taskIO.createSnapshot();
 		assertEquals(32L, io.getNumRecordsIn());
@@ -61,5 +64,8 @@ public class TaskIOMetricGroupTest {
 		assertEquals(100L, io.getNumBytesInLocal());
 		assertEquals(150L, io.getNumBytesInRemote());
 		assertEquals(250L, io.getNumBytesOut());
+		assertEquals(1L, taskIO.getNumBuffersInLocalCounter().getCount());
+		assertEquals(2L, taskIO.getNumBuffersInRemoteCounter().getCount());
+		assertEquals(3L, taskIO.getNumBuffersOutCounter().getCount());
 	}
 }