Posted to commits@flink.apache.org by gu...@apache.org on 2022/07/01 03:35:28 UTC

[flink] branch master updated (202eacb2ee9 -> bb8e1d14f05)

This is an automated email from the ASF dual-hosted git repository.

guoyangze pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from 202eacb2ee9 [FLINK-26361][hive] Create LogicalFilter with CorrelationId to fix failed to rewrite subquery in hive dialect (#18920)
     new 6b8230cf83f [hotfix][rest] Remove the stale items in rest_v1_dispatcher
     new bc2a86e5bb5 [FLINK-28308] Introduce metrics of the accumulated time that a running task is busy / idle / back-pressured
     new bb8e1d14f05 [FLINK-28309][rest] Introduce metrics of the duration that a task stays in each status

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../shortcodes/generated/rest_v1_dispatcher.html   | 169 ++++++++-------------
 docs/static/generated/rest_v1_dispatcher.yml       |  14 ++
 .../src/test/resources/rest_api_v1.snapshot        |  63 ++++++++
 .../runtime/executiongraph/AccessExecution.java    |  15 ++
 .../runtime/executiongraph/ArchivedExecution.java  |  19 ++-
 .../flink/runtime/executiongraph/Execution.java    |  29 +++-
 .../flink/runtime/executiongraph/IOMetrics.java    |  37 ++++-
 .../apache/flink/runtime/metrics/MetricNames.java  |   3 +
 .../apache/flink/runtime/metrics/TimerGauge.java   |   9 ++
 .../runtime/metrics/groups/TaskIOMetricGroup.java  |  39 ++++-
 .../rest/handler/job/JobDetailsHandler.java        |   5 +-
 .../handler/job/JobVertexTaskManagersHandler.java  |   5 +-
 .../rest/handler/util/MutableIOMetrics.java        |  32 +++-
 .../job/SubtaskExecutionAttemptDetailsInfo.java    |  82 +++++++++-
 .../rest/messages/job/metrics/IOMetricsInfo.java   |  45 +++++-
 .../executiongraph/ArchivedExecutionGraphTest.java |  30 ++++
 .../DefaultExecutionGraphDeploymentTest.java       |   6 +-
 .../executiongraph/ExecutionHistoryTest.java       |   1 +
 .../ExecutionPartitionLifecycleTest.java           |   4 +-
 .../flink/runtime/metrics/TimerGaugeTest.java      |   4 +
 .../metrics/groups/TaskIOMetricGroupTest.java      |  15 ++
 .../rest/handler/job/JobExceptionsHandlerTest.java |   4 +-
 .../SubtaskCurrentAttemptDetailsHandlerTest.java   |  44 +++++-
 ...askExecutionAttemptAccumulatorsHandlerTest.java |   1 +
 .../SubtaskExecutionAttemptDetailsHandlerTest.java |  40 ++++-
 .../rest/messages/JobVertexDetailsInfoTest.java    |  15 +-
 .../messages/JobVertexTaskManagersInfoTest.java    |   5 +-
 .../rest/messages/job/JobDetailsInfoTest.java      |   5 +-
 .../SubtaskExecutionAttemptDetailsInfoTest.java    |  17 ++-
 .../adaptivebatch/AdaptiveBatchSchedulerTest.java  |   2 +-
 .../exceptionhistory/TestingAccessExecution.java   |  10 ++
 .../flink/streaming/runtime/tasks/StreamTask.java  |   1 +
 32 files changed, 621 insertions(+), 149 deletions(-)
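
Taken together, these three commits extend what the REST API reports per subtask attempt: accumulated busy / idle / back-pressured times (FLINK-28308) and a map of how long the task has spent in each status (FLINK-28309). As a rough orientation, the sketch below fetches a subtask's details from a locally running cluster and prints the JSON that now carries the new fields; the endpoint path follows the subtask details handlers touched below, while the port and placeholder IDs are assumptions, not part of this change.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SubtaskTimingsProbe {
    public static void main(String[] args) throws Exception {
        String jobId = args[0];     // e.g. taken from GET /jobs/overview
        String vertexId = args[1];  // e.g. taken from GET /jobs/<jobId>
        String url = "http://localhost:8081/jobs/" + jobId
                + "/vertices/" + vertexId + "/subtasks/0";
        HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        // The response now also contains "status-duration" plus
        // "accumulated-backpressured-time", "accumulated-idle-time" and
        // "accumulated-busy-time" within the attempt's metrics object.
        System.out.println(response.body());
    }
}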


[flink] 01/03: [hotfix][rest] Remove the stale items in rest_v1_dispatcher

Posted by gu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

guoyangze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6b8230cf83fd33c4fdd3d7cc99e6a90d9839a350
Author: Yangze Guo <ka...@gmail.com>
AuthorDate: Thu Jun 30 14:30:38 2022 +0800

    [hotfix][rest] Remove the stale items in rest_v1_dispatcher
---
 .../shortcodes/generated/rest_v1_dispatcher.html   | 106 ---------------------
 1 file changed, 106 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
index b7e8965c958..7638b3ee8da 100644
--- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
+++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
@@ -583,112 +583,6 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa
     </tr>
   </tbody>
 </table>
-<table class="rest-api table table-bordered">
-  <tbody>
-    <tr>
-      <td class="text-left" colspan="2"><h5><strong>/jars/:jarid/plan</strong></h5></td>
-    </tr>
-    <tr>
-      <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
-      <td class="text-left">Response code: <code>200 OK</code></td>
-    </tr>
-    <tr>
-      <td colspan="2">Returns the dataflow plan of a job contained in a jar previously uploaded via '/jars/upload'. Program arguments can be passed both via the JSON request (recommended) or query parameters.</td>
-    </tr>
-    <tr>
-      <td colspan="2">Path parameters</td>
-    </tr>
-    <tr>
-      <td colspan="2">
-        <ul>
-<li><code>jarid</code> - String value that identifies a jar. When uploading the jar a path is returned, where the filename is the ID. This value is equivalent to the `id` field in the list of uploaded jars (/jars).</li>
-        </ul>
-      </td>
-    </tr>
-    <tr>
-      <td colspan="2">Query parameters</td>
-    </tr>
-    <tr>
-      <td colspan="2">
-        <ul>
-<li><code>program-args</code> (optional): Deprecated, please use 'programArg' instead. String value that specifies the arguments for the program or plan</li>
-<li><code>programArg</code> (optional): Comma-separated list of program arguments.</li>
-<li><code>entry-class</code> (optional): String value that specifies the fully qualified name of the entry point class. Overrides the class defined in the jar file manifest.</li>
-<li><code>parallelism</code> (optional): Positive integer value that specifies the desired parallelism for the job.</li>
-        </ul>
-      </td>
-    </tr>
-    <tr>
-      <td colspan="2">
-      <div class="book-expand">
-        <label>
-          <div class="book-expand-head flex justify-between">
-            <span>Request</span>
-            &nbsp;            <span>▾</span>
-          </div>
-          <input type="checkbox" class="hidden">
-          <div class="book-expand-content markdown-inner">
-          <pre>
-            <code>
-{
-  "type" : "object",
-  "id" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:handlers:JarPlanRequestBody",
-  "properties" : {
-    "entryClass" : {
-      "type" : "string"
-    },
-    "jobId" : {
-      "type" : "any"
-    },
-    "parallelism" : {
-      "type" : "integer"
-    },
-    "programArgs" : {
-      "type" : "string"
-    },
-    "programArgsList" : {
-      "type" : "array",
-      "items" : {
-        "type" : "string"
-      }
-    }
-  }
-}            </code>
-          </pre>
-          </div>
-        </label>
-      </div>
-      </td>
-    </tr>
-    <tr>
-      <td colspan="2">
-      <div class="book-expand">
-        <label>
-          <div class="book-expand-head flex justify-between">
-            <span>Response</span>
-            &nbsp;            <span>▾</span>
-          </div>
-          <input type="checkbox" class="hidden">
-          <div class="book-expand-content markdown-inner">
-          <pre>
-            <code>
-{
-  "type" : "object",
-  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobPlanInfo",
-  "properties" : {
-    "plan" : {
-      "type" : "any"
-    }
-  }
-}            </code>
-          </pre>
-          </div>
-        </label>
-      </div>
-      </td>
-    </tr>
-  </tbody>
-</table>
 <table class="rest-api table table-bordered">
   <tbody>
     <tr>
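
The /jars/:jarid/plan endpoint itself is untouched by this hotfix; the commit only drops a stale block from the generated documentation. For orientation, a minimal sketch of querying the endpoint with the query parameters described in the removed table above — the port, jar id and argument values are placeholders to adjust for your deployment.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class JarPlanProbe {
    public static void main(String[] args) throws Exception {
        String jarId = args[0];  // the filename returned by POST /jars/upload
        // 'programArg', 'entry-class' and 'parallelism' are optional query parameters.
        String url = "http://localhost:8081/jars/" + jarId
                + "/plan?programArg=--input,/tmp/words.txt&parallelism=2";
        HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body());  // JobPlanInfo, e.g. { "plan" : ... }
    }
}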


[flink] 02/03: [FLINK-28308] Introduce metrics of the accumulated time that a running task is busy / idle / back-pressured

Posted by gu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

guoyangze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bc2a86e5bb5f048fb5e5007f916405773a88b5cc
Author: Yangze Guo <ka...@gmail.com>
AuthorDate: Mon May 23 16:50:37 2022 +0800

    [FLINK-28308] Introduce metrics of the accumulated time that a running task is busy / idle / back-pressured
    
    This closes #20110.
---
 .../shortcodes/generated/rest_v1_dispatcher.html   | 45 ++++++++++++++++++++++
 docs/static/generated/rest_v1_dispatcher.yml       |  9 +++++
 .../src/test/resources/rest_api_v1.snapshot        | 45 ++++++++++++++++++++++
 .../flink/runtime/executiongraph/IOMetrics.java    | 37 +++++++++++++++++-
 .../apache/flink/runtime/metrics/MetricNames.java  |  3 ++
 .../apache/flink/runtime/metrics/TimerGauge.java   |  9 +++++
 .../runtime/metrics/groups/TaskIOMetricGroup.java  | 39 ++++++++++++++++++-
 .../rest/handler/job/JobDetailsHandler.java        |  5 ++-
 .../handler/job/JobVertexTaskManagersHandler.java  |  5 ++-
 .../rest/handler/util/MutableIOMetrics.java        | 32 ++++++++++++++-
 .../job/SubtaskExecutionAttemptDetailsInfo.java    |  5 ++-
 .../rest/messages/job/metrics/IOMetricsInfo.java   | 45 ++++++++++++++++++++--
 .../DefaultExecutionGraphDeploymentTest.java       |  6 +--
 .../ExecutionPartitionLifecycleTest.java           |  4 +-
 .../flink/runtime/metrics/TimerGaugeTest.java      |  4 ++
 .../metrics/groups/TaskIOMetricGroupTest.java      | 15 ++++++++
 .../SubtaskCurrentAttemptDetailsHandlerTest.java   | 28 ++++++++++++--
 .../SubtaskExecutionAttemptDetailsHandlerTest.java | 28 ++++++++++++--
 .../rest/messages/JobVertexDetailsInfoTest.java    |  5 ++-
 .../messages/JobVertexTaskManagersInfoTest.java    |  5 ++-
 .../rest/messages/job/JobDetailsInfoTest.java      |  5 ++-
 .../SubtaskExecutionAttemptDetailsInfoTest.java    |  5 ++-
 .../adaptivebatch/AdaptiveBatchSchedulerTest.java  |  2 +-
 .../flink/streaming/runtime/tasks/StreamTask.java  |  1 +
 24 files changed, 361 insertions(+), 26 deletions(-)
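
The core building block of this change is a new monotonically growing counter in TimerGauge (patched below), kept next to the existing per-interval rate, plus a markTaskStart() hook that lets TaskIOMetricGroup derive busy time. A minimal sketch of how the new getAccumulatedCount() is meant to behave, mirroring the TimerGaugeTest additions further down; ManualClock is Flink's manually advanced test clock, and its exact usage here is an assumption.

import java.util.concurrent.TimeUnit;

import org.apache.flink.runtime.metrics.TimerGauge;
import org.apache.flink.util.clock.ManualClock;

public class TimerGaugeAccumulationSketch {
    public static void main(String[] args) {
        ManualClock clock = new ManualClock();
        TimerGauge gauge = new TimerGauge(clock);

        gauge.markStart();
        clock.advanceTime(100, TimeUnit.MILLISECONDS);
        gauge.markEnd();
        gauge.update();

        gauge.markStart();
        clock.advanceTime(50, TimeUnit.MILLISECONDS);
        gauge.markEnd();
        gauge.update();

        // getValue() reports a per-update-interval rate that is reset over time,
        // while getAccumulatedCount() keeps the total measured time: 150 ms here.
        System.out.println(gauge.getAccumulatedCount());
    }
}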

diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
index 7638b3ee8da..4b648e77e72 100644
--- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
+++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
@@ -1497,6 +1497,15 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa
             "type" : "object",
             "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:IOMetricsInfo",
             "properties" : {
+              "accumulated-backpressured-time" : {
+                "type" : "integer"
+              },
+              "accumulated-busy-time" : {
+                "type" : "number"
+              },
+              "accumulated-idle-time" : {
+                "type" : "integer"
+              },
               "read-bytes" : {
                 "type" : "integer"
               },
@@ -3658,6 +3667,15 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa
             "type" : "object",
             "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:IOMetricsInfo",
             "properties" : {
+              "accumulated-backpressured-time" : {
+                "type" : "integer"
+              },
+              "accumulated-busy-time" : {
+                "type" : "number"
+              },
+              "accumulated-idle-time" : {
+                "type" : "integer"
+              },
               "read-bytes" : {
                 "type" : "integer"
               },
@@ -4343,6 +4361,15 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa
       "type" : "object",
       "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:IOMetricsInfo",
       "properties" : {
+        "accumulated-backpressured-time" : {
+          "type" : "integer"
+        },
+        "accumulated-busy-time" : {
+          "type" : "number"
+        },
+        "accumulated-idle-time" : {
+          "type" : "integer"
+        },
         "read-bytes" : {
           "type" : "integer"
         },
@@ -4471,6 +4498,15 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa
       "type" : "object",
       "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:IOMetricsInfo",
       "properties" : {
+        "accumulated-backpressured-time" : {
+          "type" : "integer"
+        },
+        "accumulated-busy-time" : {
+          "type" : "number"
+        },
+        "accumulated-idle-time" : {
+          "type" : "integer"
+        },
         "read-bytes" : {
           "type" : "integer"
         },
@@ -4883,6 +4919,15 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa
             "type" : "object",
             "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:IOMetricsInfo",
             "properties" : {
+              "accumulated-backpressured-time" : {
+                "type" : "integer"
+              },
+              "accumulated-busy-time" : {
+                "type" : "number"
+              },
+              "accumulated-idle-time" : {
+                "type" : "integer"
+              },
               "read-bytes" : {
                 "type" : "integer"
               },
diff --git a/docs/static/generated/rest_v1_dispatcher.yml b/docs/static/generated/rest_v1_dispatcher.yml
index 859783719ae..259d5b6d9ba 100644
--- a/docs/static/generated/rest_v1_dispatcher.yml
+++ b/docs/static/generated/rest_v1_dispatcher.yml
@@ -1628,6 +1628,15 @@ components:
           format: int64
         write-records-complete:
           type: boolean
+        accumulated-backpressured-time:
+          type: integer
+          format: int64
+        accumulated-idle-time:
+          type: integer
+          format: int64
+        accumulated-busy-time:
+          type: number
+          format: double
     SavepointFormatType:
       type: string
       enum:
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index 537e966deff..99cd6fb0cab 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -829,6 +829,15 @@
                   },
                   "write-records-complete" : {
                     "type" : "boolean"
+                  },
+                  "accumulated-backpressured-time" : {
+                    "type" : "integer"
+                  },
+                  "accumulated-idle-time" : {
+                    "type" : "integer"
+                  },
+                  "accumulated-busy-time" : {
+                    "type" : "number"
                   }
                 }
               }
@@ -2167,6 +2176,15 @@
                   },
                   "write-records-complete" : {
                     "type" : "boolean"
+                  },
+                  "accumulated-backpressured-time" : {
+                    "type" : "integer"
+                  },
+                  "accumulated-idle-time" : {
+                    "type" : "integer"
+                  },
+                  "accumulated-busy-time" : {
+                    "type" : "number"
                   }
                 }
               },
@@ -2528,6 +2546,15 @@
             },
             "write-records-complete" : {
               "type" : "boolean"
+            },
+            "accumulated-backpressured-time" : {
+              "type" : "integer"
+            },
+            "accumulated-idle-time" : {
+              "type" : "integer"
+            },
+            "accumulated-busy-time" : {
+              "type" : "number"
             }
           }
         },
@@ -2614,6 +2641,15 @@
             },
             "write-records-complete" : {
               "type" : "boolean"
+            },
+            "accumulated-backpressured-time" : {
+              "type" : "integer"
+            },
+            "accumulated-idle-time" : {
+              "type" : "integer"
+            },
+            "accumulated-busy-time" : {
+              "type" : "number"
             }
           }
         },
@@ -2843,6 +2879,15 @@
                   },
                   "write-records-complete" : {
                     "type" : "boolean"
+                  },
+                  "accumulated-backpressured-time" : {
+                    "type" : "integer"
+                  },
+                  "accumulated-idle-time" : {
+                    "type" : "integer"
+                  },
+                  "accumulated-busy-time" : {
+                    "type" : "number"
                   }
                 }
               },
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
index b7bbf86fb3c..e612837531a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 
@@ -37,6 +38,10 @@ public class IOMetrics implements Serializable {
     protected long numBytesIn;
     protected long numBytesOut;
 
+    protected long accumulateBackPressuredTime;
+    protected double accumulateBusyTime;
+    protected long accumulateIdleTime;
+
     protected final Map<IntermediateResultPartitionID, Long> numBytesProducedOfPartitions =
             new HashMap<>();
 
@@ -45,11 +50,17 @@ public class IOMetrics implements Serializable {
             Meter recordsOut,
             Meter bytesIn,
             Meter bytesOut,
-            Map<IntermediateResultPartitionID, Counter> numBytesProducedCounters) {
+            Map<IntermediateResultPartitionID, Counter> numBytesProducedCounters,
+            Gauge<Long> accumulatedBackPressuredTime,
+            Gauge<Long> accumulatedIdleTime,
+            Gauge<Double> accumulatedBusyTime) {
         this.numRecordsIn = recordsIn.getCount();
         this.numRecordsOut = recordsOut.getCount();
         this.numBytesIn = bytesIn.getCount();
         this.numBytesOut = bytesOut.getCount();
+        this.accumulateBackPressuredTime = accumulatedBackPressuredTime.getValue();
+        this.accumulateBusyTime = accumulatedBusyTime.getValue();
+        this.accumulateIdleTime = accumulatedIdleTime.getValue();
 
         for (Map.Entry<IntermediateResultPartitionID, Counter> counter :
                 numBytesProducedCounters.entrySet()) {
@@ -57,11 +68,21 @@ public class IOMetrics implements Serializable {
         }
     }
 
-    public IOMetrics(long numBytesIn, long numBytesOut, long numRecordsIn, long numRecordsOut) {
+    public IOMetrics(
+            long numBytesIn,
+            long numBytesOut,
+            long numRecordsIn,
+            long numRecordsOut,
+            long accumulateIdleTime,
+            long accumulateBusyTime,
+            long accumulateBackPressuredTime) {
         this.numBytesIn = numBytesIn;
         this.numBytesOut = numBytesOut;
         this.numRecordsIn = numRecordsIn;
         this.numRecordsOut = numRecordsOut;
+        this.accumulateIdleTime = accumulateIdleTime;
+        this.accumulateBusyTime = accumulateBusyTime;
+        this.accumulateBackPressuredTime = accumulateBackPressuredTime;
     }
 
     public long getNumRecordsIn() {
@@ -80,6 +101,18 @@ public class IOMetrics implements Serializable {
         return numBytesOut;
     }
 
+    public double getAccumulateBusyTime() {
+        return accumulateBusyTime;
+    }
+
+    public long getAccumulateBackPressuredTime() {
+        return accumulateBackPressuredTime;
+    }
+
+    public long getAccumulateIdleTime() {
+        return accumulateIdleTime;
+    }
+
     public Map<IntermediateResultPartitionID, Long> getNumBytesProducedOfPartitions() {
         return numBytesProducedOfPartitions;
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
index 762c6671791..1f083520f01 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
@@ -72,6 +72,9 @@ public class MetricNames {
     public static final String TASK_IDLE_TIME = "idleTimeMs" + SUFFIX_RATE;
     public static final String TASK_BUSY_TIME = "busyTimeMs" + SUFFIX_RATE;
     public static final String TASK_BACK_PRESSURED_TIME = "backPressuredTimeMs" + SUFFIX_RATE;
+    public static final String ACC_TASK_IDLE_TIME = "accumulateIdleTimeMs";
+    public static final String ACC_TASK_BUSY_TIME = "accumulateBusyTimeMs";
+    public static final String ACC_TASK_BACK_PRESSURED_TIME = "accumulateBackPressuredTimeMs";
     public static final String TASK_SOFT_BACK_PRESSURED_TIME =
             "softBackPressuredTimeMs" + SUFFIX_RATE;
     public static final String TASK_HARD_BACK_PRESSURED_TIME =
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java
index d7f81d06c6a..21819c3d301 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java
@@ -47,6 +47,8 @@ public class TimerGauge implements Gauge<Long>, View {
     private long previousMaxSingleMeasurement;
     private long currentMaxSingleMeasurement;
 
+    private long accumulatedCount;
+
     public TimerGauge() {
         this(SystemClock.getInstance());
     }
@@ -66,6 +68,7 @@ public class TimerGauge implements Gauge<Long>, View {
         if (currentMeasurementStartTS != 0) {
             long currentMeasurement = clock.absoluteTimeMillis() - currentMeasurementStartTS;
             currentCount += currentMeasurement;
+            accumulatedCount += currentMeasurement;
             currentMaxSingleMeasurement = Math.max(currentMaxSingleMeasurement, currentMeasurement);
             currentUpdateTS = 0;
             currentMeasurementStartTS = 0;
@@ -79,6 +82,7 @@ public class TimerGauge implements Gauge<Long>, View {
             // we adding to the current count only the time elapsed since last markStart or update
             // call
             currentCount += now - currentUpdateTS;
+            accumulatedCount += now - currentUpdateTS;
             currentUpdateTS = now;
             // on the other hand, max measurement has to be always checked against last markStart
             // call
@@ -104,6 +108,11 @@ public class TimerGauge implements Gauge<Long>, View {
         return previousMaxSingleMeasurement;
     }
 
+    /** @return the accumulated period measured by this {@link TimerGauge}. */
+    public synchronized long getAccumulatedCount() {
+        return accumulatedCount;
+    }
+
     @VisibleForTesting
     public synchronized long getCount() {
         return currentCount;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
index 0b580d9453e..17dca7b97b7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
@@ -60,12 +60,17 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
     private final TimerGauge hardBackPressuredTimePerSecond;
     private final Gauge<Long> maxSoftBackPressuredTime;
     private final Gauge<Long> maxHardBackPressuredTime;
+    private final Gauge<Long> accumulatedBackPressuredTime;
+    private final Gauge<Long> accumulatedIdleTime;
+    private final Gauge<Double> accumulatedBusyTime;
     private final Meter mailboxThroughput;
     private final Histogram mailboxLatency;
     private final SizeGauge mailboxSize;
 
     private volatile boolean busyTimeEnabled;
 
+    private long taskStartTime;
+
     private final Map<IntermediateResultPartitionID, Counter> numBytesProducedOfPartitions =
             new HashMap<>();
 
@@ -107,6 +112,15 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
 
         this.busyTimePerSecond = gauge(MetricNames.TASK_BUSY_TIME, this::getBusyTimePerSecond);
 
+        this.accumulatedBusyTime =
+                gauge(MetricNames.ACC_TASK_BUSY_TIME, this::getAccumulatedBusyTime);
+        this.accumulatedBackPressuredTime =
+                gauge(
+                        MetricNames.ACC_TASK_BACK_PRESSURED_TIME,
+                        this::getAccumulatedBackPressuredTimeMs);
+        this.accumulatedIdleTime =
+                gauge(MetricNames.ACC_TASK_IDLE_TIME, idleTimePerSecond::getAccumulatedCount);
+
         this.numMailsProcessed = new SimpleCounter();
         this.mailboxThroughput =
                 meter(MetricNames.MAILBOX_THROUGHPUT, new MeterView(numMailsProcessed));
@@ -121,7 +135,10 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
                 numRecordsOutRate,
                 numBytesInRate,
                 numBytesOutRate,
-                numBytesProducedOfPartitions);
+                numBytesProducedOfPartitions,
+                accumulatedBackPressuredTime,
+                accumulatedIdleTime,
+                accumulatedBusyTime);
     }
 
     // ============================================================================================
@@ -169,6 +186,15 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
                 + getHardBackPressuredTimePerSecond().getValue();
     }
 
+    public long getAccumulatedBackPressuredTimeMs() {
+        return getSoftBackPressuredTimePerSecond().getAccumulatedCount()
+                + getHardBackPressuredTimePerSecond().getAccumulatedCount();
+    }
+
+    public void markTaskStart() {
+        this.taskStartTime = System.currentTimeMillis();
+    }
+
     public void setEnableBusyTime(boolean enabled) {
         busyTimeEnabled = enabled;
     }
@@ -178,6 +204,17 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
         return busyTimeEnabled ? 1000.0 - Math.min(busyTime, 1000.0) : Double.NaN;
     }
 
+    private double getAccumulatedBusyTime() {
+        return busyTimeEnabled
+                ? Math.max(
+                        System.currentTimeMillis()
+                                - taskStartTime
+                                - idleTimePerSecond.getAccumulatedCount()
+                                - getAccumulatedBackPressuredTimeMs(),
+                        0)
+                : Double.NaN;
+    }
+
     public Meter getMailboxThroughput() {
         return mailboxThroughput;
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
index 68b16fd8652..4f850bc0712 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
@@ -218,7 +218,10 @@ public class JobDetailsHandler
                         counts.getNumRecordsIn(),
                         counts.isNumRecordsInComplete(),
                         counts.getNumRecordsOut(),
-                        counts.isNumRecordsOutComplete());
+                        counts.isNumRecordsOutComplete(),
+                        counts.getAccumulateBackPressuredTime(),
+                        counts.getAccumulateIdleTime(),
+                        counts.getAccumulateBusyTime());
 
         return new JobDetailsInfo.JobVertexDetailsInfo(
                 ejv.getJobVertexId(),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
index d1ed4030443..02df59952a2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
@@ -206,7 +206,10 @@ public class JobVertexTaskManagersHandler
                             counts.getNumRecordsIn(),
                             counts.isNumRecordsInComplete(),
                             counts.getNumRecordsOut(),
-                            counts.isNumRecordsOutComplete());
+                            counts.isNumRecordsOutComplete(),
+                            counts.getAccumulateBackPressuredTime(),
+                            counts.getAccumulateIdleTime(),
+                            counts.getAccumulateBusyTime());
 
             Map<ExecutionState, Integer> statusCounts =
                     new HashMap<>(ExecutionState.values().length);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
index b5dac6ce5ad..7da9061a090 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
@@ -47,7 +47,7 @@ public class MutableIOMetrics extends IOMetrics {
     private boolean numRecordsOutComplete = true;
 
     public MutableIOMetrics() {
-        super(0, 0, 0, 0);
+        super(0, 0, 0, 0, 0, 0, 0);
     }
 
     public boolean isNumBytesInComplete() {
@@ -86,6 +86,13 @@ public class MutableIOMetrics extends IOMetrics {
                 this.numBytesOut += ioMetrics.getNumBytesOut();
                 this.numRecordsIn += ioMetrics.getNumRecordsIn();
                 this.numRecordsOut += ioMetrics.getNumRecordsOut();
+                this.accumulateBackPressuredTime += ioMetrics.getAccumulateBackPressuredTime();
+                this.accumulateIdleTime += ioMetrics.getAccumulateIdleTime();
+                if (Double.isNaN(ioMetrics.getAccumulateBusyTime())) {
+                    this.accumulateBusyTime = Double.NaN;
+                } else {
+                    this.accumulateBusyTime += ioMetrics.getAccumulateBusyTime();
+                }
             }
         } else { // execAttempt is still running, use MetricQueryService instead
             if (fetcher != null) {
@@ -127,6 +134,29 @@ public class MutableIOMetrics extends IOMetrics {
                         this.numRecordsOut +=
                                 Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT));
                     }
+
+                    if (metrics.getMetric(MetricNames.ACC_TASK_BACK_PRESSURED_TIME) != null) {
+                        this.accumulateBackPressuredTime +=
+                                Long.parseLong(
+                                        metrics.getMetric(
+                                                MetricNames.ACC_TASK_BACK_PRESSURED_TIME));
+                    }
+
+                    if (metrics.getMetric(MetricNames.ACC_TASK_IDLE_TIME) != null) {
+                        this.accumulateIdleTime +=
+                                Long.parseLong(metrics.getMetric(MetricNames.ACC_TASK_IDLE_TIME));
+                    }
+
+                    if (metrics.getMetric(MetricNames.ACC_TASK_BUSY_TIME) != null) {
+                        double busyTime =
+                                Double.parseDouble(
+                                        metrics.getMetric(MetricNames.ACC_TASK_BUSY_TIME));
+                        if (Double.isNaN(busyTime)) {
+                            this.accumulateBusyTime = Double.NaN;
+                        } else {
+                            this.accumulateBusyTime += busyTime;
+                        }
+                    }
                 } else {
                     this.numBytesInComplete = false;
                     this.numBytesOutComplete = false;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java
index 42718d0070d..637538f1f48 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java
@@ -226,7 +226,10 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
                         ioMetrics.getNumRecordsIn(),
                         ioMetrics.isNumRecordsInComplete(),
                         ioMetrics.getNumRecordsOut(),
-                        ioMetrics.isNumRecordsOutComplete());
+                        ioMetrics.isNumRecordsOutComplete(),
+                        ioMetrics.getAccumulateBackPressuredTime(),
+                        ioMetrics.getAccumulateIdleTime(),
+                        ioMetrics.getAccumulateBusyTime());
 
         return new SubtaskExecutionAttemptDetailsInfo(
                 execution.getParallelSubtaskIndex(),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/IOMetricsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/IOMetricsInfo.java
index 29e3e62c8b7..02c37152fe8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/IOMetricsInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/IOMetricsInfo.java
@@ -42,6 +42,12 @@ public final class IOMetricsInfo {
 
     private static final String FIELD_NAME_RECORDS_WRITTEN_COMPLETE = "write-records-complete";
 
+    private static final String FIELD_NAME_ACC_BACK_PRESSURE = "accumulated-backpressured-time";
+
+    private static final String FIELD_NAME_ACC_IDLE = "accumulated-idle-time";
+
+    private static final String FIELD_NAME_ACC_BUSY = "accumulated-busy-time";
+
     @JsonProperty(FIELD_NAME_BYTES_READ)
     private final long bytesRead;
 
@@ -66,6 +72,15 @@ public final class IOMetricsInfo {
     @JsonProperty(FIELD_NAME_RECORDS_WRITTEN_COMPLETE)
     private final boolean recordsWrittenComplete;
 
+    @JsonProperty(FIELD_NAME_ACC_BACK_PRESSURE)
+    private final long accumulatedBackpressured;
+
+    @JsonProperty(FIELD_NAME_ACC_IDLE)
+    private final long accumulatedIdle;
+
+    @JsonProperty(FIELD_NAME_ACC_BUSY)
+    private final double accumulatedBusy;
+
     @JsonCreator
     public IOMetricsInfo(
             @JsonProperty(FIELD_NAME_BYTES_READ) long bytesRead,
@@ -75,7 +90,10 @@ public final class IOMetricsInfo {
             @JsonProperty(FIELD_NAME_RECORDS_READ) long recordsRead,
             @JsonProperty(FIELD_NAME_RECORDS_READ_COMPLETE) boolean recordsReadComplete,
             @JsonProperty(FIELD_NAME_RECORDS_WRITTEN) long recordsWritten,
-            @JsonProperty(FIELD_NAME_RECORDS_WRITTEN_COMPLETE) boolean recordsWrittenComplete) {
+            @JsonProperty(FIELD_NAME_RECORDS_WRITTEN_COMPLETE) boolean recordsWrittenComplete,
+            @JsonProperty(FIELD_NAME_ACC_BACK_PRESSURE) long accumulatedBackpressured,
+            @JsonProperty(FIELD_NAME_ACC_IDLE) long accumulatedIdle,
+            @JsonProperty(FIELD_NAME_ACC_BUSY) double accumulatedBusy) {
         this.bytesRead = bytesRead;
         this.bytesReadComplete = bytesReadComplete;
         this.bytesWritten = bytesWritten;
@@ -84,6 +102,9 @@ public final class IOMetricsInfo {
         this.recordsReadComplete = recordsReadComplete;
         this.recordsWritten = recordsWritten;
         this.recordsWrittenComplete = recordsWrittenComplete;
+        this.accumulatedBackpressured = accumulatedBackpressured;
+        this.accumulatedIdle = accumulatedIdle;
+        this.accumulatedBusy = accumulatedBusy;
     }
 
     public long getBytesRead() {
@@ -118,6 +139,18 @@ public final class IOMetricsInfo {
         return recordsWrittenComplete;
     }
 
+    public long getAccumulatedBackpressured() {
+        return accumulatedBackpressured;
+    }
+
+    public double getAccumulatedBusy() {
+        return accumulatedBusy;
+    }
+
+    public long getAccumulatedIdle() {
+        return accumulatedIdle;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -134,7 +167,10 @@ public final class IOMetricsInfo {
                 && recordsRead == that.recordsRead
                 && recordsReadComplete == that.recordsReadComplete
                 && recordsWritten == that.recordsWritten
-                && recordsWrittenComplete == that.recordsWrittenComplete;
+                && recordsWrittenComplete == that.recordsWrittenComplete
+                && accumulatedBackpressured == that.accumulatedBackpressured
+                && accumulatedBusy == that.accumulatedBusy
+                && accumulatedIdle == that.accumulatedIdle;
     }
 
     @Override
@@ -147,6 +183,9 @@ public final class IOMetricsInfo {
                 recordsRead,
                 recordsReadComplete,
                 recordsWritten,
-                recordsWrittenComplete);
+                recordsWrittenComplete,
+                accumulatedBackpressured,
+                accumulatedBusy,
+                accumulatedIdle);
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java
index f5c222e91c5..a2abb0b5901 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java
@@ -344,7 +344,7 @@ public class DefaultExecutionGraphDeploymentTest extends TestLogger {
         // verify behavior for canceled executions
         Execution execution1 = executions.values().iterator().next();
 
-        IOMetrics ioMetrics = new IOMetrics(0, 0, 0, 0);
+        IOMetrics ioMetrics = new IOMetrics(0, 0, 0, 0, 0, 0, 0);
         Map<String, Accumulator<?, ?>> accumulators = new HashMap<>();
         accumulators.put("acc", new IntCounter(4));
         AccumulatorSnapshot accumulatorSnapshot =
@@ -367,7 +367,7 @@ public class DefaultExecutionGraphDeploymentTest extends TestLogger {
         // verify behavior for failed executions
         Execution execution2 = executions.values().iterator().next();
 
-        IOMetrics ioMetrics2 = new IOMetrics(0, 0, 0, 0);
+        IOMetrics ioMetrics2 = new IOMetrics(0, 0, 0, 0, 0, 0, 0);
         Map<String, Accumulator<?, ?>> accumulators2 = new HashMap<>();
         accumulators2.put("acc", new IntCounter(8));
         AccumulatorSnapshot accumulatorSnapshot2 =
@@ -405,7 +405,7 @@ public class DefaultExecutionGraphDeploymentTest extends TestLogger {
         Map<ExecutionAttemptID, Execution> executions =
                 scheduler.getExecutionGraph().getRegisteredExecutions();
 
-        IOMetrics ioMetrics = new IOMetrics(0, 0, 0, 0);
+        IOMetrics ioMetrics = new IOMetrics(0, 0, 0, 0, 0, 0, 0);
         Map<String, Accumulator<?, ?>> accumulators = Collections.emptyMap();
 
         Execution execution1 = executions.values().iterator().next();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java
index 62f907f78cc..df765c69cd7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java
@@ -159,7 +159,7 @@ public class ExecutionPartitionLifecycleTest extends TestLogger {
                 execution -> {
                     execution.cancel();
                     execution.completeCancelling(
-                            Collections.emptyMap(), new IOMetrics(0, 0, 0, 0), false);
+                            Collections.emptyMap(), new IOMetrics(0, 0, 0, 0, 0, 0, 0), false);
                 },
                 PartitionReleaseResult.STOP_TRACKING);
     }
@@ -182,7 +182,7 @@ public class ExecutionPartitionLifecycleTest extends TestLogger {
                                 new Exception("Test exception"),
                                 false,
                                 Collections.emptyMap(),
-                                new IOMetrics(0, 0, 0, 0),
+                                new IOMetrics(0, 0, 0, 0, 0, 0, 0),
                                 false,
                                 true),
                 PartitionReleaseResult.STOP_TRACKING);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TimerGaugeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TimerGaugeTest.java
index f066e776524..0ddbde97637 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TimerGaugeTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TimerGaugeTest.java
@@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit;
 
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
 /** Tests for {@link TimerGauge}. */
@@ -49,6 +50,7 @@ public class TimerGaugeTest {
         gauge.update();
         assertThat(gauge.getValue(), is(0L));
         assertThat(gauge.getMaxSingleMeasurement(), is(0L));
+        assertEquals(gauge.getAccumulatedCount(), 0L);
 
         gauge.markStart();
         clock.advanceTime(SLEEP, TimeUnit.MILLISECONDS);
@@ -57,6 +59,7 @@ public class TimerGaugeTest {
 
         assertThat(gauge.getValue(), greaterThanOrEqualTo(SLEEP / View.UPDATE_INTERVAL_SECONDS));
         assertThat(gauge.getMaxSingleMeasurement(), is(SLEEP));
+        assertEquals(gauge.getAccumulatedCount(), SLEEP);
 
         // Check that the getMaxSingleMeasurement can go down after an update
         gauge.markStart();
@@ -65,6 +68,7 @@ public class TimerGaugeTest {
         gauge.update();
 
         assertThat(gauge.getMaxSingleMeasurement(), is(SLEEP / 2));
+        assertEquals(gauge.getAccumulatedCount(), SLEEP + SLEEP / 2);
     }
 
     @Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
index 7acc04bc604..71a978636bd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
@@ -38,6 +38,8 @@ public class TaskIOMetricGroupTest {
     public void testTaskIOMetricGroup() throws InterruptedException {
         TaskMetricGroup task = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
         TaskIOMetricGroup taskIO = task.getIOMetricGroup();
+        taskIO.setEnableBusyTime(true);
+        final long startTime = System.currentTimeMillis();
 
         // test counter forwarding
         assertNotNull(taskIO.getNumRecordsInCounter());
@@ -75,6 +77,19 @@ public class TaskIOMetricGroupTest {
         assertEquals(100L, io.getNumBytesIn());
         assertEquals(250L, io.getNumBytesOut());
         assertEquals(3L, taskIO.getNumBuffersOutCounter().getCount());
+        assertEquals(
+                taskIO.getIdleTimeMsPerSecond().getAccumulatedCount(), io.getAccumulateIdleTime());
+        assertEquals(
+                taskIO.getHardBackPressuredTimePerSecond().getAccumulatedCount()
+                        + taskIO.getSoftBackPressuredTimePerSecond().getAccumulatedCount(),
+                io.getAccumulateBackPressuredTime());
+        assertThat(
+                io.getAccumulateBusyTime(),
+                greaterThanOrEqualTo(
+                        (double) System.currentTimeMillis()
+                                - startTime
+                                - io.getAccumulateIdleTime()
+                                - io.getAccumulateBackPressuredTime()));
         assertThat(taskIO.getIdleTimeMsPerSecond().getCount(), greaterThanOrEqualTo(softSleepTime));
         assertThat(
                 taskIO.getSoftBackPressuredTimePerSecond().getCount(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
index 5ac759f7396..483ad2facf1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
@@ -72,8 +72,19 @@ public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger {
         final long bytesOut = 10L;
         final long recordsIn = 20L;
         final long recordsOut = 30L;
-
-        final IOMetrics ioMetrics = new IOMetrics(bytesIn, bytesOut, recordsIn, recordsOut);
+        final long accumulateIdleTime = 40L;
+        final long accumulateBusyTime = 50L;
+        final long accumulateBackPressuredTime = 60L;
+
+        final IOMetrics ioMetrics =
+                new IOMetrics(
+                        bytesIn,
+                        bytesOut,
+                        recordsIn,
+                        recordsOut,
+                        accumulateIdleTime,
+                        accumulateBusyTime,
+                        accumulateBackPressuredTime);
 
         final long[] timestamps = new long[ExecutionState.values().length];
         timestamps[ExecutionState.DEPLOYING.ordinal()] = deployingTs;
@@ -146,7 +157,18 @@ public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger {
 
         // Verify
         final IOMetricsInfo ioMetricsInfo =
-                new IOMetricsInfo(bytesIn, true, bytesOut, true, recordsIn, true, recordsOut, true);
+                new IOMetricsInfo(
+                        bytesIn,
+                        true,
+                        bytesOut,
+                        true,
+                        recordsIn,
+                        true,
+                        recordsOut,
+                        true,
+                        accumulateBackPressuredTime,
+                        accumulateIdleTime,
+                        accumulateBusyTime);
 
         final SubtaskExecutionAttemptDetailsInfo expectedDetailsInfo =
                 new SubtaskExecutionAttemptDetailsInfo(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
index 69a8e2b051c..4a288e738be 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
@@ -77,8 +77,19 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger {
         final long bytesOut = 10L;
         final long recordsIn = 20L;
         final long recordsOut = 30L;
-
-        final IOMetrics ioMetrics = new IOMetrics(bytesIn, bytesOut, recordsIn, recordsOut);
+        final long accumulateIdleTime = 40L;
+        final long accumulateBusyTime = 50L;
+        final long accumulateBackPressuredTime = 60L;
+
+        final IOMetrics ioMetrics =
+                new IOMetrics(
+                        bytesIn,
+                        bytesOut,
+                        recordsIn,
+                        recordsOut,
+                        accumulateIdleTime,
+                        accumulateBusyTime,
+                        accumulateBackPressuredTime);
 
         final ArchivedExecutionJobVertex archivedExecutionJobVertex =
                 new ArchivedExecutionJobVertex(
@@ -151,7 +162,18 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger {
 
         // Verify
         final IOMetricsInfo ioMetricsInfo =
-                new IOMetricsInfo(bytesIn, true, bytesOut, true, recordsIn, true, recordsOut, true);
+                new IOMetricsInfo(
+                        bytesIn,
+                        true,
+                        bytesOut,
+                        true,
+                        recordsIn,
+                        true,
+                        recordsOut,
+                        true,
+                        accumulateBackPressuredTime,
+                        accumulateIdleTime,
+                        accumulateBusyTime);
 
         final SubtaskExecutionAttemptDetailsInfo expectedDetailsInfo =
                 new SubtaskExecutionAttemptDetailsInfo(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java
index 707aa777542..806ce560494 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java
@@ -47,7 +47,10 @@ public class JobVertexDetailsInfoTest
                         random.nextLong(),
                         random.nextBoolean(),
                         random.nextLong(),
-                        random.nextBoolean());
+                        random.nextBoolean(),
+                        Math.abs(random.nextLong()),
+                        Math.abs(random.nextLong()),
+                        Math.abs(random.nextDouble()));
         List<SubtaskExecutionAttemptDetailsInfo> vertexTaskDetailList = new ArrayList<>();
         vertexTaskDetailList.add(
                 new SubtaskExecutionAttemptDetailsInfo(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfoTest.java
index cf7aad02879..d988ee017a8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfoTest.java
@@ -54,7 +54,10 @@ public class JobVertexTaskManagersInfoTest
                         random.nextLong(),
                         random.nextBoolean(),
                         random.nextLong(),
-                        random.nextBoolean());
+                        random.nextBoolean(),
+                        Math.abs(random.nextLong()),
+                        Math.abs(random.nextLong()),
+                        Math.abs(random.nextDouble()));
         int count = 100;
         for (ExecutionState executionState : ExecutionState.values()) {
             statusCounts.put(executionState, count++);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java
index e867d2720d8..8c207c5b069 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java
@@ -91,7 +91,10 @@ public class JobDetailsInfoTest extends RestResponseMarshallingTestBase<JobDetai
                         random.nextLong(),
                         random.nextBoolean(),
                         random.nextLong(),
-                        random.nextBoolean());
+                        random.nextBoolean(),
+                        Math.abs(random.nextLong()),
+                        Math.abs(random.nextLong()),
+                        Math.abs(random.nextDouble()));
 
         for (ExecutionState executionState : ExecutionState.values()) {
             tasksPerState.put(executionState, random.nextInt());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java
index e7ea728507d..f022d6ced26 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java
@@ -46,7 +46,10 @@ public class SubtaskExecutionAttemptDetailsInfoTest
                         Math.abs(random.nextLong()),
                         random.nextBoolean(),
                         Math.abs(random.nextLong()),
-                        random.nextBoolean());
+                        random.nextBoolean(),
+                        Math.abs(random.nextLong()),
+                        Math.abs(random.nextLong()),
+                        Math.abs(random.nextDouble()));
 
         return new SubtaskExecutionAttemptDetailsInfo(
                 Math.abs(random.nextInt()),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
index bf330d9edf3..e8636ea8435 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
@@ -133,7 +133,7 @@ public class AdaptiveBatchSchedulerTest extends TestLogger {
                             state,
                             null,
                             null,
-                            new IOMetrics(0, 0, 0, 0)));
+                            new IOMetrics(0, 0, 0, 0, 0, 0, 0)));
         }
     }
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 55a2f527a10..19bc707a363 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -772,6 +772,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
         scheduleBufferDebloater();
 
         // let the task do its work
+        getEnvironment().getMetricGroup().getIOMetricGroup().markTaskStart();
         runMailboxLoop();
 
         // if this left the run() method cleanly despite the fact that this was canceled,
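
A note on the accumulated busy time introduced above: unlike idle and back-pressured time, it is not measured directly. TaskIOMetricGroup#getAccumulatedBusyTime derives it as the wall-clock time since markTaskStart() minus the accumulated idle and back-pressured times, clamped at zero, and reports NaN while busy-time tracking is disabled; MutableIOMetrics then propagates a NaN from any attempt into the aggregated value. A standalone restatement of that derivation — the helper below is a hypothetical sketch, not Flink API:

/** Hypothetical helper mirroring TaskIOMetricGroup#getAccumulatedBusyTime above. */
static double accumulatedBusyTimeMs(
        boolean busyTimeEnabled,
        long taskStartTimeMs,                   // set by markTaskStart() when the mailbox loop starts
        long accumulatedIdleTimeMs,             // idle TimerGauge#getAccumulatedCount()
        long accumulatedBackPressuredTimeMs) {  // soft + hard back-pressured accumulated counts
    if (!busyTimeEnabled) {
        return Double.NaN;  // busy time is only meaningful while its tracking is enabled
    }
    long elapsedMs = System.currentTimeMillis() - taskStartTimeMs;
    return Math.max(elapsedMs - accumulatedIdleTimeMs - accumulatedBackPressuredTimeMs, 0);
}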


[flink] 03/03: [FLINK-28309][rest] Introduce metrics of the duration that a task stays in each status

Posted by gu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

guoyangze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bb8e1d14f05aca186ec874437eba3d44fbb3bd97
Author: Yangze Guo <ka...@gmail.com>
AuthorDate: Tue May 24 11:25:17 2022 +0800

    [FLINK-28309][rest] Introduce metrics of the duration that a task stays in each status
    
    This closes #20111.
---
 .../shortcodes/generated/rest_v1_dispatcher.html   | 18 +++++
 docs/static/generated/rest_v1_dispatcher.yml       |  5 ++
 .../src/test/resources/rest_api_v1.snapshot        | 18 +++++
 .../runtime/executiongraph/AccessExecution.java    | 15 +++++
 .../runtime/executiongraph/ArchivedExecution.java  | 19 +++++-
 .../flink/runtime/executiongraph/Execution.java    | 29 +++++++-
 .../job/SubtaskExecutionAttemptDetailsInfo.java    | 77 ++++++++++++++++++++--
 .../executiongraph/ArchivedExecutionGraphTest.java | 30 +++++++++
 .../executiongraph/ExecutionHistoryTest.java       |  1 +
 .../rest/handler/job/JobExceptionsHandlerTest.java |  4 +-
 .../SubtaskCurrentAttemptDetailsHandlerTest.java   | 16 ++++-
 ...askExecutionAttemptAccumulatorsHandlerTest.java |  1 +
 .../SubtaskExecutionAttemptDetailsHandlerTest.java | 12 +++-
 .../rest/messages/JobVertexDetailsInfoTest.java    | 10 ++-
 .../SubtaskExecutionAttemptDetailsInfoTest.java    | 12 +++-
 .../exceptionhistory/TestingAccessExecution.java   | 10 +++
 16 files changed, 260 insertions(+), 17 deletions(-)
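
For readers skimming the patch series: the new "status-duration" field reports, per
ExecutionState, how long a subtask attempt stayed in each state. The duration of a state is its
end timestamp minus its start timestamp, the state the attempt is currently in is measured
against the current time, and a state that was never entered (or not yet left) reports -1. The
standalone sketch below only mirrors that logic for illustration; the class, enum and method
names are not part of the Flink code base.

    import java.util.EnumMap;
    import java.util.Map;

    /** Illustrative only: mirrors the duration semantics introduced by this commit. */
    public class StatusDurationSketch {

        enum State { CREATED, SCHEDULED, DEPLOYING, INITIALIZING, RUNNING }

        /** A state that was never entered, or not yet left, reports -1, as in the patch. */
        static long duration(long start, long end) {
            return (start == 0 || end == 0) ? -1 : end - start;
        }

        /** Time spent in each state; the current state is measured against "now". */
        static Map<State, Long> statusDurations(long[] start, long[] end, State current, long now) {
            Map<State, Long> durations = new EnumMap<>(State.class);
            for (State s : State.values()) {
                long effectiveEnd = (s == current) ? now : end[s.ordinal()];
                durations.put(s, duration(start[s.ordinal()], effectiveEnd));
            }
            return durations;
        }

        public static void main(String[] args) {
            long[] start = new long[State.values().length];
            long[] end = new long[State.values().length];
            start[State.CREATED.ordinal()] = 1_000;
            end[State.CREATED.ordinal()] = 1_200;
            start[State.RUNNING.ordinal()] = 1_500;
            // CREATED=200, RUNNING measured against now (=500), all other states -1
            System.out.println(statusDurations(start, end, State.RUNNING, 2_000));
        }
    }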

diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
index 4b648e77e72..3f56fcef74b 100644
--- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
+++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
@@ -3712,6 +3712,12 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa
             "type" : "string",
             "enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING", "INITIALIZING" ]
           },
+          "status-duration" : {
+            "type" : "object",
+            "additionalProperties" : {
+              "type" : "integer"
+            }
+          },
           "subtask" : {
             "type" : "integer"
           },
@@ -4406,6 +4412,12 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa
       "type" : "string",
       "enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING", "INITIALIZING" ]
     },
+    "status-duration" : {
+      "type" : "object",
+      "additionalProperties" : {
+        "type" : "integer"
+      }
+    },
     "subtask" : {
       "type" : "integer"
     },
@@ -4543,6 +4555,12 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa
       "type" : "string",
       "enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING", "INITIALIZING" ]
     },
+    "status-duration" : {
+      "type" : "object",
+      "additionalProperties" : {
+        "type" : "integer"
+      }
+    },
     "subtask" : {
       "type" : "integer"
     },
diff --git a/docs/static/generated/rest_v1_dispatcher.yml b/docs/static/generated/rest_v1_dispatcher.yml
index 259d5b6d9ba..4b85119cd68 100644
--- a/docs/static/generated/rest_v1_dispatcher.yml
+++ b/docs/static/generated/rest_v1_dispatcher.yml
@@ -1687,6 +1687,11 @@ components:
           $ref: '#/components/schemas/IOMetricsInfo'
         taskmanager-id:
           type: string
+        status-duration:
+          type: object
+          additionalProperties:
+            type: integer
+            format: int64
     JobResult:
       type: object
       properties:
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index 99cd6fb0cab..30cbe20f7d1 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -2191,6 +2191,12 @@
               "taskmanager-id" : {
                 "type" : "string"
               },
+              "status-duration" : {
+                "type" : "object",
+                "additionalProperties" : {
+                  "type" : "integer"
+                }
+              },
               "start_time" : {
                 "type" : "integer"
               }
@@ -2561,6 +2567,12 @@
         "taskmanager-id" : {
           "type" : "string"
         },
+        "status-duration" : {
+          "type" : "object",
+          "additionalProperties" : {
+            "type" : "integer"
+          }
+        },
         "start_time" : {
           "type" : "integer"
         }
@@ -2656,6 +2668,12 @@
         "taskmanager-id" : {
           "type" : "string"
         },
+        "status-duration" : {
+          "type" : "object",
+          "additionalProperties" : {
+            "type" : "integer"
+          }
+        },
         "start_time" : {
           "type" : "integer"
         }
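
The HTML, YAML and snapshot changes above all describe the same addition: each subtask attempt
object now carries a "status-duration" member mapping ExecutionState names to millisecond
durations. The snippet below is a sketch of how a REST client could pull that map out of a
subtask details response; the JSON payload is hand-written sample data (only the field names
follow the schema above), and it assumes jackson-databind is on the classpath.

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    import java.util.LinkedHashMap;
    import java.util.Map;

    /** Sketch: reading the new "status-duration" field from a subtask details response. */
    public class StatusDurationParsing {
        public static void main(String[] args) throws Exception {
            // Hand-written sample payload, trimmed to the fields relevant here.
            String body =
                    "{\"subtask\": 0, \"status\": \"RUNNING\", \"taskmanager-id\": \"tm-1\","
                            + " \"status-duration\": {\"CREATED\": 12, \"SCHEDULED\": 3,"
                            + " \"DEPLOYING\": 100, \"INITIALIZING\": 40, \"RUNNING\": 123456}}";

            JsonNode statusDuration = new ObjectMapper().readTree(body).get("status-duration");
            Map<String, Long> durations = new LinkedHashMap<>();
            // Each entry maps an ExecutionState name to the time spent in it (ms), or -1.
            statusDuration.fields()
                    .forEachRemaining(e -> durations.put(e.getKey(), e.getValue().asLong()));
            System.out.println(durations);
        }
    }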
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java
index b0da913b4dd..c8a5dcf7f00 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java
@@ -47,6 +47,13 @@ public interface AccessExecution {
      */
     long[] getStateTimestamps();
 
+    /**
+     * Returns the end timestamps for every {@link ExecutionState}.
+     *
+     * @return end timestamps for each state
+     */
+    long[] getStateEndTimestamps();
+
     /**
      * Returns the current {@link ExecutionState} for this execution.
      *
@@ -79,6 +86,14 @@ public interface AccessExecution {
      */
     long getStateTimestamp(ExecutionState state);
 
+    /**
+     * Returns the end timestamp for the given {@link ExecutionState}.
+     *
+     * @param state state for which the end timestamp should be returned
+     * @return end timestamp for the given state
+     */
+    long getStateEndTimestamp(ExecutionState state);
+
     /**
      * Returns the user-defined accumulators as strings.
      *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
index a270d072c4d..7cb2d8a39da 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
@@ -37,6 +37,8 @@ public class ArchivedExecution implements AccessExecution, Serializable {
 
     private final long[] stateTimestamps;
 
+    private final long[] stateEndTimestamps;
+
     private final ExecutionState state;
 
     @Nullable private final ErrorInfo failureInfo; // once assigned, never changes
@@ -59,7 +61,8 @@ public class ArchivedExecution implements AccessExecution, Serializable {
                 execution.getFailureInfo().orElse(null),
                 execution.getAssignedResourceLocation(),
                 execution.getAssignedAllocationID(),
-                execution.getStateTimestamps());
+                execution.getStateTimestamps(),
+                execution.getStateEndTimestamps());
     }
 
     public ArchivedExecution(
@@ -70,7 +73,8 @@ public class ArchivedExecution implements AccessExecution, Serializable {
             @Nullable ErrorInfo failureCause,
             TaskManagerLocation assignedResourceLocation,
             AllocationID assignedAllocationID,
-            long[] stateTimestamps) {
+            long[] stateTimestamps,
+            long[] stateEndTimestamps) {
         this.userAccumulators = userAccumulators;
         this.ioMetrics = ioMetrics;
         this.failureInfo = failureCause;
@@ -78,6 +82,7 @@ public class ArchivedExecution implements AccessExecution, Serializable {
         this.attemptId = attemptId;
         this.state = state;
         this.stateTimestamps = stateTimestamps;
+        this.stateEndTimestamps = stateEndTimestamps;
         this.assignedAllocationID = assignedAllocationID;
     }
 
@@ -100,6 +105,11 @@ public class ArchivedExecution implements AccessExecution, Serializable {
         return stateTimestamps;
     }
 
+    @Override
+    public long[] getStateEndTimestamps() {
+        return stateEndTimestamps;
+    }
+
     @Override
     public ExecutionState getState() {
         return state;
@@ -124,6 +134,11 @@ public class ArchivedExecution implements AccessExecution, Serializable {
         return this.stateTimestamps[state.ordinal()];
     }
 
+    @Override
+    public long getStateEndTimestamp(ExecutionState state) {
+        return this.stateEndTimestamps[state.ordinal()];
+    }
+
     @Override
     public StringifiedAccumulatorResult[] getUserAccumulatorsStringified() {
         return userAccumulators;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 79c2cb9602f..459d9861980 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -133,6 +133,12 @@ public class Execution
      */
     private final long[] stateTimestamps;
 
+    /**
+     * The end timestamps of each state, i.e. the timestamps at which the execution left that
+     * state, indexed by {@link ExecutionState#ordinal()}.
+     */
+    private final long[] stateEndTimestamps;
+
     private final Time rpcTimeout;
 
     private final Collection<PartitionInfo> partitionInfos;
@@ -211,6 +217,7 @@ public class Execution
         this.rpcTimeout = checkNotNull(rpcTimeout);
 
         this.stateTimestamps = new long[ExecutionState.values().length];
+        this.stateEndTimestamps = new long[ExecutionState.values().length];
         markTimestamp(CREATED, startTimestamp);
 
         this.partitionInfos = new ArrayList<>(16);
@@ -337,11 +344,21 @@ public class Execution
         return stateTimestamps;
     }
 
+    @Override
+    public long[] getStateEndTimestamps() {
+        return stateEndTimestamps;
+    }
+
     @Override
     public long getStateTimestamp(ExecutionState state) {
         return this.stateTimestamps[state.ordinal()];
     }
 
+    @Override
+    public long getStateEndTimestamp(ExecutionState state) {
+        return this.stateEndTimestamps[state.ordinal()];
+    }
+
     public boolean isFinished() {
         return state.isTerminal();
     }
@@ -1405,7 +1422,7 @@ public class Execution
 
         if (state == currentState) {
             state = targetState;
-            markTimestamp(targetState);
+            markTimestamp(currentState, targetState);
 
             if (error == null) {
                 LOG.info(
@@ -1454,14 +1471,20 @@ public class Execution
         }
     }
 
-    private void markTimestamp(ExecutionState state) {
-        markTimestamp(state, System.currentTimeMillis());
+    private void markTimestamp(ExecutionState currentState, ExecutionState targetState) {
+        long now = System.currentTimeMillis();
+        markTimestamp(targetState, now);
+        markEndTimestamp(currentState, now);
     }
 
     private void markTimestamp(ExecutionState state, long timestamp) {
         this.stateTimestamps[state.ordinal()] = timestamp;
     }
 
+    private void markEndTimestamp(ExecutionState state, long timestamp) {
+        this.stateEndTimestamps[state.ordinal()] = timestamp;
+    }
+
     public String getVertexWithAttempt() {
         return vertex.getTaskNameWithSubtaskIndex() + " - execution #" + getAttemptNumber();
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java
index 637538f1f48..a1a3115369c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java
@@ -36,6 +36,8 @@ import io.swagger.v3.oas.annotations.Hidden;
 
 import javax.annotation.Nullable;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Objects;
 
 /** The sub task execution attempt response. */
@@ -61,6 +63,8 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
 
     public static final String FIELD_NAME_TASKMANAGER_ID = "taskmanager-id";
 
+    public static final String FIELD_NAME_STATUS_DURATION = "status-duration";
+
     @JsonProperty(FIELD_NAME_SUBTASK_INDEX)
     private final int subtaskIndex;
 
@@ -92,6 +96,9 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
     @JsonProperty(FIELD_NAME_TASKMANAGER_ID)
     private final String taskmanagerId;
 
+    @JsonProperty(FIELD_NAME_STATUS_DURATION)
+    private final Map<ExecutionState, Long> statusDuration;
+
     @JsonCreator
     public SubtaskExecutionAttemptDetailsInfo(
             @JsonProperty(FIELD_NAME_SUBTASK_INDEX) int subtaskIndex,
@@ -102,7 +109,8 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
             @JsonProperty(FIELD_NAME_END_TIME) long endTime,
             @JsonProperty(FIELD_NAME_DURATION) long duration,
             @JsonProperty(FIELD_NAME_METRICS) IOMetricsInfo ioMetricsInfo,
-            @JsonProperty(FIELD_NAME_TASKMANAGER_ID) String taskmanagerId) {
+            @JsonProperty(FIELD_NAME_TASKMANAGER_ID) String taskmanagerId,
+            @JsonProperty(FIELD_NAME_STATUS_DURATION) Map<ExecutionState, Long> statusDuration) {
 
         this.subtaskIndex = subtaskIndex;
         this.status = Preconditions.checkNotNull(status);
@@ -114,6 +122,7 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
         this.duration = duration;
         this.ioMetricsInfo = Preconditions.checkNotNull(ioMetricsInfo);
         this.taskmanagerId = Preconditions.checkNotNull(taskmanagerId);
+        this.statusDuration = Preconditions.checkNotNull(statusDuration);
     }
 
     public int getSubtaskIndex() {
@@ -148,6 +157,14 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
         return duration;
     }
 
+    public Map<ExecutionState, Long> getStatusDuration() {
+        return statusDuration;
+    }
+
+    public long getStatusDuration(ExecutionState state) {
+        return statusDuration.get(state);
+    }
+
     public IOMetricsInfo getIoMetricsInfo() {
         return ioMetricsInfo;
     }
@@ -176,7 +193,8 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
                 && endTime == that.endTime
                 && duration == that.duration
                 && Objects.equals(ioMetricsInfo, that.ioMetricsInfo)
-                && Objects.equals(taskmanagerId, that.taskmanagerId);
+                && Objects.equals(taskmanagerId, that.taskmanagerId)
+                && Objects.equals(statusDuration, that.statusDuration);
     }
 
     @Override
@@ -191,7 +209,8 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
                 endTime,
                 duration,
                 ioMetricsInfo,
-                taskmanagerId);
+                taskmanagerId,
+                statusDuration);
     }
 
     public static SubtaskExecutionAttemptDetailsInfo create(
@@ -240,6 +259,56 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
                 endTime,
                 duration,
                 ioMetricsInfo,
-                taskmanagerId);
+                taskmanagerId,
+                getExecutionStateDuration(execution));
+    }
+
+    private static Map<ExecutionState, Long> getExecutionStateDuration(AccessExecution execution) {
+        Map<ExecutionState, Long> executionStateDuration = new HashMap<>();
+        long now = System.currentTimeMillis();
+        ExecutionState state = execution.getState();
+        executionStateDuration.put(
+                ExecutionState.CREATED,
+                calculateStateDuration(
+                        execution.getStateTimestamp(ExecutionState.CREATED),
+                        state == ExecutionState.CREATED
+                                ? now
+                                : execution.getStateEndTimestamp(ExecutionState.CREATED)));
+        executionStateDuration.put(
+                ExecutionState.SCHEDULED,
+                calculateStateDuration(
+                        execution.getStateTimestamp(ExecutionState.SCHEDULED),
+                        state == ExecutionState.SCHEDULED
+                                ? now
+                                : execution.getStateEndTimestamp(ExecutionState.SCHEDULED)));
+        executionStateDuration.put(
+                ExecutionState.DEPLOYING,
+                calculateStateDuration(
+                        execution.getStateTimestamp(ExecutionState.DEPLOYING),
+                        state == ExecutionState.DEPLOYING
+                                ? now
+                                : execution.getStateEndTimestamp(ExecutionState.DEPLOYING)));
+        executionStateDuration.put(
+                ExecutionState.INITIALIZING,
+                calculateStateDuration(
+                        execution.getStateTimestamp(ExecutionState.INITIALIZING),
+                        state == ExecutionState.INITIALIZING
+                                ? now
+                                : execution.getStateEndTimestamp(ExecutionState.INITIALIZING)));
+        executionStateDuration.put(
+                ExecutionState.RUNNING,
+                calculateStateDuration(
+                        execution.getStateTimestamp(ExecutionState.RUNNING),
+                        state == ExecutionState.RUNNING
+                                ? now
+                                : execution.getStateEndTimestamp(ExecutionState.RUNNING)));
+        return executionStateDuration;
+    }
+
+    private static long calculateStateDuration(long start, long end) {
+        if (start == 0 || end == 0) {
+            return -1;
+        }
+        return end - start;
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 6819ef941c7..b6e9cefc2b4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -435,6 +435,9 @@ public class ArchivedExecutionGraphTest extends TestLogger {
         assertEquals(runtimeExecution.getAttemptNumber(), archivedExecution.getAttemptNumber());
         assertArrayEquals(
                 runtimeExecution.getStateTimestamps(), archivedExecution.getStateTimestamps());
+        assertArrayEquals(
+                runtimeExecution.getStateEndTimestamps(),
+                archivedExecution.getStateEndTimestamps());
         assertEquals(runtimeExecution.getState(), archivedExecution.getState());
         assertEquals(
                 runtimeExecution.getAssignedResourceLocation(),
@@ -472,6 +475,33 @@ public class ArchivedExecutionGraphTest extends TestLogger {
         assertEquals(
                 runtimeExecution.getStateTimestamp(ExecutionState.FAILED),
                 archivedExecution.getStateTimestamp(ExecutionState.FAILED));
+        assertEquals(
+                runtimeExecution.getStateEndTimestamp(ExecutionState.CREATED),
+                archivedExecution.getStateEndTimestamp(ExecutionState.CREATED));
+        assertEquals(
+                runtimeExecution.getStateEndTimestamp(ExecutionState.SCHEDULED),
+                archivedExecution.getStateEndTimestamp(ExecutionState.SCHEDULED));
+        assertEquals(
+                runtimeExecution.getStateEndTimestamp(ExecutionState.DEPLOYING),
+                archivedExecution.getStateEndTimestamp(ExecutionState.DEPLOYING));
+        assertEquals(
+                runtimeExecution.getStateEndTimestamp(ExecutionState.INITIALIZING),
+                archivedExecution.getStateEndTimestamp(ExecutionState.INITIALIZING));
+        assertEquals(
+                runtimeExecution.getStateEndTimestamp(ExecutionState.RUNNING),
+                archivedExecution.getStateEndTimestamp(ExecutionState.RUNNING));
+        assertEquals(
+                runtimeExecution.getStateEndTimestamp(ExecutionState.FINISHED),
+                archivedExecution.getStateEndTimestamp(ExecutionState.FINISHED));
+        assertEquals(
+                runtimeExecution.getStateEndTimestamp(ExecutionState.CANCELING),
+                archivedExecution.getStateEndTimestamp(ExecutionState.CANCELING));
+        assertEquals(
+                runtimeExecution.getStateEndTimestamp(ExecutionState.CANCELED),
+                archivedExecution.getStateEndTimestamp(ExecutionState.CANCELED));
+        assertEquals(
+                runtimeExecution.getStateEndTimestamp(ExecutionState.FAILED),
+                archivedExecution.getStateEndTimestamp(ExecutionState.FAILED));
         compareStringifiedAccumulators(
                 runtimeExecution.getUserAccumulatorsStringified(),
                 archivedExecution.getUserAccumulatorsStringified());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionHistoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionHistoryTest.java
index c719fecf860..3f1c54b1238 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionHistoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionHistoryTest.java
@@ -80,6 +80,7 @@ class ExecutionHistoryTest {
                 null,
                 null,
                 null,
+                new long[ExecutionState.values().length],
                 new long[ExecutionState.values().length]);
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
index b5eab9a20a2..93d3cf9c99d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
@@ -330,6 +330,7 @@ public class JobExceptionsHandlerTest extends TestLogger {
         final StringifiedAccumulatorResult[] emptyAccumulators =
                 new StringifiedAccumulatorResult[0];
         final long[] timestamps = new long[ExecutionState.values().length];
+        final long[] endTimestamps = new long[ExecutionState.values().length];
         final ExecutionState expectedState = ExecutionState.RUNNING;
 
         final LocalTaskManagerLocation assignedResourceLocation = new LocalTaskManagerLocation();
@@ -352,7 +353,8 @@ public class JobExceptionsHandlerTest extends TestLogger {
                                             System.currentTimeMillis()),
                                     assignedResourceLocation,
                                     allocationID,
-                                    timestamps),
+                                    timestamps,
+                                    endTimestamps),
                             new ExecutionHistory(0))
                 },
                 jobVertexID,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
index 483ad2facf1..4c267329a74 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
@@ -50,6 +50,7 @@ import org.junit.Test;
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Map;
 
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
 import static org.junit.Assert.assertEquals;
@@ -87,7 +88,9 @@ public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger {
                         accumulateBackPressuredTime);
 
         final long[] timestamps = new long[ExecutionState.values().length];
+        final long[] endTimestamps = new long[ExecutionState.values().length];
         timestamps[ExecutionState.DEPLOYING.ordinal()] = deployingTs;
+        endTimestamps[ExecutionState.DEPLOYING.ordinal()] = deployingTs + 10;
         final ExecutionState expectedState = ExecutionState.FINISHED;
 
         timestamps[expectedState.ordinal()] = finishedTs;
@@ -106,7 +109,8 @@ public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger {
                         null,
                         assignedResourceLocation,
                         allocationID,
-                        timestamps);
+                        timestamps,
+                        endTimestamps);
 
         final ArchivedExecutionVertex executionVertex =
                 new ArchivedExecutionVertex(
@@ -170,6 +174,13 @@ public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger {
                         accumulateIdleTime,
                         accumulateBusyTime);
 
+        final Map<ExecutionState, Long> statusDuration = new HashMap<>();
+        statusDuration.put(ExecutionState.CREATED, -1L);
+        statusDuration.put(ExecutionState.SCHEDULED, -1L);
+        statusDuration.put(ExecutionState.DEPLOYING, 10L);
+        statusDuration.put(ExecutionState.INITIALIZING, -1L);
+        statusDuration.put(ExecutionState.RUNNING, -1L);
+
         final SubtaskExecutionAttemptDetailsInfo expectedDetailsInfo =
                 new SubtaskExecutionAttemptDetailsInfo(
                         subtaskIndex,
@@ -180,7 +191,8 @@ public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger {
                         finishedTs,
                         finishedTs - deployingTs,
                         ioMetricsInfo,
-                        assignedResourceLocation.getResourceID().getResourceIdString());
+                        assignedResourceLocation.getResourceID().getResourceIdString(),
+                        statusDuration);
 
         assertEquals(expectedDetailsInfo, detailsInfo);
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
index e5d98dbf8f4..bf1b9561e43 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
@@ -99,6 +99,7 @@ public class SubtaskExecutionAttemptAccumulatorsHandlerTest extends TestLogger {
                         null,
                         null,
                         null,
+                        new long[ExecutionState.values().length],
                         new long[ExecutionState.values().length]);
 
         // Invoke tested method.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
index 4a288e738be..a044f3f87ff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
@@ -52,6 +52,7 @@ import org.junit.Test;
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Map;
 
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
 import static org.junit.Assert.assertEquals;
@@ -107,6 +108,7 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger {
                                             null,
                                             null,
                                             null,
+                                            new long[ExecutionState.values().length],
                                             new long[ExecutionState.values().length]),
                                     new ExecutionHistory(0))
                         },
@@ -175,6 +177,13 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger {
                         accumulateIdleTime,
                         accumulateBusyTime);
 
+        final Map<ExecutionState, Long> statusDuration = new HashMap<>();
+        statusDuration.put(ExecutionState.CREATED, -1L);
+        statusDuration.put(ExecutionState.SCHEDULED, -1L);
+        statusDuration.put(ExecutionState.DEPLOYING, -1L);
+        statusDuration.put(ExecutionState.INITIALIZING, -1L);
+        statusDuration.put(ExecutionState.RUNNING, -1L);
+
         final SubtaskExecutionAttemptDetailsInfo expectedDetailsInfo =
                 new SubtaskExecutionAttemptDetailsInfo(
                         subtaskIndex,
@@ -185,7 +194,8 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger {
                         0L,
                         -1L,
                         ioMetricsInfo,
-                        "(unassigned)");
+                        "(unassigned)",
+                        statusDuration);
 
         assertEquals(expectedDetailsInfo, detailsInfo);
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java
index 806ce560494..50c0865f85c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetails
 import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 
@@ -62,7 +63,8 @@ public class JobVertexDetailsInfoTest
                         System.currentTimeMillis(),
                         1L,
                         jobVertexMetrics,
-                        "taskmanagerId1"));
+                        "taskmanagerId1",
+                        Collections.singletonMap(ExecutionState.CREATED, 10L)));
         vertexTaskDetailList.add(
                 new SubtaskExecutionAttemptDetailsInfo(
                         1,
@@ -73,7 +75,8 @@ public class JobVertexDetailsInfoTest
                         System.currentTimeMillis(),
                         1L,
                         jobVertexMetrics,
-                        "taskmanagerId2"));
+                        "taskmanagerId2",
+                        Collections.singletonMap(ExecutionState.CREATED, 10L)));
         vertexTaskDetailList.add(
                 new SubtaskExecutionAttemptDetailsInfo(
                         2,
@@ -84,7 +87,8 @@ public class JobVertexDetailsInfoTest
                         System.currentTimeMillis(),
                         1L,
                         jobVertexMetrics,
-                        "taskmanagerId3"));
+                        "taskmanagerId3",
+                        Collections.singletonMap(ExecutionState.CREATED, 10L)));
 
         int parallelism = 1 + (random.nextInt() / 3);
         return new JobVertexDetailsInfo(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java
index f022d6ced26..48158c05b56 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java
@@ -22,6 +22,8 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
 import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Random;
 
 /** Tests (un)marshalling of the {@link SubtaskExecutionAttemptDetailsInfo}. */
@@ -51,6 +53,13 @@ public class SubtaskExecutionAttemptDetailsInfoTest
                         Math.abs(random.nextLong()),
                         Math.abs(random.nextDouble()));
 
+        final Map<ExecutionState, Long> statusDuration = new HashMap<>();
+        statusDuration.put(ExecutionState.CREATED, 10L);
+        statusDuration.put(ExecutionState.SCHEDULED, 20L);
+        statusDuration.put(ExecutionState.DEPLOYING, 30L);
+        statusDuration.put(ExecutionState.INITIALIZING, 40L);
+        statusDuration.put(ExecutionState.RUNNING, 50L);
+
         return new SubtaskExecutionAttemptDetailsInfo(
                 Math.abs(random.nextInt()),
                 ExecutionState.values()[random.nextInt(ExecutionState.values().length)],
@@ -60,6 +69,7 @@ public class SubtaskExecutionAttemptDetailsInfoTest
                 Math.abs(random.nextLong()),
                 Math.abs(random.nextLong()),
                 ioMetricsInfo,
-                "taskmanagerId");
+                "taskmanagerId",
+                statusDuration);
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/TestingAccessExecution.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/TestingAccessExecution.java
index f3ec6fe2f1d..5173ba7c2d0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/TestingAccessExecution.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/TestingAccessExecution.java
@@ -77,6 +77,11 @@ public class TestingAccessExecution implements AccessExecution {
         throw new UnsupportedOperationException("getStateTimestamps should not be called.");
     }
 
+    @Override
+    public long[] getStateEndTimestamps() {
+        throw new UnsupportedOperationException("getStateEndTimestamps should not be called.");
+    }
+
     @Override
     public ExecutionState getState() {
         return state;
@@ -87,6 +92,11 @@ public class TestingAccessExecution implements AccessExecution {
         throw new UnsupportedOperationException("getStateTimestamp should not be called.");
     }
 
+    @Override
+    public long getStateEndTimestamp(ExecutionState state) {
+        throw new UnsupportedOperationException("getStateEndTimestamp should not be called.");
+    }
+
     @Override
     public StringifiedAccumulatorResult[] getUserAccumulatorsStringified() {
         throw new UnsupportedOperationException(