You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gu...@apache.org on 2022/07/01 03:35:31 UTC

[flink] 03/03: [FLINK-28309][rest] Introduce metrics of the duration that a task stays in each status

This is an automated email from the ASF dual-hosted git repository.

guoyangze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bb8e1d14f05aca186ec874437eba3d44fbb3bd97
Author: Yangze Guo <ka...@gmail.com>
AuthorDate: Tue May 24 11:25:17 2022 +0800

    [FLINK-28309][rest] Introduce metrics of the duration that a task stays in each status
    
    This closes #20111.
---
 .../shortcodes/generated/rest_v1_dispatcher.html   | 18 +++++
 docs/static/generated/rest_v1_dispatcher.yml       |  5 ++
 .../src/test/resources/rest_api_v1.snapshot        | 18 +++++
 .../runtime/executiongraph/AccessExecution.java    | 15 +++++
 .../runtime/executiongraph/ArchivedExecution.java  | 19 +++++-
 .../flink/runtime/executiongraph/Execution.java    | 29 +++++++-
 .../job/SubtaskExecutionAttemptDetailsInfo.java    | 77 ++++++++++++++++++++--
 .../executiongraph/ArchivedExecutionGraphTest.java | 30 +++++++++
 .../executiongraph/ExecutionHistoryTest.java       |  1 +
 .../rest/handler/job/JobExceptionsHandlerTest.java |  4 +-
 .../SubtaskCurrentAttemptDetailsHandlerTest.java   | 16 ++++-
 ...askExecutionAttemptAccumulatorsHandlerTest.java |  1 +
 .../SubtaskExecutionAttemptDetailsHandlerTest.java | 12 +++-
 .../rest/messages/JobVertexDetailsInfoTest.java    | 10 ++-
 .../SubtaskExecutionAttemptDetailsInfoTest.java    | 12 +++-
 .../exceptionhistory/TestingAccessExecution.java   | 10 +++
 16 files changed, 260 insertions(+), 17 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
index 4b648e77e72..3f56fcef74b 100644
--- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
+++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
@@ -3712,6 +3712,12 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa
             "type" : "string",
             "enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING", "INITIALIZING" ]
           },
+          "status-duration" : {
+            "type" : "object",
+            "additionalProperties" : {
+              "type" : "integer"
+            }
+          },
           "subtask" : {
             "type" : "integer"
           },
@@ -4406,6 +4412,12 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa
       "type" : "string",
       "enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING", "INITIALIZING" ]
     },
+    "status-duration" : {
+      "type" : "object",
+      "additionalProperties" : {
+        "type" : "integer"
+      }
+    },
     "subtask" : {
       "type" : "integer"
     },
@@ -4543,6 +4555,12 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa
       "type" : "string",
       "enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING", "INITIALIZING" ]
     },
+    "status-duration" : {
+      "type" : "object",
+      "additionalProperties" : {
+        "type" : "integer"
+      }
+    },
     "subtask" : {
       "type" : "integer"
     },
diff --git a/docs/static/generated/rest_v1_dispatcher.yml b/docs/static/generated/rest_v1_dispatcher.yml
index 259d5b6d9ba..4b85119cd68 100644
--- a/docs/static/generated/rest_v1_dispatcher.yml
+++ b/docs/static/generated/rest_v1_dispatcher.yml
@@ -1687,6 +1687,11 @@ components:
           $ref: '#/components/schemas/IOMetricsInfo'
         taskmanager-id:
           type: string
+        status-duration:
+          type: object
+          additionalProperties:
+            type: integer
+            format: int64
     JobResult:
       type: object
       properties:
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index 99cd6fb0cab..30cbe20f7d1 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -2191,6 +2191,12 @@
               "taskmanager-id" : {
                 "type" : "string"
               },
+              "status-duration" : {
+                "type" : "object",
+                "additionalProperties" : {
+                  "type" : "integer"
+                }
+              },
               "start_time" : {
                 "type" : "integer"
               }
