You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/10/28 09:11:24 UTC
flink git commit: [FLINK-4925] [metrics] Integrate meters into
IOMetricGroups
Repository: flink
Updated Branches:
refs/heads/master 3bc9cad04 -> a582882da
[FLINK-4925] [metrics] Integrate meters into IOMetricGroups
This closes #2694.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a582882d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a582882d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a582882d
Branch: refs/heads/master
Commit: a582882dae125da2c4cef249428371c37fd64c53
Parents: 3bc9cad
Author: zentol <ch...@apache.org>
Authored: Wed Oct 26 13:14:03 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri Oct 28 11:11:00 2016 +0200
----------------------------------------------------------------------
.../metrics/groups/OperatorIOMetricGroup.java | 15 ++++++++++++++
.../metrics/groups/TaskIOMetricGroup.java | 21 ++++++++++++++++++++
2 files changed, 36 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a582882d/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
index 8a69029..32611fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
/**
* Metric group that contains shareable pre-defined IO-related metrics. The metrics registration is
@@ -28,10 +30,15 @@ public class OperatorIOMetricGroup extends ProxyMetricGroup<OperatorMetricGroup>
private final Counter numRecordsIn;
private final Counter numRecordsOut;
+ private final Meter numRecordsInRate;
+ private final Meter numRecordsOutRate;
+
public OperatorIOMetricGroup(OperatorMetricGroup parentMetricGroup) {
super(parentMetricGroup);
numRecordsIn = parentMetricGroup.counter("numRecordsIn");
numRecordsOut = parentMetricGroup.counter("numRecordsOut");
+ numRecordsInRate = parentMetricGroup.meter("numRecordsInPerSecond", new MeterView(numRecordsIn, 60));
+ numRecordsOutRate = parentMetricGroup.meter("numRecordsOutPerSecond", new MeterView(numRecordsOut, 60));
}
public Counter getNumRecordsInCounter() {
@@ -41,4 +48,12 @@ public class OperatorIOMetricGroup extends ProxyMetricGroup<OperatorMetricGroup>
public Counter getNumRecordsOutCounter() {
return numRecordsOut;
}
+
+ public Meter getNumRecordsInRateMeter() {
+ return numRecordsInRate;
+ }
+
+ public Meter getNumRecordsOutRate() {
+ return numRecordsOutRate;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a582882d/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
index a726c26..ab7ceb2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
@@ -19,6 +19,8 @@
package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
/**
* Metric group that contains shareable pre-defined IO-related metrics. The metrics registration is
@@ -30,12 +32,19 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
private final Counter numBytesInLocal;
private final Counter numBytesInRemote;
+ private final Meter numBytesInRateLocal;
+ private final Meter numBytesInRateRemote;
+ private final Meter numBytesOutRate;
+
public TaskIOMetricGroup(TaskMetricGroup parent) {
super(parent);
this.numBytesOut = counter("numBytesOut");
this.numBytesInLocal = counter("numBytesInLocal");
this.numBytesInRemote = counter("numBytesInRemote");
+ this.numBytesOutRate = meter("numBytesOutPerSecond", new MeterView(numBytesOut, 60));
+ this.numBytesInRateLocal = meter("numBytesInLocalPerSecond", new MeterView(numBytesInLocal, 60));
+ this.numBytesInRateRemote = meter("numBytesInRemotePerSecond", new MeterView(numBytesInRemote, 60));
}
public Counter getNumBytesOutCounter() {
@@ -49,4 +58,16 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
public Counter getNumBytesInRemoteCounter() {
return numBytesInRemote;
}
+
+ public Meter getNumBytesInRateLocalMeter() {
+ return numBytesInRateLocal;
+ }
+
+ public Meter getNumBytesInRateRemoteMeter() {
+ return numBytesInRateRemote;
+ }
+
+ public Meter getNumBytesOutRateMeter() {
+ return numBytesOutRate;
+ }
}