You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/10/28 09:11:24 UTC

flink git commit: [FLINK-4925] [metrics] Integrate meters into IOMetricGroups

Repository: flink
Updated Branches:
  refs/heads/master 3bc9cad04 -> a582882da


[FLINK-4925] [metrics] Integrate meters into IOMetricGroups

This closes #2694.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a582882d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a582882d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a582882d

Branch: refs/heads/master
Commit: a582882dae125da2c4cef249428371c37fd64c53
Parents: 3bc9cad
Author: zentol <ch...@apache.org>
Authored: Wed Oct 26 13:14:03 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri Oct 28 11:11:00 2016 +0200

----------------------------------------------------------------------
 .../metrics/groups/OperatorIOMetricGroup.java   | 15 ++++++++++++++
 .../metrics/groups/TaskIOMetricGroup.java       | 21 ++++++++++++++++++++
 2 files changed, 36 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a582882d/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
index 8a69029..32611fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
@@ -18,6 +18,8 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
 
 /**
  * Metric group that contains shareable pre-defined IO-related metrics. The metrics registration is
@@ -28,10 +30,15 @@ public class OperatorIOMetricGroup extends ProxyMetricGroup<OperatorMetricGroup>
 	private final Counter numRecordsIn;
 	private final Counter numRecordsOut;
 
+	private final Meter numRecordsInRate;
+	private final Meter numRecordsOutRate;
+
 	public OperatorIOMetricGroup(OperatorMetricGroup parentMetricGroup) {
 		super(parentMetricGroup);
 		numRecordsIn = parentMetricGroup.counter("numRecordsIn");
 		numRecordsOut = parentMetricGroup.counter("numRecordsOut");
+		numRecordsInRate = parentMetricGroup.meter("numRecordsInPerSecond", new MeterView(numRecordsIn, 60));
+		numRecordsOutRate = parentMetricGroup.meter("numRecordsOutPerSecond", new MeterView(numRecordsOut, 60));
 	}
 
 	public Counter getNumRecordsInCounter() {
@@ -41,4 +48,12 @@ public class OperatorIOMetricGroup extends ProxyMetricGroup<OperatorMetricGroup>
 	public Counter getNumRecordsOutCounter() {
 		return numRecordsOut;
 	}
+
+	public Meter getNumRecordsInRateMeter() {
+		return numRecordsInRate;
+	}
+
+	public Meter getNumRecordsOutRate() {
+		return numRecordsOutRate;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a582882d/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
index a726c26..ab7ceb2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
 
 /**
  * Metric group that contains shareable pre-defined IO-related metrics. The metrics registration is
@@ -30,12 +32,19 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
 	private final Counter numBytesInLocal;
 	private final Counter numBytesInRemote;
 
+	private final Meter numBytesInRateLocal;
+	private final Meter numBytesInRateRemote;
+	private final Meter numBytesOutRate;
+
 	public TaskIOMetricGroup(TaskMetricGroup parent) {
 		super(parent);
 
 		this.numBytesOut = counter("numBytesOut");
 		this.numBytesInLocal = counter("numBytesInLocal");
 		this.numBytesInRemote = counter("numBytesInRemote");
+		this.numBytesOutRate = meter("numBytesOutPerSecond", new MeterView(numBytesOut, 60));
+		this.numBytesInRateLocal = meter("numBytesInLocalPerSecond", new MeterView(numBytesInLocal, 60));
+		this.numBytesInRateRemote = meter("numBytesInRemotePerSecond", new MeterView(numBytesInRemote, 60));
 	}
 
 	public Counter getNumBytesOutCounter() {
@@ -49,4 +58,16 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
 	public Counter getNumBytesInRemoteCounter() {
 		return numBytesInRemote;
 	}
+
+	public Meter getNumBytesInRateLocalMeter() {
+		return numBytesInRateLocal;
+	}
+
+	public Meter getNumBytesInRateRemoteMeter() {
+		return numBytesInRateRemote;
+	}
+
+	public Meter getNumBytesOutRateMeter() {
+		return numBytesOutRate;
+	}
 }