@@ -2561,6 +2567,12 @@
         "taskmanager-id" : {
           "type" : "string"
         },
+        "status-duration" : {
+          "type" : "object",
+          "additionalProperties" : {
+            "type" : "integer"
+          }
+        },
         "start_time" : {
           "type" : "integer"
         }
@@ -2656,6 +2668,12 @@
         "taskmanager-id" : {
           "type" : "string"
         },
+        "status-duration" : {
+          "type" : "object",
+          "additionalProperties" : {
+            "type" : "integer"
+          }
+        },
         "start_time" : {
           "type" : "integer"
         }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java
index b0da913b4dd..c8a5dcf7f00 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java
@@ -47,6 +47,13 @@ public interface AccessExecution {
      */
     long[] getStateTimestamps();
 
+    /**
+     * Returns the end timestamps for every {@link ExecutionState}.
+     *
+     * @return end timestamps for each state
+     */
+    long[] getStateEndTimestamps();
+
     /**
      * Returns the current {@link ExecutionState} for this execution.
      *
@@ -79,6 +86,14 @@ public interface AccessExecution {
      */
     long getStateTimestamp(ExecutionState state);
 
+    /**
+     * Returns the end timestamp for the given {@link ExecutionState}.
+     *
+     * @param state state for which the timestamp should be returned
+     * @return end timestamp for the given state
+     */
+    long getStateEndTimestamp(ExecutionState state);
+
     /**
      * Returns the user-defined accumulators as strings.
      *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
index a270d072c4d..7cb2d8a39da 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
@@ -37,6 +37,8 @@ public class ArchivedExecution implements AccessExecution, Serializable {
 
     private final long[] stateTimestamps;
 
+    private final long[] stateEndTimestamps;
+
     private final ExecutionState state;
 
     @Nullable private final ErrorInfo failureInfo; // once assigned, never changes
@@ -59,7 +61,8 @@ public class ArchivedExecution implements AccessExecution, Serializable {
                 execution.getFailureInfo().orElse(null),
                 execution.getAssignedResourceLocation(),
                 execution.getAssignedAllocationID(),
-                execution.getStateTimestamps());
+                execution.getStateTimestamps(),
+                execution.getStateEndTimestamps());
     }
 
     public ArchivedExecution(
@@ -70,7 +73,8 @@ public class ArchivedExecution implements AccessExecution, Serializable {
             @Nullable ErrorInfo failureCause,
             TaskManagerLocation assignedResourceLocation,
             AllocationID assignedAllocationID,
-            long[] stateTimestamps) {
+            long[] stateTimestamps,
+            long[] stateEndTimestamps) {
         this.userAccumulators = userAccumulators;
         this.ioMetrics = ioMetrics;
         this.failureInfo = failureCause;
@@ -78,6 +82,7 @@ public class ArchivedExecution implements AccessExecution, Serializable {
         this.attemptId = attemptId;
         this.state = state;
         this.stateTimestamps = stateTimestamps;
+        this.stateEndTimestamps = stateEndTimestamps;
         this.assignedAllocationID = assignedAllocationID;
     }
 
@@ -100,6 +105,11 @@ public class ArchivedExecution implements AccessExecution, Serializable {
         return stateTimestamps;
     }
 
+    @Override
+    public long[] getStateEndTimestamps() {
+        return stateEndTimestamps;
+    }
+
     @Override
     public ExecutionState getState() {
         return state;
@@ -124,6 +134,11 @@ public class ArchivedExecution implements AccessExecution, Serializable {
         return this.stateTimestamps[state.ordinal()];
     }
 
+    @Override
+    public long getStateEndTimestamp(ExecutionState state) {
+        return this.stateEndTimestamps[state.ordinal()];
+    }
+
     @Override
     public StringifiedAccumulatorResult[] getUserAccumulatorsStringified() {
         return userAccumulators;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 79c2cb9602f..459d9861980 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -133,6 +133,12 @@ public class Execution
      */
     private final long[] stateTimestamps;
 
+    /**
+     * The timestamps at which each state was left (its end timestamp), indexed by {@link
+     * ExecutionState#ordinal()}.
+     */
+    private final long[] stateEndTimestamps;
+
     private final Time rpcTimeout;
 
     private final Collection<PartitionInfo> partitionInfos;
@@ -211,6 +217,7 @@ public class Execution
         this.rpcTimeout = checkNotNull(rpcTimeout);
 
         this.stateTimestamps = new long[ExecutionState.values().length];
+        this.stateEndTimestamps = new long[ExecutionState.values().length];
         markTimestamp(CREATED, startTimestamp);
 
         this.partitionInfos = new ArrayList<>(16);
@@ -337,11 +344,21 @@ public class Execution
         return stateTimestamps;
     }
 
