You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gu...@apache.org on 2022/07/01 03:35:30 UTC

[flink] 02/03: [FLINK-28308] Introduce metrics of the accumulated time that a running task is busy / idle / back-pressured

This is an automated email from the ASF dual-hosted git repository.

guoyangze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bc2a86e5bb5f048fb5e5007f916405773a88b5cc
Author: Yangze Guo <ka...@gmail.com>
AuthorDate: Mon May 23 16:50:37 2022 +0800

    [FLINK-28308] Introduce metrics of the accumulated time that a running task is busy / idle / back-pressured
    
    This closes #20110.
---
 .../shortcodes/generated/rest_v1_dispatcher.html   | 45 ++++++++++++++++++++++
 docs/static/generated/rest_v1_dispatcher.yml       |  9 +++++
 .../src/test/resources/rest_api_v1.snapshot        | 45 ++++++++++++++++++++++
 .../flink/runtime/executiongraph/IOMetrics.java    | 37 +++++++++++++++++-
 .../apache/flink/runtime/metrics/MetricNames.java  |  3 ++
 .../apache/flink/runtime/metrics/TimerGauge.java   |  9 +++++
 .../runtime/metrics/groups/TaskIOMetricGroup.java  | 39 ++++++++++++++++++-
 .../rest/handler/job/JobDetailsHandler.java        |  5 ++-
 .../handler/job/JobVertexTaskManagersHandler.java  |  5 ++-
 .../rest/handler/util/MutableIOMetrics.java        | 32 ++++++++++++++-
 .../job/SubtaskExecutionAttemptDetailsInfo.java    |  5 ++-
 .../rest/messages/job/metrics/IOMetricsInfo.java   | 45 ++++++++++++++++++++--
 .../DefaultExecutionGraphDeploymentTest.java       |  6 +--
 .../ExecutionPartitionLifecycleTest.java           |  4 +-
 .../flink/runtime/metrics/TimerGaugeTest.java      |  4 ++
 .../metrics/groups/TaskIOMetricGroupTest.java      | 15 ++++++++
 .../SubtaskCurrentAttemptDetailsHandlerTest.java   | 28 ++++++++++++--
 .../SubtaskExecutionAttemptDetailsHandlerTest.java | 28 ++++++++++++--
 .../rest/messages/JobVertexDetailsInfoTest.java    |  5 ++-
 .../messages/JobVertexTaskManagersInfoTest.java    |  5 ++-
 .../rest/messages/job/JobDetailsInfoTest.java      |  5 ++-
 .../SubtaskExecutionAttemptDetailsInfoTest.java    |  5 ++-
 .../adaptivebatch/AdaptiveBatchSchedulerTest.java  |  2 +-
 .../flink/streaming/runtime/tasks/StreamTask.java  |  1 +
 24 files changed, 361 insertions(+), 26 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
index 7638b3ee8da..4b648e77e72 100644
--- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
+++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
@@ -1497,6 +1497,15 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa
             "type" : "object",
             "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:IOMetricsInfo",
             "properties" : {
+              "accumulated-backpressured-time" : {
+                "type" : "integer"
+              },
+              "accumulated-busy-time" : {
+                "type" : "number"
+              },
+              "accumulated-idle-time" : {
+                "type" : "integer"
+              },
               "read-bytes" : {
                 "type" : "integer"
               },
@@ -3658,6 +3667,15 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa
             "type" : "object",
             "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:IOMetricsInfo",
             "properties" : {
+              "accumulated-backpressured-time" : {
+                "type" : "integer"
+              },
+              "accumulated-busy-time" : {
+                "type" : "number"
+              },
+              "accumulated-idle-time" : {
+                "type" : "integer"
+              },
               "read-bytes" : {
                 "type" : "integer"
               },
@@ -4343,6 +4361,15 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa
       "type" : "object",
       "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:IOMetricsInfo",
       "properties" : {
+        "accumulated-backpressured-time" : {
+          "type" : "integer"
+        },
+        "accumulated-busy-time" : {
+          "type" : "number"
+        },
+        "accumulated-idle-time" : {
+          "type" : "integer"
+        },
         "read-bytes" : {
           "type" : "integer"
         },
@@ -4471,6 +4498,15 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa
       "type" : "object",
       "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:IOMetricsInfo",
       "properties" : {
+        "accumulated-backpressured-time" : {
+          "type" : "integer"
+        },
+        "accumulated-busy-time" : {
+          "type" : "number"
+        },
+        "accumulated-idle-time" : {
+          "type" : "integer"
+        },
         "read-bytes" : {
           "type" : "integer"
         },
@@ -4883,6 +4919,15 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa
             "type" : "object",
             "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:IOMetricsInfo",
             "properties" : {
+              "accumulated-backpressured-time" : {
+                "type" : "integer"
+              },
+              "accumulated-busy-time" : {
+                "type" : "number"
+              },
+              "accumulated-idle-time" : {
+                "type" : "integer"
+              },
               "read-bytes" : {
                 "type" : "integer"
               },
diff --git a/docs/static/generated/rest_v1_dispatcher.yml b/docs/static/generated/rest_v1_dispatcher.yml
index 859783719ae..259d5b6d9ba 100644
--- a/docs/static/generated/rest_v1_dispatcher.yml
+++ b/docs/static/generated/rest_v1_dispatcher.yml
@@ -1628,6 +1628,15 @@ components:
           format: int64
         write-records-complete:
           type: boolean
+        accumulated-backpressured-time:
+          type: integer
+          format: int64
+        accumulated-idle-time:
+          type: integer
+          format: int64
+        accumulated-busy-time:
+          type: number
+          format: double
     SavepointFormatType:
       type: string
       enum:
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index 537e966deff..99cd6fb0cab 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -829,6 +829,15 @@
                   },
                   "write-records-complete" : {
                     "type" : "boolean"
+                  },
+                  "accumulated-backpressured-time" : {
+                    "type" : "integer"
+                  },
+                  "accumulated-idle-time" : {
+                    "type" : "integer"
+                  },
+                  "accumulated-busy-time" : {
+                    "type" : "number"
                   }
                 }
               }
@@ -2167,6 +2176,15 @@
                   },
                   "write-records-complete" : {
                     "type" : "boolean"
+                  },
+                  "accumulated-backpressured-time" : {
+                    "type" : "integer"
+                  },
+                  "accumulated-idle-time" : {
+                    "type" : "integer"
+                  },
+                  "accumulated-busy-time" : {
+                    "type" : "number"
                   }
                 }
               },
