You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/10/30 16:44:14 UTC

flink git commit: [FLINK-4923] [metrics] Expose Task's input/output buffer queue lengths and bufferPool usage as metrics

Repository: flink
Updated Branches:
  refs/heads/master 5b54009eb -> 344fe94db


[FLINK-4923] [metrics] Expose Task's input/output buffer queue lengths and bufferPool usage as metrics

This closes #2693


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/344fe94d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/344fe94d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/344fe94d

Branch: refs/heads/master
Commit: 344fe94db96ea44fa7714e105631eb192bb382e4
Parents: 5b54009
Author: zhuhaifengleon <zh...@gmail.com>
Authored: Wed Oct 26 15:48:31 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Oct 30 15:00:28 2016 +0100

----------------------------------------------------------------------
 .../partition/PipelinedSubpartition.java        |   5 +
 .../io/network/partition/ResultPartition.java   |  14 +++
 .../network/partition/ResultSubpartition.java   |   2 +
 .../partition/SpillableSubpartition.java        |   5 +
 .../partition/consumer/SingleInputGate.java     |  24 ++++
 .../metrics/groups/TaskIOMetricGroup.java       | 121 +++++++++++++++++++
 .../apache/flink/runtime/taskmanager/Task.java  |  11 ++
 .../runtime/taskmanager/TaskAsyncCallTest.java  |   7 +-
 8 files changed, 183 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/344fe94d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index 2d7097d..266f581 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -220,4 +220,9 @@ class PipelinedSubpartition extends ResultSubpartition {
 			throw new IllegalStateException("Already registered listener.");
 		}
 	}