+    @Override
+    public long[] getStateEndTimestamps() {
+        return stateEndTimestamps;
+    }
+
     @Override
     public long getStateTimestamp(ExecutionState state) {
         return this.stateTimestamps[state.ordinal()];
     }
 
+    @Override
+    public long getStateEndTimestamp(ExecutionState state) {
+        return this.stateEndTimestamps[state.ordinal()];
+    }
+
     public boolean isFinished() {
         return state.isTerminal();
     }
@@ -1405,7 +1422,7 @@ public class Execution
 
         if (state == currentState) {
             state = targetState;
-            markTimestamp(targetState);
+            markTimestamp(currentState, targetState);
 
             if (error == null) {
                 LOG.info(
@@ -1454,14 +1471,20 @@ public class Execution
         }
     }
 
-    private void markTimestamp(ExecutionState state) {
-        markTimestamp(state, System.currentTimeMillis());
+    private void markTimestamp(ExecutionState currentState, ExecutionState targetState) {
+        long now = System.currentTimeMillis();
+        markTimestamp(targetState, now);
+        markEndTimestamp(currentState, now);
     }
 
     private void markTimestamp(ExecutionState state, long timestamp) {
         this.stateTimestamps[state.ordinal()] = timestamp;
     }
 
+    private void markEndTimestamp(ExecutionState state, long timestamp) {
+        this.stateEndTimestamps[state.ordinal()] = timestamp;
+    }
+
     public String getVertexWithAttempt() {
         return vertex.getTaskNameWithSubtaskIndex() + " - execution #" + getAttemptNumber();
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java
index 637538f1f48..a1a3115369c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java
@@ -36,6 +36,8 @@ import io.swagger.v3.oas.annotations.Hidden;
 
 import javax.annotation.Nullable;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Objects;
 
 /** The sub task execution attempt response. */
@@ -61,6 +63,8 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
 
     public static final String FIELD_NAME_TASKMANAGER_ID = "taskmanager-id";
 
+    public static final String FIELD_NAME_STATUS_DURATION = "status-duration";
+
     @JsonProperty(FIELD_NAME_SUBTASK_INDEX)
     private final int subtaskIndex;
 
@@ -92,6 +96,9 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
     @JsonProperty(FIELD_NAME_TASKMANAGER_ID)
     private final String taskmanagerId;
 
+    @JsonProperty(FIELD_NAME_STATUS_DURATION)
+    private final Map<ExecutionState, Long> statusDuration;
+
     @JsonCreator
     public SubtaskExecutionAttemptDetailsInfo(
             @JsonProperty(FIELD_NAME_SUBTASK_INDEX) int subtaskIndex,
@@ -102,7 +109,8 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
             @JsonProperty(FIELD_NAME_END_TIME) long endTime,
             @JsonProperty(FIELD_NAME_DURATION) long duration,
             @JsonProperty(FIELD_NAME_METRICS) IOMetricsInfo ioMetricsInfo,
-            @JsonProperty(FIELD_NAME_TASKMANAGER_ID) String taskmanagerId) {
+            @JsonProperty(FIELD_NAME_TASKMANAGER_ID) String taskmanagerId,
+            @JsonProperty(FIELD_NAME_STATUS_DURATION) Map<ExecutionState, Long> statusDuration) {
 
         this.subtaskIndex = subtaskIndex;
         this.status = Preconditions.checkNotNull(status);
@@ -114,6 +122,7 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
         this.duration = duration;
         this.ioMetricsInfo = Preconditions.checkNotNull(ioMetricsInfo);
         this.taskmanagerId = Preconditions.checkNotNull(taskmanagerId);
+        this.statusDuration = Preconditions.checkNotNull(statusDuration);
     }
 
     public int getSubtaskIndex() {
@@ -148,6 +157,14 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
         return duration;
     }
 
+    public Map<ExecutionState, Long> getStatusDuration() {
+        return statusDuration;
+    }
+
+    public long getStatusDuration(ExecutionState state) {
+        return statusDuration.get(state);
+    }
+
     public IOMetricsInfo getIoMetricsInfo() {
         return ioMetricsInfo;
     }
@@ -176,7 +193,8 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
                 && endTime == that.endTime
                 && duration == that.duration
                 && Objects.equals(ioMetricsInfo, that.ioMetricsInfo)
-                && Objects.equals(taskmanagerId, that.taskmanagerId);
+                && Objects.equals(taskmanagerId, that.taskmanagerId)
+                && Objects.equals(statusDuration, that.statusDuration);
     }
 
     @Override
@@ -191,7 +209,8 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
                 endTime,
                 duration,
                 ioMetricsInfo,
-                taskmanagerId);
+                taskmanagerId,
+                statusDuration);
     }
 
     public static SubtaskExecutionAttemptDetailsInfo create(
@@ -240,6 +259,56 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
                 endTime,
                 duration,
                 ioMetricsInfo,
-                taskmanagerId);
+                taskmanagerId,
+                getExecutionStateDuration(execution));
+    }
+
+    private static Map<ExecutionState, Long> getExecutionStateDuration(AccessExecution execution) {
+        Map<ExecutionState, Long> executionStateDuration = new HashMap<>();
+        long now = System.currentTimeMillis();
+        ExecutionState state = execution.getState();
+        executionStateDuration.put(
+                ExecutionState.CREATED,
+                calculateStateDuration(
+                        execution.getStateTimestamp(ExecutionState.CREATED),
+                        state == ExecutionState.CREATED
+                                ? now
+                                : execution.getStateEndTimestamp(ExecutionState.CREATED)));
+        executionStateDuration.put(
+                ExecutionState.SCHEDULED,
+                calculateStateDuration(
+                        execution.getStateTimestamp(ExecutionState.SCHEDULED),
+                        state == ExecutionState.SCHEDULED
+                                ? now
+                                : execution.getStateEndTimestamp(ExecutionState.SCHEDULED)));
+        executionStateDuration.put(
+                ExecutionState.DEPLOYING,
+                calculateStateDuration(
+                        execution.getStateTimestamp(ExecutionState.DEPLOYING),
+                        state == ExecutionState.DEPLOYING
+                                ? now
+                                : execution.getStateEndTimestamp(ExecutionState.DEPLOYING)));
+        executionStateDuration.put(
+                ExecutionState.INITIALIZING,
+                calculateStateDuration(
+                        execution.getStateTimestamp(ExecutionState.INITIALIZING),
+                        state == ExecutionState.INITIALIZING
+                                ? now
+                                : execution.getStateEndTimestamp(ExecutionState.INITIALIZING)));
+        executionStateDuration.put(
+                ExecutionState.RUNNING,
+                calculateStateDuration(
+                        execution.getStateTimestamp(ExecutionState.RUNNING),
+                        state == ExecutionState.RUNNING
+                                ? now
+                                : execution.getStateEndTimestamp(ExecutionState.RUNNING)));
+        return executionStateDuration;
+    }
+
+    private static long calculateStateDuration(long start, long end) {
+        if (start == 0 || end == 0) {
+            return -1;
+        }
+        return end - start;
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 6819ef941c7..b6e9cefc2b4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -435,6 +435,9 @@ public class ArchivedExecutionGraphTest extends TestLogger {
         assertEquals(runtimeExecution.getAttemptNumber(), archivedExecution.getAttemptNumber());
         assertArrayEquals(
                 runtimeExecution.getStateTimestamps(), archivedExecution.getStateTimestamps());
+        assertArrayEquals(
+                runtimeExecution.getStateEndTimestamps(),
+                archivedExecution.getStateEndTimestamps());
         assertEquals(runtimeExecution.getState(), archivedExecution.getState());
         assertEquals(
                 runtimeExecution.getAssignedResourceLocation(),
@@ -472,6 +475,33 @@ public class ArchivedExecutionGraphTest extends TestLogger {
         assertEquals(
                 runtimeExecution.getStateTimestamp(ExecutionState.FAILED),
                 archivedExecution.getStateTimestamp(ExecutionState.FAILED));
+        assertEquals(
+                runtimeExecution.getStateEndTimestamp(ExecutionState.CREATED),
+                archivedExecution.getStateEndTimestamp(ExecutionState.CREATED));
+        assertEquals(
+                runtimeExecution.getStateEndTimestamp(ExecutionState.SCHEDULED),
+                archivedExecution.getStateEndTimestamp(ExecutionState.SCHEDULED));
+        assertEquals(
+                runtimeExecution.getStateEndTimestamp(ExecutionState.DEPLOYING),
+                archivedExecution.getStateEndTimestamp(ExecutionState.DEPLOYING));
+        assertEquals(
+                runtimeExecution.getStateEndTimestamp(ExecutionState.INITIALIZING),
+                archivedExecution.getStateEndTimestamp(ExecutionState.INITIALIZING));
+        assertEquals(
+                runtimeExecution.getStateEndTimestamp(ExecutionState.RUNNING),
+                archivedExecution.getStateEndTimestamp(ExecutionState.RUNNING));
+        assertEquals(
+                runtimeExecution.getStateEndTimestamp(ExecutionState.FINISHED),
+                archivedExecution.getStateEndTimestamp(ExecutionState.FINISHED));
+        assertEquals(
+                runtimeExecution.getStateEndTimestamp(ExecutionState.CANCELING),
+                archivedExecution.getStateEndTimestamp(ExecutionState.CANCELING));
+        assertEquals(
+                runtimeExecution.getStateEndTimestamp(ExecutionState.CANCELED),
+                archivedExecution.getStateEndTimestamp(ExecutionState.CANCELED));
+        assertEquals(
+                runtimeExecution.getStateEndTimestamp(ExecutionState.FAILED),
+                archivedExecution.getStateEndTimestamp(ExecutionState.FAILED));
         compareStringifiedAccumulators(
                 runtimeExecution.getUserAccumulatorsStringified(),
                 archivedExecution.getUserAccumulatorsStringified());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionHistoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionHistoryTest.java
index c719fecf860..3f1c54b1238 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionHistoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionHistoryTest.java
@@ -80,6 +80,7 @@ class ExecutionHistoryTest {
                 null,
                 null,
                 null,
+                new long[ExecutionState.values().length],
                 new long[ExecutionState.values().length]);
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
index b5eab9a20a2..93d3cf9c99d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
@@ -330,6 +330,7 @@ public class JobExceptionsHandlerTest extends TestLogger {
         final StringifiedAccumulatorResult[] emptyAccumulators =
                 new StringifiedAccumulatorResult[0];
         final long[] timestamps = new long[ExecutionState.values().length];
+        final long[] endTimestamps = new long[ExecutionState.values().length];
         final ExecutionState expectedState = ExecutionState.RUNNING;
 
         final LocalTaskManagerLocation assignedResourceLocation = new LocalTaskManagerLocation();
@@ -352,7 +353,8 @@ public class JobExceptionsHandlerTest extends TestLogger {
                                             System.currentTimeMillis()),
                                     assignedResourceLocation,
                                     allocationID,
-                                    timestamps),
+                                    timestamps,
+                                    endTimestamps),
                             new ExecutionHistory(0))
                 },
                 jobVertexID,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
index 483ad2facf1..4c267329a74 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
@@ -50,6 +50,7 @@ import org.junit.Test;
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Map;
 
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
 import static org.junit.Assert.assertEquals;
@@ -87,7 +88,9 @@ public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger {
                         accumulateBackPressuredTime);
 
         final long[] timestamps = new long[ExecutionState.values().length];
+        final long[] endTimestamps = new long[ExecutionState.values().length];
         timestamps[ExecutionState.DEPLOYING.ordinal()] = deployingTs;
+        endTimestamps[ExecutionState.DEPLOYING.ordinal()] = deployingTs + 10;
         final ExecutionState expectedState = ExecutionState.FINISHED;
 
         timestamps[expectedState.ordinal()] = finishedTs;
@@ -106,7 +109,8 @@ public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger {
                         null,
                         assignedResourceLocation,
                         allocationID,
-                        timestamps);
+                        timestamps,
+                        endTimestamps);
 
         final ArchivedExecutionVertex executionVertex =
                 new ArchivedExecutionVertex(
@@ -170,6 +174,13 @@ public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger {
                         accumulateIdleTime,
                         accumulateBusyTime);
 
+        final Map<ExecutionState, Long> statusDuration = new HashMap<>();
+        statusDuration.put(ExecutionState.CREATED, -1L);
+        statusDuration.put(ExecutionState.SCHEDULED, -1L);
+        statusDuration.put(ExecutionState.DEPLOYING, 10L);
+        statusDuration.put(ExecutionState.INITIALIZING, -1L);
+        statusDuration.put(ExecutionState.RUNNING, -1L);
+
         final SubtaskExecutionAttemptDetailsInfo expectedDetailsInfo =
                 new SubtaskExecutionAttemptDetailsInfo(
                         subtaskIndex,
@@ -180,7 +191,8 @@ public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger {
                         finishedTs,
                         finishedTs - deployingTs,
                         ioMetricsInfo,
-                        assignedResourceLocation.getResourceID().getResourceIdString());
+                        assignedResourceLocation.getResourceID().getResourceIdString(),
+                        statusDuration);
 
         assertEquals(expectedDetailsInfo, detailsInfo);
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
index e5d98dbf8f4..bf1b9561e43 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
@@ -99,6 +99,7 @@ public class SubtaskExecutionAttemptAccumulatorsHandlerTest extends TestLogger {
                         null,
                         null,
                         null,
+                        new long[ExecutionState.values().length],
                         new long[ExecutionState.values().length]);
 
         // Invoke tested method.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
index 4a288e738be..a044f3f87ff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
@@ -52,6 +52,7 @@ import org.junit.Test;
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Map;
 
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
 import static org.junit.Assert.assertEquals;
@@ -107,6 +108,7 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger {
                                             null,
                                             null,
                                             null,
+                                            new long[ExecutionState.values().length],
                                             new long[ExecutionState.values().length]),
                                     new ExecutionHistory(0))
                         },