@@ -2528,6 +2546,15 @@
             },
             "write-records-complete" : {
               "type" : "boolean"
+            },
+            "accumulated-backpressured-time" : {
+              "type" : "integer"
+            },
+            "accumulated-idle-time" : {
+              "type" : "integer"
+            },
+            "accumulated-busy-time" : {
+              "type" : "number"
             }
           }
         },
@@ -2614,6 +2641,15 @@
             },
             "write-records-complete" : {
               "type" : "boolean"
+            },
+            "accumulated-backpressured-time" : {
+              "type" : "integer"
+            },
+            "accumulated-idle-time" : {
+              "type" : "integer"
+            },
+            "accumulated-busy-time" : {
+              "type" : "number"
             }
           }
         },
@@ -2843,6 +2879,15 @@
                   },
                   "write-records-complete" : {
                     "type" : "boolean"
+                  },
+                  "accumulated-backpressured-time" : {
+                    "type" : "integer"
+                  },
+                  "accumulated-idle-time" : {
+                    "type" : "integer"
+                  },
+                  "accumulated-busy-time" : {
+                    "type" : "number"
                   }
                 }
               },
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
index b7bbf86fb3c..e612837531a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 
@@ -37,6 +38,10 @@ public class IOMetrics implements Serializable {
     protected long numBytesIn;
     protected long numBytesOut;
 
+    protected long accumulateBackPressuredTime;
+    protected double accumulateBusyTime;
+    protected long accumulateIdleTime;
+
     protected final Map<IntermediateResultPartitionID, Long> numBytesProducedOfPartitions =
             new HashMap<>();
 
@@ -45,11 +50,17 @@ public class IOMetrics implements Serializable {
             Meter recordsOut,
             Meter bytesIn,
             Meter bytesOut,
-            Map<IntermediateResultPartitionID, Counter> numBytesProducedCounters) {
+            Map<IntermediateResultPartitionID, Counter> numBytesProducedCounters,
+            Gauge<Long> accumulatedBackPressuredTime,
+            Gauge<Long> accumulatedIdleTime,
+            Gauge<Double> accumulatedBusyTime) {
         this.numRecordsIn = recordsIn.getCount();
         this.numRecordsOut = recordsOut.getCount();
         this.numBytesIn = bytesIn.getCount();
         this.numBytesOut = bytesOut.getCount();
+        this.accumulateBackPressuredTime = accumulatedBackPressuredTime.getValue();
+        this.accumulateBusyTime = accumulatedBusyTime.getValue();
+        this.accumulateIdleTime = accumulatedIdleTime.getValue();
 
         for (Map.Entry<IntermediateResultPartitionID, Counter> counter :
                 numBytesProducedCounters.entrySet()) {
@@ -57,11 +68,21 @@ public class IOMetrics implements Serializable {
         }
     }
 
-    public IOMetrics(long numBytesIn, long numBytesOut, long numRecordsIn, long numRecordsOut) {
+    public IOMetrics(
+            long numBytesIn,
+            long numBytesOut,
+            long numRecordsIn,
+            long numRecordsOut,
+            long accumulateIdleTime,
+            long accumulateBusyTime,
+            long accumulateBackPressuredTime) {
         this.numBytesIn = numBytesIn;
         this.numBytesOut = numBytesOut;
         this.numRecordsIn = numRecordsIn;
         this.numRecordsOut = numRecordsOut;
+        this.accumulateIdleTime = accumulateIdleTime;
+        this.accumulateBusyTime = accumulateBusyTime;
+        this.accumulateBackPressuredTime = accumulateBackPressuredTime;
     }
 
     public long getNumRecordsIn() {
@@ -80,6 +101,18 @@ public class IOMetrics implements Serializable {
         return numBytesOut;
     }
 
+    public double getAccumulateBusyTime() {
+        return accumulateBusyTime;
+    }
+
+    public long getAccumulateBackPressuredTime() {
+        return accumulateBackPressuredTime;
+    }
+
+    public long getAccumulateIdleTime() {
+        return accumulateIdleTime;
+    }
+
     public Map<IntermediateResultPartitionID, Long> getNumBytesProducedOfPartitions() {
         return numBytesProducedOfPartitions;
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
index 762c6671791..1f083520f01 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
@@ -72,6 +72,9 @@ public class MetricNames {
     public static final String TASK_IDLE_TIME = "idleTimeMs" + SUFFIX_RATE;
     public static final String TASK_BUSY_TIME = "busyTimeMs" + SUFFIX_RATE;
     public static final String TASK_BACK_PRESSURED_TIME = "backPressuredTimeMs" + SUFFIX_RATE;
+    public static final String ACC_TASK_IDLE_TIME = "accumulateIdleTimeMs";
+    public static final String ACC_TASK_BUSY_TIME = "accumulateBusyTimeMs";
+    public static final String ACC_TASK_BACK_PRESSURED_TIME = "accumulateBackPressuredTimeMs";
     public static final String TASK_SOFT_BACK_PRESSURED_TIME =
             "softBackPressuredTimeMs" + SUFFIX_RATE;
     public static final String TASK_HARD_BACK_PRESSURED_TIME =
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java
index d7f81d06c6a..21819c3d301 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java
@@ -47,6 +47,8 @@ public class TimerGauge implements Gauge<Long>, View {
     private long previousMaxSingleMeasurement;
     private long currentMaxSingleMeasurement;
 
+    private long accumulatedCount;
+
     public TimerGauge() {
         this(SystemClock.getInstance());
     }
@@ -66,6 +68,7 @@ public class TimerGauge implements Gauge<Long>, View {
         if (currentMeasurementStartTS != 0) {
             long currentMeasurement = clock.absoluteTimeMillis() - currentMeasurementStartTS;
             currentCount += currentMeasurement;
+            accumulatedCount += currentMeasurement;
             currentMaxSingleMeasurement = Math.max(currentMaxSingleMeasurement, currentMeasurement);
             currentUpdateTS = 0;
             currentMeasurementStartTS = 0;
@@ -79,6 +82,7 @@ public class TimerGauge implements Gauge<Long>, View {
             // we adding to the current count only the time elapsed since last markStart or update
             // call
             currentCount += now - currentUpdateTS;
+            accumulatedCount += now - currentUpdateTS;
             currentUpdateTS = now;
             // on the other hand, max measurement has to be always checked against last markStart
             // call
@@ -104,6 +108,11 @@ public class TimerGauge implements Gauge<Long>, View {
         return previousMaxSingleMeasurement;
     }
 
+    /** @return the accumulated period by the given * TimerGauge. */
+    public synchronized long getAccumulatedCount() {
+        return accumulatedCount;
+    }
+
     @VisibleForTesting
     public synchronized long getCount() {
         return currentCount;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
index 0b580d9453e..17dca7b97b7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
@@ -60,12 +60,17 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
     private final TimerGauge hardBackPressuredTimePerSecond;
     private final Gauge<Long> maxSoftBackPressuredTime;
     private final Gauge<Long> maxHardBackPressuredTime;
+    private final Gauge<Long> accumulatedBackPressuredTime;
+    private final Gauge<Long> accumulatedIdleTime;
+    private final Gauge<Double> accumulatedBusyTime;
     private final Meter mailboxThroughput;
     private final Histogram mailboxLatency;
     private final SizeGauge mailboxSize;
 
     private volatile boolean busyTimeEnabled;
 
+    private long taskStartTime;
+
     private final Map<IntermediateResultPartitionID, Counter> numBytesProducedOfPartitions =
             new HashMap<>();
 
@@ -107,6 +112,15 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
 
         this.busyTimePerSecond = gauge(MetricNames.TASK_BUSY_TIME, this::getBusyTimePerSecond);
 
+        this.accumulatedBusyTime =
+                gauge(MetricNames.ACC_TASK_BUSY_TIME, this::getAccumulatedBusyTime);
+        this.accumulatedBackPressuredTime =
+                gauge(
+                        MetricNames.ACC_TASK_BACK_PRESSURED_TIME,
+                        this::getAccumulatedBackPressuredTimeMs);
+        this.accumulatedIdleTime =
+                gauge(MetricNames.ACC_TASK_IDLE_TIME, idleTimePerSecond::getAccumulatedCount);
+
         this.numMailsProcessed = new SimpleCounter();
         this.mailboxThroughput =
                 meter(MetricNames.MAILBOX_THROUGHPUT, new MeterView(numMailsProcessed));
@@ -121,7 +135,10 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
                 numRecordsOutRate,
                 numBytesInRate,
                 numBytesOutRate,
-                numBytesProducedOfPartitions);
+                numBytesProducedOfPartitions,
+                accumulatedBackPressuredTime,
+                accumulatedIdleTime,
+                accumulatedBusyTime);
     }
 
     // ============================================================================================
@@ -169,6 +186,15 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
                 + getHardBackPressuredTimePerSecond().getValue();
     }
 
+    public long getAccumulatedBackPressuredTimeMs() {
+        return getSoftBackPressuredTimePerSecond().getAccumulatedCount()
+                + getHardBackPressuredTimePerSecond().getAccumulatedCount();
+    }
+
+    public void markTaskStart() {
+        this.taskStartTime = System.currentTimeMillis();
+    }
+
     public void setEnableBusyTime(boolean enabled) {
         busyTimeEnabled = enabled;
     }
@@ -178,6 +204,17 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
         return busyTimeEnabled ? 1000.0 - Math.min(busyTime, 1000.0) : Double.NaN;
     }
 
+    private double getAccumulatedBusyTime() {
+        return busyTimeEnabled
+                ? Math.max(
+                        System.currentTimeMillis()
+                                - taskStartTime
+                                - idleTimePerSecond.getAccumulatedCount()
+                                - getAccumulatedBackPressuredTimeMs(),
+                        0)
+                : Double.NaN;
+    }
+
     public Meter getMailboxThroughput() {
         return mailboxThroughput;
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
index 68b16fd8652..4f850bc0712 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
@@ -218,7 +218,10 @@ public class JobDetailsHandler
                         counts.getNumRecordsIn(),
                         counts.isNumRecordsInComplete(),
                         counts.getNumRecordsOut(),
-                        counts.isNumRecordsOutComplete());
+                        counts.isNumRecordsOutComplete(),
+                        counts.getAccumulateBackPressuredTime(),
+                        counts.getAccumulateIdleTime(),
+                        counts.getAccumulateBusyTime());
 
         return new JobDetailsInfo.JobVertexDetailsInfo(
                 ejv.getJobVertexId(),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
index d1ed4030443..02df59952a2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
@@ -206,7 +206,10 @@ public class JobVertexTaskManagersHandler
                             counts.getNumRecordsIn(),
                             counts.isNumRecordsInComplete(),
                             counts.getNumRecordsOut(),
-                            counts.isNumRecordsOutComplete());
+                            counts.isNumRecordsOutComplete(),
+                            counts.getAccumulateBackPressuredTime(),
+                            counts.getAccumulateIdleTime(),
+                            counts.getAccumulateBusyTime());
 
             Map<ExecutionState, Integer> statusCounts =
                     new HashMap<>(ExecutionState.values().length);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
index b5dac6ce5ad..7da9061a090 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
@@ -47,7 +47,7 @@ public class MutableIOMetrics extends IOMetrics {
     private boolean numRecordsOutComplete = true;
 
     public MutableIOMetrics() {
-        super(0, 0, 0, 0);
+        super(0, 0, 0, 0, 0, 0, 0);
     }
 
     public boolean isNumBytesInComplete() {
@@ -86,6 +86,13 @@ public class MutableIOMetrics extends IOMetrics {
                 this.numBytesOut += ioMetrics.getNumBytesOut();
                 this.numRecordsIn += ioMetrics.getNumRecordsIn();
                 this.numRecordsOut += ioMetrics.getNumRecordsOut();
+                this.accumulateBackPressuredTime += ioMetrics.getAccumulateBackPressuredTime();
+                this.accumulateIdleTime += ioMetrics.getAccumulateIdleTime();
+                if (Double.isNaN(ioMetrics.getAccumulateBusyTime())) {
+                    this.accumulateBusyTime = Double.NaN;
+                } else {
+                    this.accumulateBusyTime += ioMetrics.getAccumulateBusyTime();
+                }
             }
         } else { // execAttempt is still running, use MetricQueryService instead
             if (fetcher != null) {
@@ -127,6 +134,29 @@ public class MutableIOMetrics extends IOMetrics {
                         this.numRecordsOut +=
                                 Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT));
                     }
+
+                    if (metrics.getMetric(MetricNames.ACC_TASK_BACK_PRESSURED_TIME) != null) {
+                        this.accumulateBackPressuredTime +=
+                                Long.parseLong(
+                                        metrics.getMetric(
+                                                MetricNames.ACC_TASK_BACK_PRESSURED_TIME));
+                    }
+
+                    if (metrics.getMetric(MetricNames.ACC_TASK_IDLE_TIME) != null) {
+                        this.accumulateIdleTime +=
+                                Long.parseLong(metrics.getMetric(MetricNames.ACC_TASK_IDLE_TIME));
+                    }
+
+                    if (metrics.getMetric(MetricNames.ACC_TASK_BUSY_TIME) != null) {
+                        double busyTime =
+                                Double.parseDouble(
+                                        metrics.getMetric(MetricNames.ACC_TASK_BUSY_TIME));
+                        if (Double.isNaN(busyTime)) {
+                            this.accumulateBusyTime = Double.NaN;
+                        } else {
+                            this.accumulateBusyTime += busyTime;
+                        }
+                    }
                 } else {
                     this.numBytesInComplete = false;
                     this.numBytesOutComplete = false;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java
index 42718d0070d..637538f1f48 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java
@@ -226,7 +226,10 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
                         ioMetrics.getNumRecordsIn(),
                         ioMetrics.isNumRecordsInComplete(),
                         ioMetrics.getNumRecordsOut(),
-                        ioMetrics.isNumRecordsOutComplete());
+                        ioMetrics.isNumRecordsOutComplete(),
+                        ioMetrics.getAccumulateBackPressuredTime(),
+                        ioMetrics.getAccumulateIdleTime(),
+                        ioMetrics.getAccumulateBusyTime());
 
         return new SubtaskExecutionAttemptDetailsInfo(
                 execution.getParallelSubtaskIndex(),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/IOMetricsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/IOMetricsInfo.java
index 29e3e62c8b7..02c37152fe8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/IOMetricsInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/IOMetricsInfo.java
@@ -42,6 +42,12 @@ public final class IOMetricsInfo {
 
     private static final String FIELD_NAME_RECORDS_WRITTEN_COMPLETE = "write-records-complete";
 
+    private static final String FIELD_NAME_ACC_BACK_PRESSURE = "accumulated-backpressured-time";
+
+    private static final String FIELD_NAME_ACC_IDLE = "accumulated-idle-time";
+
+    private static final String FIELD_NAME_ACC_BUSY = "accumulated-busy-time";
+
     @JsonProperty(FIELD_NAME_BYTES_READ)
     private final long bytesRead;
 
@@ -66,6 +72,15 @@ public final class IOMetricsInfo {
     @JsonProperty(FIELD_NAME_RECORDS_WRITTEN_COMPLETE)
     private final boolean recordsWrittenComplete;
 
+    @JsonProperty(FIELD_NAME_ACC_BACK_PRESSURE)
+    private final long accumulatedBackpressured;
+
+    @JsonProperty(FIELD_NAME_ACC_IDLE)
+    private final long accumulatedIdle;
+
+    @JsonProperty(FIELD_NAME_ACC_BUSY)
+    private final double accumulatedBusy;
+
     @JsonCreator
     public IOMetricsInfo(
             @JsonProperty(FIELD_NAME_BYTES_READ) long bytesRead,
@@ -75,7 +90,10 @@ public final class IOMetricsInfo {
             @JsonProperty(FIELD_NAME_RECORDS_READ) long recordsRead,
             @JsonProperty(FIELD_NAME_RECORDS_READ_COMPLETE) boolean recordsReadComplete,
             @JsonProperty(FIELD_NAME_RECORDS_WRITTEN) long recordsWritten,
-            @JsonProperty(FIELD_NAME_RECORDS_WRITTEN_COMPLETE) boolean recordsWrittenComplete) {
+            @JsonProperty(FIELD_NAME_RECORDS_WRITTEN_COMPLETE) boolean recordsWrittenComplete,
+            @JsonProperty(FIELD_NAME_ACC_BACK_PRESSURE) long accumulatedBackpressured,
+            @JsonProperty(FIELD_NAME_ACC_IDLE) long accumulatedIdle,
+            @JsonProperty(FIELD_NAME_ACC_BUSY) double accumulatedBusy) {
         this.bytesRead = bytesRead;
         this.bytesReadComplete = bytesReadComplete;
         this.bytesWritten = bytesWritten;
@@ -84,6 +102,9 @@ public final class IOMetricsInfo {
         this.recordsReadComplete = recordsReadComplete;
         this.recordsWritten = recordsWritten;
         this.recordsWrittenComplete = recordsWrittenComplete;
+        this.accumulatedBackpressured = accumulatedBackpressured;
+        this.accumulatedIdle = accumulatedIdle;
+        this.accumulatedBusy = accumulatedBusy;
     }
 
     public long getBytesRead() {
@@ -118,6 +139,18 @@ public final class IOMetricsInfo {
         return recordsWrittenComplete;
     }
 
+    public long getAccumulatedBackpressured() {
+        return accumulatedBackpressured;
+    }
+
+    public double getAccumulatedBusy() {
+        return accumulatedBusy;
+    }
+
+    public long getAccumulatedIdle() {
+        return accumulatedIdle;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -134,7 +167,10 @@ public final class IOMetricsInfo {
                 && recordsRead == that.recordsRead
                 && recordsReadComplete == that.recordsReadComplete
                 && recordsWritten == that.recordsWritten
-                && recordsWrittenComplete == that.recordsWrittenComplete;
+                && recordsWrittenComplete == that.recordsWrittenComplete
+                && accumulatedBackpressured == that.accumulatedBackpressured
+                && accumulatedBusy == that.accumulatedBusy
+                && accumulatedIdle == that.accumulatedIdle;
     }
 
     @Override
@@ -147,6 +183,9 @@ public final class IOMetricsInfo {
                 recordsRead,
                 recordsReadComplete,
                 recordsWritten,
-                recordsWrittenComplete);
+                recordsWrittenComplete,
+                accumulatedBackpressured,
+                accumulatedBusy,
+                accumulatedIdle);
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java
index f5c222e91c5..a2abb0b5901 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java
@@ -344,7 +344,7 @@ public class DefaultExecutionGraphDeploymentTest extends TestLogger {
         // verify behavior for canceled executions
         Execution execution1 = executions.values().iterator().next();
 
-        IOMetrics ioMetrics = new IOMetrics(0, 0, 0, 0);
+        IOMetrics ioMetrics = new IOMetrics(0, 0, 0, 0, 0, 0, 0);
         Map<String, Accumulator<?, ?>> accumulators = new HashMap<>();
         accumulators.put("acc", new IntCounter(4));
         AccumulatorSnapshot accumulatorSnapshot =
@@ -367,7 +367,7 @@ public class DefaultExecutionGraphDeploymentTest extends TestLogger {
         // verify behavior for failed executions
         Execution execution2 = executions.values().iterator().next();
 
-        IOMetrics ioMetrics2 = new IOMetrics(0, 0, 0, 0);
+        IOMetrics ioMetrics2 = new IOMetrics(0, 0, 0, 0, 0, 0, 0);
         Map<String, Accumulator<?, ?>> accumulators2 = new HashMap<>();
         accumulators2.put("acc", new IntCounter(8));
         AccumulatorSnapshot accumulatorSnapshot2 =
@@ -405,7 +405,7 @@ public class DefaultExecutionGraphDeploymentTest extends TestLogger {
         Map<ExecutionAttemptID, Execution> executions =
                 scheduler.getExecutionGraph().getRegisteredExecutions();
 
-        IOMetrics ioMetrics = new IOMetrics(0, 0, 0, 0);
+        IOMetrics ioMetrics = new IOMetrics(0, 0, 0, 0, 0, 0, 0);
         Map<String, Accumulator<?, ?>> accumulators = Collections.emptyMap();
 
         Execution execution1 = executions.values().iterator().next();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java
index 62f907f78cc..df765c69cd7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java
@@ -159,7 +159,7 @@ public class ExecutionPartitionLifecycleTest extends TestLogger {
                 execution -> {
                     execution.cancel();
                     execution.completeCancelling(
-                            Collections.emptyMap(), new IOMetrics(0, 0, 0, 0), false);
+                            Collections.emptyMap(), new IOMetrics(0, 0, 0, 0, 0, 0, 0), false);
                 },
                 PartitionReleaseResult.STOP_TRACKING);
     }
@@ -182,7 +182,7 @@ public class ExecutionPartitionLifecycleTest extends TestLogger {
                                 new Exception("Test exception"),
                                 false,
                                 Collections.emptyMap(),
-                                new IOMetrics(0, 0, 0, 0),
+                                new IOMetrics(0, 0, 0, 0, 0, 0, 0),
                                 false,
                                 true),
                 PartitionReleaseResult.STOP_TRACKING);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TimerGaugeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TimerGaugeTest.java
index f066e776524..0ddbde97637 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TimerGaugeTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TimerGaugeTest.java
@@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit;
 
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
 /** Tests for {@link TimerGauge}. */
@@ -49,6 +50,7 @@ public class TimerGaugeTest {
         gauge.update();
         assertThat(gauge.getValue(), is(0L));
         assertThat(gauge.getMaxSingleMeasurement(), is(0L));
+        assertEquals(gauge.getAccumulatedCount(), 0L);
 
         gauge.markStart();
         clock.advanceTime(SLEEP, TimeUnit.MILLISECONDS);
@@ -57,6 +59,7 @@ public class TimerGaugeTest {
 
         assertThat(gauge.getValue(), greaterThanOrEqualTo(SLEEP / View.UPDATE_INTERVAL_SECONDS));
         assertThat(gauge.getMaxSingleMeasurement(), is(SLEEP));
+        assertEquals(gauge.getAccumulatedCount(), SLEEP);
 
         // Check that the getMaxSingleMeasurement can go down after an update
         gauge.markStart();
@@ -65,6 +68,7 @@ public class TimerGaugeTest {
         gauge.update();
 
         assertThat(gauge.getMaxSingleMeasurement(), is(SLEEP / 2));
+        assertEquals(gauge.getAccumulatedCount(), SLEEP + SLEEP / 2);
     }
 
     @Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
index 7acc04bc604..71a978636bd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
@@ -38,6 +38,8 @@ public class TaskIOMetricGroupTest {
     public void testTaskIOMetricGroup() throws InterruptedException {
         TaskMetricGroup task = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
         TaskIOMetricGroup taskIO = task.getIOMetricGroup();
+        taskIO.setEnableBusyTime(true);
+        final long startTime = System.currentTimeMillis();
 
         // test counter forwarding
         assertNotNull(taskIO.getNumRecordsInCounter());
@@ -75,6 +77,19 @@ public class TaskIOMetricGroupTest {
         assertEquals(100L, io.getNumBytesIn());
         assertEquals(250L, io.getNumBytesOut());
         assertEquals(3L, taskIO.getNumBuffersOutCounter().getCount());
+        assertEquals(
+                taskIO.getIdleTimeMsPerSecond().getAccumulatedCount(), io.getAccumulateIdleTime());
+        assertEquals(
+                taskIO.getHardBackPressuredTimePerSecond().getAccumulatedCount()
+                        + taskIO.getSoftBackPressuredTimePerSecond().getAccumulatedCount(),
+                io.getAccumulateBackPressuredTime());
+        assertThat(
+                io.getAccumulateBusyTime(),
+                greaterThanOrEqualTo(
+                        (double) System.currentTimeMillis()
+                                - startTime
+                                - io.getAccumulateIdleTime()
+                                - io.getAccumulateBackPressuredTime()));
         assertThat(taskIO.getIdleTimeMsPerSecond().getCount(), greaterThanOrEqualTo(softSleepTime));
         assertThat(
                 taskIO.getSoftBackPressuredTimePerSecond().getCount(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
index 5ac759f7396..483ad2facf1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
@@ -72,8 +72,19 @@ public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger {
         final long bytesOut = 10L;
         final long recordsIn = 20L;
         final long recordsOut = 30L;
-
-        final IOMetrics ioMetrics = new IOMetrics(bytesIn, bytesOut, recordsIn, recordsOut);
+        final long accumulateIdleTime = 40L;
+        final long accumulateBusyTime = 50L;
+        final long accumulateBackPressuredTime = 60L;
+
+        final IOMetrics ioMetrics =
+                new IOMetrics(
+                        bytesIn,
+                        bytesOut,
+                        recordsIn,
+                        recordsOut,
+                        accumulateIdleTime,
+                        accumulateBusyTime,
+                        accumulateBackPressuredTime);
 
         final long[] timestamps = new long[ExecutionState.values().length];
         timestamps[ExecutionState.DEPLOYING.ordinal()] = deployingTs;
@@ -146,7 +157,18 @@ public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger {
 
         // Verify
         final IOMetricsInfo ioMetricsInfo =
-                new IOMetricsInfo(bytesIn, true, bytesOut, true, recordsIn, true, recordsOut, true);
+                new IOMetricsInfo(
+                        bytesIn,
+                        true,
+                        bytesOut,
+                        true,
+                        recordsIn,
+                        true,
+                        recordsOut,
+                        true,
+                        accumulateBackPressuredTime,
+                        accumulateIdleTime,
+                        accumulateBusyTime);
 
         final SubtaskExecutionAttemptDetailsInfo expectedDetailsInfo =
                 new SubtaskExecutionAttemptDetailsInfo(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
index 69a8e2b051c..4a288e738be 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
@@ -77,8 +77,19 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger {
         final long bytesOut = 10L;
         final long recordsIn = 20L;
         final long recordsOut = 30L;
-
-        final IOMetrics ioMetrics = new IOMetrics(bytesIn, bytesOut, recordsIn, recordsOut);
+        final long accumulateIdleTime = 40L;
+        final long accumulateBusyTime = 50L;
+        final long accumulateBackPressuredTime = 60L;
+
+        final IOMetrics ioMetrics =
+                new IOMetrics(
+                        bytesIn,
+                        bytesOut,
+                        recordsIn,
+                        recordsOut,
+                        accumulateIdleTime,
+                        accumulateBusyTime,
+                        accumulateBackPressuredTime);
 
         final ArchivedExecutionJobVertex archivedExecutionJobVertex =
                 new ArchivedExecutionJobVertex(
@@ -151,7 +162,18 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger {
 
         // Verify
         final IOMetricsInfo ioMetricsInfo =
-                new IOMetricsInfo(bytesIn, true, bytesOut, true, recordsIn, true, recordsOut, true);
+                new IOMetricsInfo(
+                        bytesIn,
+                        true,
+                        bytesOut,
+                        true,
+                        recordsIn,
+                        true,
+                        recordsOut,
+                        true,
+                        accumulateBackPressuredTime,
+                        accumulateIdleTime,
+                        accumulateBusyTime);
 
         final SubtaskExecutionAttemptDetailsInfo expectedDetailsInfo =
                 new SubtaskExecutionAttemptDetailsInfo(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java
index 707aa777542..806ce560494 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java
@@ -47,7 +47,10 @@ public class JobVertexDetailsInfoTest
                         random.nextLong(),
                         random.nextBoolean(),
                         random.nextLong(),
-                        random.nextBoolean());
+                        random.nextBoolean(),
+                        Math.abs(random.nextLong()),
+                        Math.abs(random.nextLong()),
+                        Math.abs(random.nextDouble()));
         List<SubtaskExecutionAttemptDetailsInfo> vertexTaskDetailList = new ArrayList<>();
         vertexTaskDetailList.add(
                 new SubtaskExecutionAttemptDetailsInfo(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfoTest.java
index cf7aad02879..d988ee017a8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfoTest.java
@@ -54,7 +54,10 @@ public class JobVertexTaskManagersInfoTest
                         random.nextLong(),
                         random.nextBoolean(),
                         random.nextLong(),
-                        random.nextBoolean());
+                        random.nextBoolean(),
+                        Math.abs(random.nextLong()),
+                        Math.abs(random.nextLong()),
+                        Math.abs(random.nextDouble()));
         int count = 100;
         for (ExecutionState executionState : ExecutionState.values()) {
             statusCounts.put(executionState, count++);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java
index e867d2720d8..8c207c5b069 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java
@@ -91,7 +91,10 @@ public class JobDetailsInfoTest extends RestResponseMarshallingTestBase<JobDetai
                         random.nextLong(),
                         random.nextBoolean(),
                         random.nextLong(),
-                        random.nextBoolean());
+                        random.nextBoolean(),
+                        Math.abs(random.nextLong()),
+                        Math.abs(random.nextLong()),
+                        Math.abs(random.nextDouble()));
 
         for (ExecutionState executionState : ExecutionState.values()) {
             tasksPerState.put(executionState, random.nextInt());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java
index e7ea728507d..f022d6ced26 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java
@@ -46,7 +46,10 @@ public class SubtaskExecutionAttemptDetailsInfoTest
                         Math.abs(random.nextLong()),
                         random.nextBoolean(),
                         Math.abs(random.nextLong()),
-                        random.nextBoolean());
+                        random.nextBoolean(),
+                        Math.abs(random.nextLong()),
+                        Math.abs(random.nextLong()),
+                        Math.abs(random.nextDouble()));
 
         return new SubtaskExecutionAttemptDetailsInfo(
                 Math.abs(random.nextInt()),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
index bf330d9edf3..e8636ea8435 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
@@ -133,7 +133,7 @@ public class AdaptiveBatchSchedulerTest extends TestLogger {
                             state,
                             null,
                             null,
-                            new IOMetrics(0, 0, 0, 0)));
+                            new IOMetrics(0, 0, 0, 0, 0, 0, 0)));
         }
     }
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 55a2f527a10..19bc707a363 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -772,6 +772,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
         scheduleBufferDebloater();
 
         // let the task do its work
+        getEnvironment().getMetricGroup().getIOMetricGroup().markTaskStart();
         runMailboxLoop();
 
         // if this left the run() method cleanly despite the fact that this was canceled,