+
+	@Override
+	public int getNumberOfQueuedBuffers() {
+			return buffers.size();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/344fe94d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 5bbfab1..f06cb43 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -220,6 +220,10 @@ public class ResultPartition implements BufferPoolOwner {
 		return bufferPool;
 	}
 
+	public BufferPool getBufferPool() {
+		return bufferPool;
+	}
+
 	public int getTotalNumberOfBuffers() {
 		return totalNumberOfBuffers;
 	}
@@ -228,6 +232,16 @@ public class ResultPartition implements BufferPoolOwner {
 		return totalNumberOfBytes;
 	}
 
+	public int getNumberOfQueuedBuffers() {
+		int totalBuffers = 0;
+
+		for (ResultSubpartition subpartition : subpartitions) {
+			totalBuffers += subpartition.getNumberOfQueuedBuffers();
+		}
+
+		return totalBuffers;
+	}
+
 	// ------------------------------------------------------------------------
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/344fe94d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
index b7ca9c4..31c8f73 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
@@ -83,4 +83,6 @@ public abstract class ResultSubpartition {
 
 	abstract public boolean isReleased();
 
+	abstract public int getNumberOfQueuedBuffers();
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/344fe94d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index 3e4692a..3f19559 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -227,4 +227,9 @@ class SpillableSubpartition extends ResultSubpartition {
 				getTotalNumberOfBuffers(), getTotalNumberOfBytes(), isFinished, readView != null,
 				spillWriter != null);
 	}
+
+	@Override
+	public int getNumberOfQueuedBuffers() {
+			return buffers.size();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/344fe94d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index f4e4325..af5fd89 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -212,6 +212,10 @@ public class SingleInputGate implements InputGate {
 		return bufferPool;
 	}
 
+	public BufferPool getBufferPool() {
+		return bufferPool;
+	}
+
 	@Override
 	public int getPageSize() {
 		if (bufferPool != null) {
@@ -222,6 +226,26 @@ public class SingleInputGate implements InputGate {
 		}
 	}
 
+	public int getNumberOfQueuedBuffers() {
+		// re-try 3 times, if fails, return 0 for "unknown"
+		for (int retry = 0; retry < 3; retry++) {
+			try {
+				int totalBuffers = 0;
+
+				for (InputChannel channel : inputChannels.values()) {
+					if (channel instanceof RemoteInputChannel) {
+						totalBuffers += ((RemoteInputChannel) channel).getNumberOfQueuedBuffers();
+					}
+				}
+
+				return  totalBuffers;
+			}
+			catch (Exception ignored) {}
+		}
+
+		return 0;
+	}
+
 	// ------------------------------------------------------------------------
 	// Setup/Life-cycle
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/344fe94d/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
index ab7ceb2..b2884ec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
@@ -19,8 +19,13 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.MeterView;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.taskmanager.Task;
 
 /**
  * Metric group that contains shareable pre-defined IO-related metrics. The metrics registration is
@@ -36,6 +41,8 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
 	private final Meter numBytesInRateRemote;
 	private final Meter numBytesOutRate;
 
+	private final MetricGroup buffers;
+
 	public TaskIOMetricGroup(TaskMetricGroup parent) {
 		super(parent);
 
@@ -45,6 +52,8 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
 		this.numBytesOutRate = meter("numBytesOutPerSecond", new MeterView(numBytesOut, 60));
 		this.numBytesInRateLocal = meter("numBytesInLocalPerSecond", new MeterView(numBytesInLocal, 60));
 		this.numBytesInRateRemote = meter("numBytesInRemotePerSecond", new MeterView(numBytesInRemote, 60));
+
+		this.buffers = addGroup("buffers");
 	}
 
 	public Counter getNumBytesOutCounter() {
@@ -70,4 +79,116 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
 	public Meter getNumBytesOutRateMeter() {
 		return numBytesOutRate;
 	}
+
+	public MetricGroup getBuffersGroup() {
+		return buffers;
+	}
+
+	// ------------------------------------------------------------------------
+	//  metrics of Buffers group
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Input received buffers gauge of a task
+	 */
+	public static final class InputBuffersGauge implements Gauge<Integer> {
+
+		private final Task task;
+
+		public InputBuffersGauge(Task task) {
+			this.task = task;
+		}
+
+		@Override
+		public Integer getValue() {
+			int totalBuffers = 0;
+
+			for (SingleInputGate inputGate : task.getAllInputGates()) {
+				totalBuffers += inputGate.getNumberOfQueuedBuffers();
+			}
+
+			return totalBuffers;
+		}
+	}
+
+	/**
+	 * Output produced buffers gauge of a task
+	 */
+	public static final class OutputBuffersGauge implements Gauge<Integer> {
+
+		private final Task task;
+
+		public OutputBuffersGauge(Task task) {
+			this.task = task;
+		}
+
+		@Override
+		public Integer getValue() {
+			int totalBuffers = 0;
+
+			for (ResultPartition producedPartition : task.getProducedPartitions()) {
+				totalBuffers += producedPartition.getNumberOfQueuedBuffers();
+			}
+
+			return totalBuffers;
+		}
+	}
+
+	/**
+	 * Input buffer pool usage gauge of a task
+	 */
+	public static final class InputBufferPoolUsageGauge implements Gauge<Float> {
+
+		private final Task task;
+
+		public InputBufferPoolUsageGauge(Task task) {
+			this.task = task;
+		}
+
+		@Override
+		public Float getValue() {
+			int availableBuffers = 0;
+			int bufferPoolSize = 0;
+
+			for (SingleInputGate inputGate : task.getAllInputGates()) {
+				availableBuffers += inputGate.getBufferPool().getNumberOfAvailableMemorySegments();
+				bufferPoolSize += inputGate.getBufferPool().getNumBuffers();
+			}
+
+			if (bufferPoolSize != 0) {
+				return ((float)(bufferPoolSize - availableBuffers)) / bufferPoolSize;
+			} else {
+				return 0.0f;
+			}
+		}
+	}
+
+	/**
+	 * Output buffer pool usage gauge of a task
+	 */
+	public static final class OutputBufferPoolUsageGauge implements Gauge<Float> {
+
+		private final Task task;
+
+		public OutputBufferPoolUsageGauge(Task task) {
+			this.task = task;
+		}
+
+		@Override
+		public Float getValue() {
+			int availableBuffers = 0;
+			int bufferPoolSize = 0;
+
+			for (ResultPartition resultPartition : task.getProducedPartitions()) {
+				availableBuffers += resultPartition.getBufferPool().getNumberOfAvailableMemorySegments();
+				bufferPoolSize += resultPartition.getBufferPool().getNumBuffers();
+			}
+
+			if (bufferPoolSize != 0) {
+				return ((float)(bufferPoolSize - availableBuffers)) / bufferPoolSize;
+			} else {
+				return 0.0f;
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/344fe94d/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index f09e88a..7ce9b0b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -57,6 +58,7 @@ import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.TaskStateHandles;
@@ -353,6 +355,15 @@ public class Task implements Runnable, TaskActions {
 
 		// finally, create the executing thread, but do not start it
 		executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask);
+
+		// add metrics for buffers
+		if (this.metrics != null && this.metrics.getIOMetricGroup() != null) {
+			MetricGroup bufferMetrics = this.metrics.getIOMetricGroup().getBuffersGroup();
+			bufferMetrics.gauge("inputQueueLength", new TaskIOMetricGroup.InputBuffersGauge(this));
+			bufferMetrics.gauge("outputQueueLength", new TaskIOMetricGroup.OutputBuffersGauge(this));
+			bufferMetrics.gauge("inPoolUsage", new TaskIOMetricGroup.InputBufferPoolUsageGauge(this));
+			bufferMetrics.gauge("outPoolUsage", new TaskIOMetricGroup.OutputBufferPoolUsageGauge(this));
+		}
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/344fe94d/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 7dd67ed..ed107c7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -44,19 +44,14 @@ import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.util.SerializedValue;
+
 import org.junit.Before;
 import org.junit.Test;
 
 import java.net.URL;
-import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
 import java.util.concurrent.Executor;
 
 import static org.junit.Assert.assertFalse;