@@ -175,6 +177,13 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger {
                         accumulateIdleTime,
                         accumulateBusyTime);
 
+        final Map<ExecutionState, Long> statusDuration = new HashMap<>();
+        statusDuration.put(ExecutionState.CREATED, -1L);
+        statusDuration.put(ExecutionState.SCHEDULED, -1L);
+        statusDuration.put(ExecutionState.DEPLOYING, -1L);
+        statusDuration.put(ExecutionState.INITIALIZING, -1L);
+        statusDuration.put(ExecutionState.RUNNING, -1L);
+
         final SubtaskExecutionAttemptDetailsInfo expectedDetailsInfo =
                 new SubtaskExecutionAttemptDetailsInfo(
                         subtaskIndex,
@@ -185,7 +194,8 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger {
                         0L,
                         -1L,
                         ioMetricsInfo,
-                        "(unassigned)");
+                        "(unassigned)",
+                        statusDuration);
 
         assertEquals(expectedDetailsInfo, detailsInfo);
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java
index 806ce560494..50c0865f85c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetails
 import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 
@@ -62,7 +63,8 @@ public class JobVertexDetailsInfoTest
                         System.currentTimeMillis(),
                         1L,
                         jobVertexMetrics,
-                        "taskmanagerId1"));
+                        "taskmanagerId1",
+                        Collections.singletonMap(ExecutionState.CREATED, 10L)));
         vertexTaskDetailList.add(
                 new SubtaskExecutionAttemptDetailsInfo(
                         1,
@@ -73,7 +75,8 @@ public class JobVertexDetailsInfoTest
                         System.currentTimeMillis(),
                         1L,
                         jobVertexMetrics,
-                        "taskmanagerId2"));
+                        "taskmanagerId2",
+                        Collections.singletonMap(ExecutionState.CREATED, 10L)));
         vertexTaskDetailList.add(
                 new SubtaskExecutionAttemptDetailsInfo(
                         2,
@@ -84,7 +87,8 @@ public class JobVertexDetailsInfoTest
                         System.currentTimeMillis(),
                         1L,
                         jobVertexMetrics,
-                        "taskmanagerId3"));
+                        "taskmanagerId3",
+                        Collections.singletonMap(ExecutionState.CREATED, 10L)));
 
         int parallelism = 1 + (random.nextInt() / 3);
         return new JobVertexDetailsInfo(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java
index f022d6ced26..48158c05b56 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java
@@ -22,6 +22,8 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
 import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Random;
 
 /** Tests (un)marshalling of the {@link SubtaskExecutionAttemptDetailsInfo}. */
@@ -51,6 +53,13 @@ public class SubtaskExecutionAttemptDetailsInfoTest
                         Math.abs(random.nextLong()),
                         Math.abs(random.nextDouble()));
 
