You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/10/30 16:44:14 UTC
flink git commit: [FLINK-4923] [metrics] Expose Task's input/output
buffer queue lengths and bufferPool usage as metrics
Repository: flink
Updated Branches:
refs/heads/master 5b54009eb -> 344fe94db
[FLINK-4923] [metrics] Expose Task's input/output buffer queue lengths and bufferPool usage as metrics
This closes #2693
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/344fe94d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/344fe94d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/344fe94d
Branch: refs/heads/master
Commit: 344fe94db96ea44fa7714e105631eb192bb382e4
Parents: 5b54009
Author: zhuhaifengleon <zh...@gmail.com>
Authored: Wed Oct 26 15:48:31 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Oct 30 15:00:28 2016 +0100
----------------------------------------------------------------------
.../partition/PipelinedSubpartition.java | 5 +
.../io/network/partition/ResultPartition.java | 14 +++
.../network/partition/ResultSubpartition.java | 2 +
.../partition/SpillableSubpartition.java | 5 +
.../partition/consumer/SingleInputGate.java | 24 ++++
.../metrics/groups/TaskIOMetricGroup.java | 121 +++++++++++++++++++
.../apache/flink/runtime/taskmanager/Task.java | 11 ++
.../runtime/taskmanager/TaskAsyncCallTest.java | 7 +-
8 files changed, 183 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/344fe94d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index 2d7097d..266f581 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -220,4 +220,9 @@ class PipelinedSubpartition extends ResultSubpartition {
throw new IllegalStateException("Already registered listener.");
}
}
+
+ @Override
+ public int getNumberOfQueuedBuffers() {
+ return buffers.size();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/344fe94d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 5bbfab1..f06cb43 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -220,6 +220,10 @@ public class ResultPartition implements BufferPoolOwner {
return bufferPool;
}
+ public BufferPool getBufferPool() {
+ return bufferPool;
+ }
+
public int getTotalNumberOfBuffers() {
return totalNumberOfBuffers;
}
@@ -228,6 +232,16 @@ public class ResultPartition implements BufferPoolOwner {
return totalNumberOfBytes;
}
+ public int getNumberOfQueuedBuffers() {
+ int totalBuffers = 0;
+
+ for (ResultSubpartition subpartition : subpartitions) {
+ totalBuffers += subpartition.getNumberOfQueuedBuffers();
+ }
+
+ return totalBuffers;
+ }
+
// ------------------------------------------------------------------------
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/344fe94d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
index b7ca9c4..31c8f73 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
@@ -83,4 +83,6 @@ public abstract class ResultSubpartition {
abstract public boolean isReleased();
+ abstract public int getNumberOfQueuedBuffers();
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/344fe94d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index 3e4692a..3f19559 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -227,4 +227,9 @@ class SpillableSubpartition extends ResultSubpartition {
getTotalNumberOfBuffers(), getTotalNumberOfBytes(), isFinished, readView != null,
spillWriter != null);
}
+
+ @Override
+ public int getNumberOfQueuedBuffers() {
+ return buffers.size();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/344fe94d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index f4e4325..af5fd89 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -212,6 +212,10 @@ public class SingleInputGate implements InputGate {
return bufferPool;
}
+ public BufferPool getBufferPool() {
+ return bufferPool;
+ }
+
@Override
public int getPageSize() {
if (bufferPool != null) {
@@ -222,6 +226,26 @@ public class SingleInputGate implements InputGate {
}
}
+ public int getNumberOfQueuedBuffers() {
+ // re-try 3 times, if fails, return 0 for "unknown"
+ for (int retry = 0; retry < 3; retry++) {
+ try {
+ int totalBuffers = 0;
+
+ for (InputChannel channel : inputChannels.values()) {
+ if (channel instanceof RemoteInputChannel) {
+ totalBuffers += ((RemoteInputChannel) channel).getNumberOfQueuedBuffers();
+ }
+ }
+
+ return totalBuffers;
+ }
+ catch (Exception ignored) {}
+ }
+
+ return 0;
+ }
+
// ------------------------------------------------------------------------
// Setup/Life-cycle
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/344fe94d/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
index ab7ceb2..b2884ec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
@@ -19,8 +19,13 @@
package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.taskmanager.Task;
/**
* Metric group that contains shareable pre-defined IO-related metrics. The metrics registration is
@@ -36,6 +41,8 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
private final Meter numBytesInRateRemote;
private final Meter numBytesOutRate;
+ private final MetricGroup buffers;
+
public TaskIOMetricGroup(TaskMetricGroup parent) {
super(parent);
@@ -45,6 +52,8 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
this.numBytesOutRate = meter("numBytesOutPerSecond", new MeterView(numBytesOut, 60));
this.numBytesInRateLocal = meter("numBytesInLocalPerSecond", new MeterView(numBytesInLocal, 60));
this.numBytesInRateRemote = meter("numBytesInRemotePerSecond", new MeterView(numBytesInRemote, 60));
+
+ this.buffers = addGroup("buffers");
}
public Counter getNumBytesOutCounter() {
@@ -70,4 +79,116 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
public Meter getNumBytesOutRateMeter() {
return numBytesOutRate;
}
+
+ public MetricGroup getBuffersGroup() {
+ return buffers;
+ }
+
+ // ------------------------------------------------------------------------
+ // metrics of Buffers group
+ // ------------------------------------------------------------------------
+
+ /**
+ * Input received buffers gauge of a task
+ */
+ public static final class InputBuffersGauge implements Gauge<Integer> {
+
+ private final Task task;
+
+ public InputBuffersGauge(Task task) {
+ this.task = task;
+ }
+
+ @Override
+ public Integer getValue() {
+ int totalBuffers = 0;
+
+ for (SingleInputGate inputGate : task.getAllInputGates()) {
+ totalBuffers += inputGate.getNumberOfQueuedBuffers();
+ }
+
+ return totalBuffers;
+ }
+ }
+
+ /**
+ * Output produced buffers gauge of a task
+ */
+ public static final class OutputBuffersGauge implements Gauge<Integer> {
+
+ private final Task task;
+
+ public OutputBuffersGauge(Task task) {
+ this.task = task;
+ }
+
+ @Override
+ public Integer getValue() {
+ int totalBuffers = 0;
+
+ for (ResultPartition producedPartition : task.getProducedPartitions()) {
+ totalBuffers += producedPartition.getNumberOfQueuedBuffers();
+ }
+
+ return totalBuffers;
+ }
+ }
+
+ /**
+ * Input buffer pool usage gauge of a task
+ */
+ public static final class InputBufferPoolUsageGauge implements Gauge<Float> {
+
+ private final Task task;
+
+ public InputBufferPoolUsageGauge(Task task) {
+ this.task = task;
+ }
+
+ @Override
+ public Float getValue() {
+ int availableBuffers = 0;
+ int bufferPoolSize = 0;
+
+ for (SingleInputGate inputGate : task.getAllInputGates()) {
+ availableBuffers += inputGate.getBufferPool().getNumberOfAvailableMemorySegments();
+ bufferPoolSize += inputGate.getBufferPool().getNumBuffers();
+ }
+
+ if (bufferPoolSize != 0) {
+ return ((float)(bufferPoolSize - availableBuffers)) / bufferPoolSize;
+ } else {
+ return 0.0f;
+ }
+ }
+ }
+
+ /**
+ * Output buffer pool usage gauge of a task
+ */
+ public static final class OutputBufferPoolUsageGauge implements Gauge<Float> {
+
+ private final Task task;
+
+ public OutputBufferPoolUsageGauge(Task task) {
+ this.task = task;
+ }
+
+ @Override
+ public Float getValue() {
+ int availableBuffers = 0;
+ int bufferPoolSize = 0;
+
+ for (ResultPartition resultPartition : task.getProducedPartitions()) {
+ availableBuffers += resultPartition.getBufferPool().getNumberOfAvailableMemorySegments();
+ bufferPoolSize += resultPartition.getBufferPool().getNumBuffers();
+ }
+
+ if (bufferPoolSize != 0) {
+ return ((float)(bufferPoolSize - availableBuffers)) / bufferPoolSize;
+ } else {
+ return 0.0f;
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/344fe94d/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index f09e88a..7ce9b0b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -57,6 +58,7 @@ import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.TaskStateHandles;
@@ -353,6 +355,15 @@ public class Task implements Runnable, TaskActions {
// finally, create the executing thread, but do not start it
executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask);
+
+ // add metrics for buffers
+ if (this.metrics != null && this.metrics.getIOMetricGroup() != null) {
+ MetricGroup bufferMetrics = this.metrics.getIOMetricGroup().getBuffersGroup();
+ bufferMetrics.gauge("inputQueueLength", new TaskIOMetricGroup.InputBuffersGauge(this));
+ bufferMetrics.gauge("outputQueueLength", new TaskIOMetricGroup.OutputBuffersGauge(this));
+ bufferMetrics.gauge("inPoolUsage", new TaskIOMetricGroup.InputBufferPoolUsageGauge(this));
+ bufferMetrics.gauge("outPoolUsage", new TaskIOMetricGroup.OutputBufferPoolUsageGauge(this));
+ }
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/344fe94d/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 7dd67ed..ed107c7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -44,19 +44,14 @@ import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TaskStateHandles;
import org.apache.flink.util.SerializedValue;
+
import org.junit.Before;
import org.junit.Test;
import java.net.URL;
-import java.util.Collection;
import java.util.Collections;
-import java.util.List;
import java.util.concurrent.Executor;
import static org.junit.Assert.assertFalse;