You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gu...@apache.org on 2022/07/01 03:35:30 UTC
[flink] 02/03: [FLINK-28308] Introduce metrics of the accumulated time that a running task is busy / idle / back-pressured
This is an automated email from the ASF dual-hosted git repository.
guoyangze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit bc2a86e5bb5f048fb5e5007f916405773a88b5cc
Author: Yangze Guo <ka...@gmail.com>
AuthorDate: Mon May 23 16:50:37 2022 +0800
[FLINK-28308] Introduce metrics of the accumulated time that a running task is busy / idle / back-pressured
This closes #20110.
---
.../shortcodes/generated/rest_v1_dispatcher.html | 45 ++++++++++++++++++++++
docs/static/generated/rest_v1_dispatcher.yml | 9 +++++
.../src/test/resources/rest_api_v1.snapshot | 45 ++++++++++++++++++++++
.../flink/runtime/executiongraph/IOMetrics.java | 37 +++++++++++++++++-
.../apache/flink/runtime/metrics/MetricNames.java | 3 ++
.../apache/flink/runtime/metrics/TimerGauge.java | 9 +++++
.../runtime/metrics/groups/TaskIOMetricGroup.java | 39 ++++++++++++++++++-
.../rest/handler/job/JobDetailsHandler.java | 5 ++-
.../handler/job/JobVertexTaskManagersHandler.java | 5 ++-
.../rest/handler/util/MutableIOMetrics.java | 32 ++++++++++++++-
.../job/SubtaskExecutionAttemptDetailsInfo.java | 5 ++-
.../rest/messages/job/metrics/IOMetricsInfo.java | 45 ++++++++++++++++++++--
.../DefaultExecutionGraphDeploymentTest.java | 6 +--
.../ExecutionPartitionLifecycleTest.java | 4 +-
.../flink/runtime/metrics/TimerGaugeTest.java | 4 ++
.../metrics/groups/TaskIOMetricGroupTest.java | 15 ++++++++
.../SubtaskCurrentAttemptDetailsHandlerTest.java | 28 ++++++++++++--
.../SubtaskExecutionAttemptDetailsHandlerTest.java | 28 ++++++++++++--
.../rest/messages/JobVertexDetailsInfoTest.java | 5 ++-
.../messages/JobVertexTaskManagersInfoTest.java | 5 ++-
.../rest/messages/job/JobDetailsInfoTest.java | 5 ++-
.../SubtaskExecutionAttemptDetailsInfoTest.java | 5 ++-
.../adaptivebatch/AdaptiveBatchSchedulerTest.java | 2 +-
.../flink/streaming/runtime/tasks/StreamTask.java | 1 +
24 files changed, 361 insertions(+), 26 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
index 7638b3ee8da..4b648e77e72 100644
--- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
+++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
@@ -1497,6 +1497,15 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:IOMetricsInfo",
"properties" : {
+ "accumulated-backpressured-time" : {
+ "type" : "integer"
+ },
+ "accumulated-busy-time" : {
+ "type" : "number"
+ },
+ "accumulated-idle-time" : {
+ "type" : "integer"
+ },
"read-bytes" : {
"type" : "integer"
},
@@ -3658,6 +3667,15 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:IOMetricsInfo",
"properties" : {
+ "accumulated-backpressured-time" : {
+ "type" : "integer"
+ },
+ "accumulated-busy-time" : {
+ "type" : "number"
+ },
+ "accumulated-idle-time" : {
+ "type" : "integer"
+ },
"read-bytes" : {
"type" : "integer"
},
@@ -4343,6 +4361,15 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:IOMetricsInfo",
"properties" : {
+ "accumulated-backpressured-time" : {
+ "type" : "integer"
+ },
+ "accumulated-busy-time" : {
+ "type" : "number"
+ },
+ "accumulated-idle-time" : {
+ "type" : "integer"
+ },
"read-bytes" : {
"type" : "integer"
},
@@ -4471,6 +4498,15 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:IOMetricsInfo",
"properties" : {
+ "accumulated-backpressured-time" : {
+ "type" : "integer"
+ },
+ "accumulated-busy-time" : {
+ "type" : "number"
+ },
+ "accumulated-idle-time" : {
+ "type" : "integer"
+ },
"read-bytes" : {
"type" : "integer"
},
@@ -4883,6 +4919,15 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:IOMetricsInfo",
"properties" : {
+ "accumulated-backpressured-time" : {
+ "type" : "integer"
+ },
+ "accumulated-busy-time" : {
+ "type" : "number"
+ },
+ "accumulated-idle-time" : {
+ "type" : "integer"
+ },
"read-bytes" : {
"type" : "integer"
},
diff --git a/docs/static/generated/rest_v1_dispatcher.yml b/docs/static/generated/rest_v1_dispatcher.yml
index 859783719ae..259d5b6d9ba 100644
--- a/docs/static/generated/rest_v1_dispatcher.yml
+++ b/docs/static/generated/rest_v1_dispatcher.yml
@@ -1628,6 +1628,15 @@ components:
format: int64
write-records-complete:
type: boolean
+ accumulated-backpressured-time:
+ type: integer
+ format: int64
+ accumulated-idle-time:
+ type: integer
+ format: int64
+ accumulated-busy-time:
+ type: number
+ format: double
SavepointFormatType:
type: string
enum:
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index 537e966deff..99cd6fb0cab 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -829,6 +829,15 @@
},
"write-records-complete" : {
"type" : "boolean"
+ },
+ "accumulated-backpressured-time" : {
+ "type" : "integer"
+ },
+ "accumulated-idle-time" : {
+ "type" : "integer"
+ },
+ "accumulated-busy-time" : {
+ "type" : "number"
}
}
}
@@ -2167,6 +2176,15 @@
},
"write-records-complete" : {
"type" : "boolean"
+ },
+ "accumulated-backpressured-time" : {
+ "type" : "integer"
+ },
+ "accumulated-idle-time" : {
+ "type" : "integer"
+ },
+ "accumulated-busy-time" : {
+ "type" : "number"
}
}
},
@@ -2528,6 +2546,15 @@
},
"write-records-complete" : {
"type" : "boolean"
+ },
+ "accumulated-backpressured-time" : {
+ "type" : "integer"
+ },
+ "accumulated-idle-time" : {
+ "type" : "integer"
+ },
+ "accumulated-busy-time" : {
+ "type" : "number"
}
}
},
@@ -2614,6 +2641,15 @@
},
"write-records-complete" : {
"type" : "boolean"
+ },
+ "accumulated-backpressured-time" : {
+ "type" : "integer"
+ },
+ "accumulated-idle-time" : {
+ "type" : "integer"
+ },
+ "accumulated-busy-time" : {
+ "type" : "number"
}
}
},
@@ -2843,6 +2879,15 @@
},
"write-records-complete" : {
"type" : "boolean"
+ },
+ "accumulated-backpressured-time" : {
+ "type" : "integer"
+ },
+ "accumulated-idle-time" : {
+ "type" : "integer"
+ },
+ "accumulated-busy-time" : {
+ "type" : "number"
}
}
},
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
index b7bbf86fb3c..e612837531a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.executiongraph;
import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Meter;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
@@ -37,6 +38,10 @@ public class IOMetrics implements Serializable {
protected long numBytesIn;
protected long numBytesOut;
+ protected long accumulateBackPressuredTime;
+ protected double accumulateBusyTime;
+ protected long accumulateIdleTime;
+
protected final Map<IntermediateResultPartitionID, Long> numBytesProducedOfPartitions =
new HashMap<>();
@@ -45,11 +50,17 @@ public class IOMetrics implements Serializable {
Meter recordsOut,
Meter bytesIn,
Meter bytesOut,
- Map<IntermediateResultPartitionID, Counter> numBytesProducedCounters) {
+ Map<IntermediateResultPartitionID, Counter> numBytesProducedCounters,
+ Gauge<Long> accumulatedBackPressuredTime,
+ Gauge<Long> accumulatedIdleTime,
+ Gauge<Double> accumulatedBusyTime) {
this.numRecordsIn = recordsIn.getCount();
this.numRecordsOut = recordsOut.getCount();
this.numBytesIn = bytesIn.getCount();
this.numBytesOut = bytesOut.getCount();
+ this.accumulateBackPressuredTime = accumulatedBackPressuredTime.getValue();
+ this.accumulateBusyTime = accumulatedBusyTime.getValue();
+ this.accumulateIdleTime = accumulatedIdleTime.getValue();
for (Map.Entry<IntermediateResultPartitionID, Counter> counter :
numBytesProducedCounters.entrySet()) {
@@ -57,11 +68,21 @@ public class IOMetrics implements Serializable {
}
}
- public IOMetrics(long numBytesIn, long numBytesOut, long numRecordsIn, long numRecordsOut) {
+ public IOMetrics(
+ long numBytesIn,
+ long numBytesOut,
+ long numRecordsIn,
+ long numRecordsOut,
+ long accumulateIdleTime,
+ long accumulateBusyTime,
+ long accumulateBackPressuredTime) {
this.numBytesIn = numBytesIn;
this.numBytesOut = numBytesOut;
this.numRecordsIn = numRecordsIn;
this.numRecordsOut = numRecordsOut;
+ this.accumulateIdleTime = accumulateIdleTime;
+ this.accumulateBusyTime = accumulateBusyTime;
+ this.accumulateBackPressuredTime = accumulateBackPressuredTime;
}
public long getNumRecordsIn() {
@@ -80,6 +101,18 @@ public class IOMetrics implements Serializable {
return numBytesOut;
}
+ public double getAccumulateBusyTime() {
+ return accumulateBusyTime;
+ }
+
+ public long getAccumulateBackPressuredTime() {
+ return accumulateBackPressuredTime;
+ }
+
+ public long getAccumulateIdleTime() {
+ return accumulateIdleTime;
+ }
+
public Map<IntermediateResultPartitionID, Long> getNumBytesProducedOfPartitions() {
return numBytesProducedOfPartitions;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
index 762c6671791..1f083520f01 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
@@ -72,6 +72,9 @@ public class MetricNames {
public static final String TASK_IDLE_TIME = "idleTimeMs" + SUFFIX_RATE;
public static final String TASK_BUSY_TIME = "busyTimeMs" + SUFFIX_RATE;
public static final String TASK_BACK_PRESSURED_TIME = "backPressuredTimeMs" + SUFFIX_RATE;
+ public static final String ACC_TASK_IDLE_TIME = "accumulateIdleTimeMs";
+ public static final String ACC_TASK_BUSY_TIME = "accumulateBusyTimeMs";
+ public static final String ACC_TASK_BACK_PRESSURED_TIME = "accumulateBackPressuredTimeMs";
public static final String TASK_SOFT_BACK_PRESSURED_TIME =
"softBackPressuredTimeMs" + SUFFIX_RATE;
public static final String TASK_HARD_BACK_PRESSURED_TIME =
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java
index d7f81d06c6a..21819c3d301 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java
@@ -47,6 +47,8 @@ public class TimerGauge implements Gauge<Long>, View {
private long previousMaxSingleMeasurement;
private long currentMaxSingleMeasurement;
+ private long accumulatedCount;
+
public TimerGauge() {
this(SystemClock.getInstance());
}
@@ -66,6 +68,7 @@ public class TimerGauge implements Gauge<Long>, View {
if (currentMeasurementStartTS != 0) {
long currentMeasurement = clock.absoluteTimeMillis() - currentMeasurementStartTS;
currentCount += currentMeasurement;
+ accumulatedCount += currentMeasurement;
currentMaxSingleMeasurement = Math.max(currentMaxSingleMeasurement, currentMeasurement);
currentUpdateTS = 0;
currentMeasurementStartTS = 0;
@@ -79,6 +82,7 @@ public class TimerGauge implements Gauge<Long>, View {
// we are adding to the current count only the time elapsed since the last markStart or update
// call
currentCount += now - currentUpdateTS;
+ accumulatedCount += now - currentUpdateTS;
currentUpdateTS = now;
// on the other hand, max measurement has to be always checked against last markStart
// call
@@ -104,6 +108,11 @@ public class TimerGauge implements Gauge<Long>, View {
return previousMaxSingleMeasurement;
}
+ /** @return the accumulated period by the given TimerGauge. */
+ public synchronized long getAccumulatedCount() {
+ return accumulatedCount;
+ }
+
@VisibleForTesting
public synchronized long getCount() {
return currentCount;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
index 0b580d9453e..17dca7b97b7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
@@ -60,12 +60,17 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
private final TimerGauge hardBackPressuredTimePerSecond;
private final Gauge<Long> maxSoftBackPressuredTime;
private final Gauge<Long> maxHardBackPressuredTime;
+ private final Gauge<Long> accumulatedBackPressuredTime;
+ private final Gauge<Long> accumulatedIdleTime;
+ private final Gauge<Double> accumulatedBusyTime;
private final Meter mailboxThroughput;
private final Histogram mailboxLatency;
private final SizeGauge mailboxSize;
private volatile boolean busyTimeEnabled;
+ private long taskStartTime;
+
private final Map<IntermediateResultPartitionID, Counter> numBytesProducedOfPartitions =
new HashMap<>();
@@ -107,6 +112,15 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
this.busyTimePerSecond = gauge(MetricNames.TASK_BUSY_TIME, this::getBusyTimePerSecond);
+ this.accumulatedBusyTime =
+ gauge(MetricNames.ACC_TASK_BUSY_TIME, this::getAccumulatedBusyTime);
+ this.accumulatedBackPressuredTime =
+ gauge(
+ MetricNames.ACC_TASK_BACK_PRESSURED_TIME,
+ this::getAccumulatedBackPressuredTimeMs);
+ this.accumulatedIdleTime =
+ gauge(MetricNames.ACC_TASK_IDLE_TIME, idleTimePerSecond::getAccumulatedCount);
+
this.numMailsProcessed = new SimpleCounter();
this.mailboxThroughput =
meter(MetricNames.MAILBOX_THROUGHPUT, new MeterView(numMailsProcessed));
@@ -121,7 +135,10 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
numRecordsOutRate,
numBytesInRate,
numBytesOutRate,
- numBytesProducedOfPartitions);
+ numBytesProducedOfPartitions,
+ accumulatedBackPressuredTime,
+ accumulatedIdleTime,
+ accumulatedBusyTime);
}
// ============================================================================================
@@ -169,6 +186,15 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
+ getHardBackPressuredTimePerSecond().getValue();
}
+ public long getAccumulatedBackPressuredTimeMs() {
+ return getSoftBackPressuredTimePerSecond().getAccumulatedCount()
+ + getHardBackPressuredTimePerSecond().getAccumulatedCount();
+ }
+
+ public void markTaskStart() {
+ this.taskStartTime = System.currentTimeMillis();
+ }
+
public void setEnableBusyTime(boolean enabled) {
busyTimeEnabled = enabled;
}
@@ -178,6 +204,17 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
return busyTimeEnabled ? 1000.0 - Math.min(busyTime, 1000.0) : Double.NaN;
}
+ private double getAccumulatedBusyTime() {
+ return busyTimeEnabled
+ ? Math.max(
+ System.currentTimeMillis()
+ - taskStartTime
+ - idleTimePerSecond.getAccumulatedCount()
+ - getAccumulatedBackPressuredTimeMs(),
+ 0)
+ : Double.NaN;
+ }
+
public Meter getMailboxThroughput() {
return mailboxThroughput;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
index 68b16fd8652..4f850bc0712 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
@@ -218,7 +218,10 @@ public class JobDetailsHandler
counts.getNumRecordsIn(),
counts.isNumRecordsInComplete(),
counts.getNumRecordsOut(),
- counts.isNumRecordsOutComplete());
+ counts.isNumRecordsOutComplete(),
+ counts.getAccumulateBackPressuredTime(),
+ counts.getAccumulateIdleTime(),
+ counts.getAccumulateBusyTime());
return new JobDetailsInfo.JobVertexDetailsInfo(
ejv.getJobVertexId(),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
index d1ed4030443..02df59952a2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
@@ -206,7 +206,10 @@ public class JobVertexTaskManagersHandler
counts.getNumRecordsIn(),
counts.isNumRecordsInComplete(),
counts.getNumRecordsOut(),
- counts.isNumRecordsOutComplete());
+ counts.isNumRecordsOutComplete(),
+ counts.getAccumulateBackPressuredTime(),
+ counts.getAccumulateIdleTime(),
+ counts.getAccumulateBusyTime());
Map<ExecutionState, Integer> statusCounts =
new HashMap<>(ExecutionState.values().length);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
index b5dac6ce5ad..7da9061a090 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
@@ -47,7 +47,7 @@ public class MutableIOMetrics extends IOMetrics {
private boolean numRecordsOutComplete = true;
public MutableIOMetrics() {
- super(0, 0, 0, 0);
+ super(0, 0, 0, 0, 0, 0, 0);
}
public boolean isNumBytesInComplete() {
@@ -86,6 +86,13 @@ public class MutableIOMetrics extends IOMetrics {
this.numBytesOut += ioMetrics.getNumBytesOut();
this.numRecordsIn += ioMetrics.getNumRecordsIn();
this.numRecordsOut += ioMetrics.getNumRecordsOut();
+ this.accumulateBackPressuredTime += ioMetrics.getAccumulateBackPressuredTime();
+ this.accumulateIdleTime += ioMetrics.getAccumulateIdleTime();
+ if (Double.isNaN(ioMetrics.getAccumulateBusyTime())) {
+ this.accumulateBusyTime = Double.NaN;
+ } else {
+ this.accumulateBusyTime += ioMetrics.getAccumulateBusyTime();
+ }
}
} else { // execAttempt is still running, use MetricQueryService instead
if (fetcher != null) {
@@ -127,6 +134,29 @@ public class MutableIOMetrics extends IOMetrics {
this.numRecordsOut +=
Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT));
}
+
+ if (metrics.getMetric(MetricNames.ACC_TASK_BACK_PRESSURED_TIME) != null) {
+ this.accumulateBackPressuredTime +=
+ Long.parseLong(
+ metrics.getMetric(
+ MetricNames.ACC_TASK_BACK_PRESSURED_TIME));
+ }
+
+ if (metrics.getMetric(MetricNames.ACC_TASK_IDLE_TIME) != null) {
+ this.accumulateIdleTime +=
+ Long.parseLong(metrics.getMetric(MetricNames.ACC_TASK_IDLE_TIME));
+ }
+
+ if (metrics.getMetric(MetricNames.ACC_TASK_BUSY_TIME) != null) {
+ double busyTime =
+ Double.parseDouble(
+ metrics.getMetric(MetricNames.ACC_TASK_BUSY_TIME));
+ if (Double.isNaN(busyTime)) {
+ this.accumulateBusyTime = Double.NaN;
+ } else {
+ this.accumulateBusyTime += busyTime;
+ }
+ }
} else {
this.numBytesInComplete = false;
this.numBytesOutComplete = false;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java
index 42718d0070d..637538f1f48 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java
@@ -226,7 +226,10 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
ioMetrics.getNumRecordsIn(),
ioMetrics.isNumRecordsInComplete(),
ioMetrics.getNumRecordsOut(),
- ioMetrics.isNumRecordsOutComplete());
+ ioMetrics.isNumRecordsOutComplete(),
+ ioMetrics.getAccumulateBackPressuredTime(),
+ ioMetrics.getAccumulateIdleTime(),
+ ioMetrics.getAccumulateBusyTime());
return new SubtaskExecutionAttemptDetailsInfo(
execution.getParallelSubtaskIndex(),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/IOMetricsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/IOMetricsInfo.java
index 29e3e62c8b7..02c37152fe8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/IOMetricsInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/IOMetricsInfo.java
@@ -42,6 +42,12 @@ public final class IOMetricsInfo {
private static final String FIELD_NAME_RECORDS_WRITTEN_COMPLETE = "write-records-complete";
+ private static final String FIELD_NAME_ACC_BACK_PRESSURE = "accumulated-backpressured-time";
+
+ private static final String FIELD_NAME_ACC_IDLE = "accumulated-idle-time";
+
+ private static final String FIELD_NAME_ACC_BUSY = "accumulated-busy-time";
+
@JsonProperty(FIELD_NAME_BYTES_READ)
private final long bytesRead;
@@ -66,6 +72,15 @@ public final class IOMetricsInfo {
@JsonProperty(FIELD_NAME_RECORDS_WRITTEN_COMPLETE)
private final boolean recordsWrittenComplete;
+ @JsonProperty(FIELD_NAME_ACC_BACK_PRESSURE)
+ private final long accumulatedBackpressured;
+
+ @JsonProperty(FIELD_NAME_ACC_IDLE)
+ private final long accumulatedIdle;
+
+ @JsonProperty(FIELD_NAME_ACC_BUSY)
+ private final double accumulatedBusy;
+
@JsonCreator
public IOMetricsInfo(
@JsonProperty(FIELD_NAME_BYTES_READ) long bytesRead,
@@ -75,7 +90,10 @@ public final class IOMetricsInfo {
@JsonProperty(FIELD_NAME_RECORDS_READ) long recordsRead,
@JsonProperty(FIELD_NAME_RECORDS_READ_COMPLETE) boolean recordsReadComplete,
@JsonProperty(FIELD_NAME_RECORDS_WRITTEN) long recordsWritten,
- @JsonProperty(FIELD_NAME_RECORDS_WRITTEN_COMPLETE) boolean recordsWrittenComplete) {
+ @JsonProperty(FIELD_NAME_RECORDS_WRITTEN_COMPLETE) boolean recordsWrittenComplete,
+ @JsonProperty(FIELD_NAME_ACC_BACK_PRESSURE) long accumulatedBackpressured,
+ @JsonProperty(FIELD_NAME_ACC_IDLE) long accumulatedIdle,
+ @JsonProperty(FIELD_NAME_ACC_BUSY) double accumulatedBusy) {
this.bytesRead = bytesRead;
this.bytesReadComplete = bytesReadComplete;
this.bytesWritten = bytesWritten;
@@ -84,6 +102,9 @@ public final class IOMetricsInfo {
this.recordsReadComplete = recordsReadComplete;
this.recordsWritten = recordsWritten;
this.recordsWrittenComplete = recordsWrittenComplete;
+ this.accumulatedBackpressured = accumulatedBackpressured;
+ this.accumulatedIdle = accumulatedIdle;
+ this.accumulatedBusy = accumulatedBusy;
}
public long getBytesRead() {
@@ -118,6 +139,18 @@ public final class IOMetricsInfo {
return recordsWrittenComplete;
}
+ public long getAccumulatedBackpressured() {
+ return accumulatedBackpressured;
+ }
+
+ public double getAccumulatedBusy() {
+ return accumulatedBusy;
+ }
+
+ public long getAccumulatedIdle() {
+ return accumulatedIdle;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -134,7 +167,10 @@ public final class IOMetricsInfo {
&& recordsRead == that.recordsRead
&& recordsReadComplete == that.recordsReadComplete
&& recordsWritten == that.recordsWritten
- && recordsWrittenComplete == that.recordsWrittenComplete;
+ && recordsWrittenComplete == that.recordsWrittenComplete
+ && accumulatedBackpressured == that.accumulatedBackpressured
+ && accumulatedBusy == that.accumulatedBusy
+ && accumulatedIdle == that.accumulatedIdle;
}
@Override
@@ -147,6 +183,9 @@ public final class IOMetricsInfo {
recordsRead,
recordsReadComplete,
recordsWritten,
- recordsWrittenComplete);
+ recordsWrittenComplete,
+ accumulatedBackpressured,
+ accumulatedBusy,
+ accumulatedIdle);
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java
index f5c222e91c5..a2abb0b5901 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java
@@ -344,7 +344,7 @@ public class DefaultExecutionGraphDeploymentTest extends TestLogger {
// verify behavior for canceled executions
Execution execution1 = executions.values().iterator().next();
- IOMetrics ioMetrics = new IOMetrics(0, 0, 0, 0);
+ IOMetrics ioMetrics = new IOMetrics(0, 0, 0, 0, 0, 0, 0);
Map<String, Accumulator<?, ?>> accumulators = new HashMap<>();
accumulators.put("acc", new IntCounter(4));
AccumulatorSnapshot accumulatorSnapshot =
@@ -367,7 +367,7 @@ public class DefaultExecutionGraphDeploymentTest extends TestLogger {
// verify behavior for failed executions
Execution execution2 = executions.values().iterator().next();
- IOMetrics ioMetrics2 = new IOMetrics(0, 0, 0, 0);
+ IOMetrics ioMetrics2 = new IOMetrics(0, 0, 0, 0, 0, 0, 0);
Map<String, Accumulator<?, ?>> accumulators2 = new HashMap<>();
accumulators2.put("acc", new IntCounter(8));
AccumulatorSnapshot accumulatorSnapshot2 =
@@ -405,7 +405,7 @@ public class DefaultExecutionGraphDeploymentTest extends TestLogger {
Map<ExecutionAttemptID, Execution> executions =
scheduler.getExecutionGraph().getRegisteredExecutions();
- IOMetrics ioMetrics = new IOMetrics(0, 0, 0, 0);
+ IOMetrics ioMetrics = new IOMetrics(0, 0, 0, 0, 0, 0, 0);
Map<String, Accumulator<?, ?>> accumulators = Collections.emptyMap();
Execution execution1 = executions.values().iterator().next();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java
index 62f907f78cc..df765c69cd7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java
@@ -159,7 +159,7 @@ public class ExecutionPartitionLifecycleTest extends TestLogger {
execution -> {
execution.cancel();
execution.completeCancelling(
- Collections.emptyMap(), new IOMetrics(0, 0, 0, 0), false);
+ Collections.emptyMap(), new IOMetrics(0, 0, 0, 0, 0, 0, 0), false);
},
PartitionReleaseResult.STOP_TRACKING);
}
@@ -182,7 +182,7 @@ public class ExecutionPartitionLifecycleTest extends TestLogger {
new Exception("Test exception"),
false,
Collections.emptyMap(),
- new IOMetrics(0, 0, 0, 0),
+ new IOMetrics(0, 0, 0, 0, 0, 0, 0),
false,
true),
PartitionReleaseResult.STOP_TRACKING);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TimerGaugeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TimerGaugeTest.java
index f066e776524..0ddbde97637 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TimerGaugeTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TimerGaugeTest.java
@@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
/** Tests for {@link TimerGauge}. */
@@ -49,6 +50,7 @@ public class TimerGaugeTest {
gauge.update();
assertThat(gauge.getValue(), is(0L));
assertThat(gauge.getMaxSingleMeasurement(), is(0L));
+ assertEquals(gauge.getAccumulatedCount(), 0L);
gauge.markStart();
clock.advanceTime(SLEEP, TimeUnit.MILLISECONDS);
@@ -57,6 +59,7 @@ public class TimerGaugeTest {
assertThat(gauge.getValue(), greaterThanOrEqualTo(SLEEP / View.UPDATE_INTERVAL_SECONDS));
assertThat(gauge.getMaxSingleMeasurement(), is(SLEEP));
+ assertEquals(gauge.getAccumulatedCount(), SLEEP);
// Check that the getMaxSingleMeasurement can go down after an update
gauge.markStart();
@@ -65,6 +68,7 @@ public class TimerGaugeTest {
gauge.update();
assertThat(gauge.getMaxSingleMeasurement(), is(SLEEP / 2));
+ assertEquals(gauge.getAccumulatedCount(), SLEEP + SLEEP / 2);
}
@Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
index 7acc04bc604..71a978636bd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
@@ -38,6 +38,8 @@ public class TaskIOMetricGroupTest {
public void testTaskIOMetricGroup() throws InterruptedException {
TaskMetricGroup task = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
TaskIOMetricGroup taskIO = task.getIOMetricGroup();
+ taskIO.setEnableBusyTime(true);
+ final long startTime = System.currentTimeMillis();
// test counter forwarding
assertNotNull(taskIO.getNumRecordsInCounter());
@@ -75,6 +77,19 @@ public class TaskIOMetricGroupTest {
assertEquals(100L, io.getNumBytesIn());
assertEquals(250L, io.getNumBytesOut());
assertEquals(3L, taskIO.getNumBuffersOutCounter().getCount());
+ assertEquals(
+ taskIO.getIdleTimeMsPerSecond().getAccumulatedCount(), io.getAccumulateIdleTime());
+ assertEquals(
+ taskIO.getHardBackPressuredTimePerSecond().getAccumulatedCount()
+ + taskIO.getSoftBackPressuredTimePerSecond().getAccumulatedCount(),
+ io.getAccumulateBackPressuredTime());
+ assertThat(
+ io.getAccumulateBusyTime(),
+ greaterThanOrEqualTo(
+ (double) System.currentTimeMillis()
+ - startTime
+ - io.getAccumulateIdleTime()
+ - io.getAccumulateBackPressuredTime()));
assertThat(taskIO.getIdleTimeMsPerSecond().getCount(), greaterThanOrEqualTo(softSleepTime));
assertThat(
taskIO.getSoftBackPressuredTimePerSecond().getCount(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
index 5ac759f7396..483ad2facf1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
@@ -72,8 +72,19 @@ public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger {
final long bytesOut = 10L;
final long recordsIn = 20L;
final long recordsOut = 30L;
-
- final IOMetrics ioMetrics = new IOMetrics(bytesIn, bytesOut, recordsIn, recordsOut);
+ final long accumulateIdleTime = 40L;
+ final long accumulateBusyTime = 50L;
+ final long accumulateBackPressuredTime = 60L;
+
+ final IOMetrics ioMetrics =
+ new IOMetrics(
+ bytesIn,
+ bytesOut,
+ recordsIn,
+ recordsOut,
+ accumulateIdleTime,
+ accumulateBusyTime,
+ accumulateBackPressuredTime);
final long[] timestamps = new long[ExecutionState.values().length];
timestamps[ExecutionState.DEPLOYING.ordinal()] = deployingTs;
@@ -146,7 +157,18 @@ public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger {
// Verify
final IOMetricsInfo ioMetricsInfo =
- new IOMetricsInfo(bytesIn, true, bytesOut, true, recordsIn, true, recordsOut, true);
+ new IOMetricsInfo(
+ bytesIn,
+ true,
+ bytesOut,
+ true,
+ recordsIn,
+ true,
+ recordsOut,
+ true,
+ accumulateBackPressuredTime,
+ accumulateIdleTime,
+ accumulateBusyTime);
final SubtaskExecutionAttemptDetailsInfo expectedDetailsInfo =
new SubtaskExecutionAttemptDetailsInfo(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
index 69a8e2b051c..4a288e738be 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
@@ -77,8 +77,19 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger {
final long bytesOut = 10L;
final long recordsIn = 20L;
final long recordsOut = 30L;
-
- final IOMetrics ioMetrics = new IOMetrics(bytesIn, bytesOut, recordsIn, recordsOut);
+ final long accumulateIdleTime = 40L;
+ final long accumulateBusyTime = 50L;
+ final long accumulateBackPressuredTime = 60L;
+
+ final IOMetrics ioMetrics =
+ new IOMetrics(
+ bytesIn,
+ bytesOut,
+ recordsIn,
+ recordsOut,
+ accumulateIdleTime,
+ accumulateBusyTime,
+ accumulateBackPressuredTime);
final ArchivedExecutionJobVertex archivedExecutionJobVertex =
new ArchivedExecutionJobVertex(
@@ -151,7 +162,18 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger {
// Verify
final IOMetricsInfo ioMetricsInfo =
- new IOMetricsInfo(bytesIn, true, bytesOut, true, recordsIn, true, recordsOut, true);
+ new IOMetricsInfo(
+ bytesIn,
+ true,
+ bytesOut,
+ true,
+ recordsIn,
+ true,
+ recordsOut,
+ true,
+ accumulateBackPressuredTime,
+ accumulateIdleTime,
+ accumulateBusyTime);
final SubtaskExecutionAttemptDetailsInfo expectedDetailsInfo =
new SubtaskExecutionAttemptDetailsInfo(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java
index 707aa777542..806ce560494 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java
@@ -47,7 +47,10 @@ public class JobVertexDetailsInfoTest
random.nextLong(),
random.nextBoolean(),
random.nextLong(),
- random.nextBoolean());
+ random.nextBoolean(),
+ Math.abs(random.nextLong()),
+ Math.abs(random.nextLong()),
+ Math.abs(random.nextDouble()));
List<SubtaskExecutionAttemptDetailsInfo> vertexTaskDetailList = new ArrayList<>();
vertexTaskDetailList.add(
new SubtaskExecutionAttemptDetailsInfo(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfoTest.java
index cf7aad02879..d988ee017a8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfoTest.java
@@ -54,7 +54,10 @@ public class JobVertexTaskManagersInfoTest
random.nextLong(),
random.nextBoolean(),
random.nextLong(),
- random.nextBoolean());
+ random.nextBoolean(),
+ Math.abs(random.nextLong()),
+ Math.abs(random.nextLong()),
+ Math.abs(random.nextDouble()));
int count = 100;
for (ExecutionState executionState : ExecutionState.values()) {
statusCounts.put(executionState, count++);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java
index e867d2720d8..8c207c5b069 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java
@@ -91,7 +91,10 @@ public class JobDetailsInfoTest extends RestResponseMarshallingTestBase<JobDetai
random.nextLong(),
random.nextBoolean(),
random.nextLong(),
- random.nextBoolean());
+ random.nextBoolean(),
+ Math.abs(random.nextLong()),
+ Math.abs(random.nextLong()),
+ Math.abs(random.nextDouble()));
for (ExecutionState executionState : ExecutionState.values()) {
tasksPerState.put(executionState, random.nextInt());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java
index e7ea728507d..f022d6ced26 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java
@@ -46,7 +46,10 @@ public class SubtaskExecutionAttemptDetailsInfoTest
Math.abs(random.nextLong()),
random.nextBoolean(),
Math.abs(random.nextLong()),
- random.nextBoolean());
+ random.nextBoolean(),
+ Math.abs(random.nextLong()),
+ Math.abs(random.nextLong()),
+ Math.abs(random.nextDouble()));
return new SubtaskExecutionAttemptDetailsInfo(
Math.abs(random.nextInt()),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
index bf330d9edf3..e8636ea8435 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
@@ -133,7 +133,7 @@ public class AdaptiveBatchSchedulerTest extends TestLogger {
state,
null,
null,
- new IOMetrics(0, 0, 0, 0)));
+ new IOMetrics(0, 0, 0, 0, 0, 0, 0)));
}
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 55a2f527a10..19bc707a363 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -772,6 +772,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
scheduleBufferDebloater();
// let the task do its work
+ getEnvironment().getMetricGroup().getIOMetricGroup().markTaskStart();
runMailboxLoop();
// if this left the run() method cleanly despite the fact that this was canceled,