+        final Map<ExecutionState, Long> statusDuration = new HashMap<>();
+        statusDuration.put(ExecutionState.CREATED, 10L);
+        statusDuration.put(ExecutionState.SCHEDULED, 20L);
+        statusDuration.put(ExecutionState.DEPLOYING, 30L);
+        statusDuration.put(ExecutionState.INITIALIZING, 40L);
+        statusDuration.put(ExecutionState.RUNNING, 50L);
+
         return new SubtaskExecutionAttemptDetailsInfo(
                 Math.abs(random.nextInt()),
                 ExecutionState.values()[random.nextInt(ExecutionState.values().length)],
@@ -60,6 +69,7 @@ public class SubtaskExecutionAttemptDetailsInfoTest
                 Math.abs(random.nextLong()),
                 Math.abs(random.nextLong()),
                 ioMetricsInfo,
-                "taskmanagerId");
+                "taskmanagerId",
+                statusDuration);
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/TestingAccessExecution.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/TestingAccessExecution.java
index f3ec6fe2f1d..5173ba7c2d0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/TestingAccessExecution.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/TestingAccessExecution.java
@@ -77,6 +77,11 @@ public class TestingAccessExecution implements AccessExecution {
         throw new UnsupportedOperationException("getStateTimestamps should not be called.");
     }
 
+    @Override
+    public long[] getStateEndTimestamps() {
+        throw new UnsupportedOperationException("getStateEndTimestamps should not be called.");
+    }
+
     @Override
     public ExecutionState getState() {
         return state;
@@ -87,6 +92,11 @@ public class TestingAccessExecution implements AccessExecution {
         throw new UnsupportedOperationException("getStateTimestamp should not be called.");
     }
 
+    @Override
+    public long getStateEndTimestamp(ExecutionState state) {
+        throw new UnsupportedOperationException("getStateEndTimestamp should not be called.");
+    }
+
     @Override
     public StringifiedAccumulatorResult[] getUserAccumulatorsStringified() {
         throw new UnsupportedOperationException(