You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/08/14 19:51:00 UTC

[GitHub] NicoK closed pull request #6551: [FLINK-10022][network][metrics] add metrics for input/output buffers

NicoK closed pull request #6551: [FLINK-10022][network][metrics] add metrics for input/output buffers
URL: https://github.com/apache/flink/pull/6551
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a forked repository, its diff is
reproduced below (GitHub would not otherwise display it after the merge):

diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index bf0aee1ed42..bac40f00066 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -1217,6 +1217,27 @@ Thus, in order to infer the metric identifier:
       <td>The number of bytes this task reads from a remote source per second.</td>
       <td>Meter</td>
     </tr>
+    <tr>
+      <th rowspan="6"><strong>Task</strong></th>
+      <td>numBuffersInLocal</td>
+      <td>The total number of network buffers this task has read from a local source.</td>
+      <td>Counter</td>
+    </tr>
+    <tr>
+      <td>numBuffersInLocalPerSecond</td>
+      <td>The number of network buffers this task reads from a local source per second.</td>
+      <td>Meter</td>
+    </tr>
+    <tr>
+      <td>numBuffersInRemote</td>
+      <td>The total number of network buffers this task has read from a remote source.</td>
+      <td>Counter</td>
+    </tr>
+    <tr>
+      <td>numBuffersInRemotePerSecond</td>
+      <td>The number of network buffers this task reads from a remote source per second.</td>
+      <td>Meter</td>
+    </tr>
     <tr>
       <td>numBytesOut</td>
       <td>The total number of bytes this task has emitted.</td>
@@ -1227,6 +1248,16 @@ Thus, in order to infer the metric identifier:
       <td>The number of bytes this task emits per second.</td>
       <td>Meter</td>
     </tr>
+    <tr>
+      <td>numBuffersOut</td>
+      <td>The total number of network buffers this task has emitted.</td>
+      <td>Counter</td>
+    </tr>
+    <tr>
+      <td>numBuffersOutPerSecond</td>
+      <td>The number of network buffers this task emits per second.</td>
+      <td>Meter</td>
+    </tr>
     <tr>
       <th rowspan="6"><strong>Task/Operator</strong></th>
       <td>numRecordsIn</td>
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index e3a8e49316f..970795c0564 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -71,6 +71,8 @@
 
 	private Counter numBytesOut = new SimpleCounter();
 
+	private Counter numBuffersOut = new SimpleCounter();
+
 	public RecordWriter(ResultPartitionWriter writer) {
 		this(writer, new RoundRobinChannelSelector<T>());
 	}
@@ -184,6 +186,7 @@ public void clearBuffers() {
      */
 	public void setMetricGroup(TaskIOMetricGroup metrics) {
 		numBytesOut = metrics.getNumBytesOutCounter();
+		numBuffersOut = metrics.getNumBuffersOutCounter();
 	}
 
 	/**
@@ -200,6 +203,7 @@ private boolean tryFinishCurrentBufferBuilder(int targetChannel, RecordSerialize
 		bufferBuilders[targetChannel] = Optional.empty();
 
 		numBytesOut.inc(bufferBuilder.finish());
+		numBuffersOut.inc();
 		serializer.clear();
 		return true;
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index 3ce58661281..2a7cedf6949 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -64,6 +64,8 @@
 
 	protected final Counter numBytesIn;
 
+	protected final Counter numBuffersIn;
+
 	/** The current backoff (in ms) */
 	private int currentBackoff;
 
@@ -73,7 +75,8 @@ protected InputChannel(
 			ResultPartitionID partitionId,
 			int initialBackoff,
 			int maxBackoff,
-			Counter numBytesIn) {
+			Counter numBytesIn,
+			Counter numBuffersIn) {
 
 		checkArgument(channelIndex >= 0);
 
@@ -91,6 +94,7 @@ protected InputChannel(
 		this.currentBackoff = initial == 0 ? -1 : 0;
 
 		this.numBytesIn = numBytesIn;
+		this.numBuffersIn = numBuffersIn;
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index f9c75addd51..4b3a8ff9773 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -84,7 +84,7 @@ public LocalInputChannel(
 		int maxBackoff,
 		TaskIOMetricGroup metrics) {
 
-		super(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, metrics.getNumBytesInLocalCounter());
+		super(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, metrics.getNumBytesInLocalCounter(), metrics.getNumBuffersInLocalCounter());
 
 		this.partitionManager = checkNotNull(partitionManager);
 		this.taskEventDispatcher = checkNotNull(taskEventDispatcher);
@@ -194,6 +194,7 @@ public void run() {
 		}
 
 		numBytesIn.inc(next.buffer().getSizeUnsafe());
+		numBuffersIn.inc();
 		return Optional.of(new BufferAndAvailability(next.buffer(), next.isMoreAvailable(), next.buffersInBacklog()));
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index b94f48a3a79..28f30209892 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -123,7 +123,7 @@ public RemoteInputChannel(
 		int maxBackoff,
 		TaskIOMetricGroup metrics) {
 
-		super(inputGate, channelIndex, partitionId, initialBackOff, maxBackoff, metrics.getNumBytesInRemoteCounter());
+		super(inputGate, channelIndex, partitionId, initialBackOff, maxBackoff, metrics.getNumBytesInRemoteCounter(), metrics.getNumBuffersInRemoteCounter());
 
 		this.connectionId = checkNotNull(connectionId);
 		this.connectionManager = checkNotNull(connectionManager);
@@ -199,6 +199,7 @@ void retriggerSubpartitionRequest(int subpartitionIndex) throws IOException, Int
 		}
 
 		numBytesIn.inc(next.getSizeUnsafe());
+		numBuffersIn.inc();
 		return Optional.of(new BufferAndAvailability(next, remaining > 0, getSenderBacklog()));
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
index 1101f666c01..20a7aed76e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
@@ -61,7 +61,7 @@ public UnknownInputChannel(
 			int maxBackoff,
 			TaskIOMetricGroup metrics) {
 
-		super(gate, channelIndex, partitionId, initialBackoff, maxBackoff, null);
+		super(gate, channelIndex, partitionId, initialBackoff, maxBackoff, null, null);
 
 		this.partitionManager = checkNotNull(partitionManager);
 		this.taskEventDispatcher = checkNotNull(taskEventDispatcher);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
index d15a0f1e887..c106c391e46 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
@@ -40,6 +40,14 @@ private MetricNames() {
 	public static final String IO_NUM_BYTES_IN_REMOTE_RATE = IO_NUM_BYTES_IN_REMOTE + SUFFIX_RATE;
 	public static final String IO_NUM_BYTES_OUT_RATE = IO_NUM_BYTES_OUT + SUFFIX_RATE;
 
+	public static final String IO_NUM_BUFFERS_IN = "numBuffersIn";
+	public static final String IO_NUM_BUFFERS_IN_LOCAL = IO_NUM_BUFFERS_IN + "Local";
+	public static final String IO_NUM_BUFFERS_IN_REMOTE = IO_NUM_BUFFERS_IN + "Remote";
+	public static final String IO_NUM_BUFFERS_OUT = "numBuffersOut";
+	public static final String IO_NUM_BUFFERS_IN_LOCAL_RATE = IO_NUM_BUFFERS_IN_LOCAL + SUFFIX_RATE;
+	public static final String IO_NUM_BUFFERS_IN_REMOTE_RATE = IO_NUM_BUFFERS_IN_REMOTE + SUFFIX_RATE;
+	public static final String IO_NUM_BUFFERS_OUT_RATE = IO_NUM_BUFFERS_OUT + SUFFIX_RATE;
+
 	public static final String IO_CURRENT_INPUT_WATERMARK = "currentInputWatermark";
 	public static final String IO_CURRENT_INPUT_1_WATERMARK = "currentInput1Watermark";
 	public static final String IO_CURRENT_INPUT_2_WATERMARK = "currentInput2Watermark";
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
index e12ecd7d25c..79c5e8f41c7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
@@ -44,12 +44,18 @@
 	private final Counter numBytesInRemote;
 	private final SumCounter numRecordsIn;
 	private final SumCounter numRecordsOut;
+	private final Counter numBuffersOut;
+	private final Counter numBuffersInLocal;
+	private final Counter numBuffersInRemote;
 
 	private final Meter numBytesInRateLocal;
 	private final Meter numBytesInRateRemote;
 	private final Meter numBytesOutRate;
 	private final Meter numRecordsInRate;
 	private final Meter numRecordsOutRate;
+	private final Meter numBuffersOutRate;
+	private final Meter numBuffersInRateLocal;
+	private final Meter numBuffersInRateRemote;
 
 	public TaskIOMetricGroup(TaskMetricGroup parent) {
 		super(parent);
@@ -60,10 +66,18 @@ public TaskIOMetricGroup(TaskMetricGroup parent) {
 		this.numBytesOutRate = meter(MetricNames.IO_NUM_BYTES_OUT_RATE, new MeterView(numBytesOut, 60));
 		this.numBytesInRateLocal = meter(MetricNames.IO_NUM_BYTES_IN_LOCAL_RATE, new MeterView(numBytesInLocal, 60));
 		this.numBytesInRateRemote = meter(MetricNames.IO_NUM_BYTES_IN_REMOTE_RATE, new MeterView(numBytesInRemote, 60));
+
 		this.numRecordsIn = counter(MetricNames.IO_NUM_RECORDS_IN, new SumCounter());
 		this.numRecordsOut = counter(MetricNames.IO_NUM_RECORDS_OUT, new SumCounter());
 		this.numRecordsInRate = meter(MetricNames.IO_NUM_RECORDS_IN_RATE, new MeterView(numRecordsIn, 60));
 		this.numRecordsOutRate = meter(MetricNames.IO_NUM_RECORDS_OUT_RATE, new MeterView(numRecordsOut, 60));
+
+		this.numBuffersOut = counter(MetricNames.IO_NUM_BUFFERS_OUT);
+		this.numBuffersInLocal = counter(MetricNames.IO_NUM_BUFFERS_IN_LOCAL);
+		this.numBuffersInRemote = counter(MetricNames.IO_NUM_BUFFERS_IN_REMOTE);
+		this.numBuffersOutRate = meter(MetricNames.IO_NUM_BUFFERS_OUT_RATE, new MeterView(numBuffersOut, 60));
+		this.numBuffersInRateLocal = meter(MetricNames.IO_NUM_BUFFERS_IN_LOCAL_RATE, new MeterView(numBuffersInLocal, 60));
+		this.numBuffersInRateRemote = meter(MetricNames.IO_NUM_BUFFERS_IN_REMOTE_RATE, new MeterView(numBuffersInRemote, 60));
 	}
 
 	public IOMetrics createSnapshot() {
@@ -93,6 +107,18 @@ public Counter getNumRecordsOutCounter() {
 		return numRecordsOut;
 	}
 
+	public Counter getNumBuffersOutCounter() {
+		return numBuffersOut;
+	}
+
+	public Counter getNumBuffersInLocalCounter() {
+		return numBuffersInLocal;
+	}
+
+	public Counter getNumBuffersInRemoteCounter() {
+		return numBuffersInRemote;
+	}
+
 	public Meter getNumBytesInLocalRateMeter() {
 		return numBytesInRateLocal;
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
index abadddf6d57..d757aa9c0f5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
@@ -120,7 +120,7 @@ private MockInputChannel(
 			int initialBackoff,
 			int maxBackoff) {
 
-			super(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, new SimpleCounter());
+			super(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, new SimpleCounter(), new SimpleCounter());
 		}
 
 		@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
index 80e07f51996..ac3f0ff8cf6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
@@ -48,7 +48,7 @@
 	private boolean isReleased = false;
 
 	TestInputChannel(SingleInputGate inputGate, int channelIndex) {
-		super(inputGate, channelIndex, new ResultPartitionID(), 0, 0, new SimpleCounter());
+		super(inputGate, channelIndex, new ResultPartitionID(), 0, 0, new SimpleCounter(), new SimpleCounter());
 	}
 
 	public TestInputChannel read(Buffer buffer) throws IOException, InterruptedException {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
index f23b2f5def0..b02be747658 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
@@ -54,6 +54,9 @@ public void testTaskIOMetricGroup() {
 		taskIO.getNumBytesInLocalCounter().inc(100L);
 		taskIO.getNumBytesInRemoteCounter().inc(150L);
 		taskIO.getNumBytesOutCounter().inc(250L);
+		taskIO.getNumBuffersInLocalCounter().inc(1L);
+		taskIO.getNumBuffersInRemoteCounter().inc(2L);
+		taskIO.getNumBuffersOutCounter().inc(3L);
 
 		IOMetrics io = taskIO.createSnapshot();
 		assertEquals(32L, io.getNumRecordsIn());
@@ -61,5 +64,8 @@ public void testTaskIOMetricGroup() {
 		assertEquals(100L, io.getNumBytesInLocal());
 		assertEquals(150L, io.getNumBytesInRemote());
 		assertEquals(250L, io.getNumBytesOut());
+		assertEquals(1L, taskIO.getNumBuffersInLocalCounter().getCount());
+		assertEquals(2L, taskIO.getNumBuffersInRemoteCounter().getCount());
+		assertEquals(3L, taskIO.getNumBuffersOutCounter().getCount());
 	}
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services