Posted to commits@flink.apache.org by ga...@apache.org on 2022/08/01 10:30:59 UTC

[flink] 06/06: [FLINK-28588][rest] Acquire information of all current executions in REST handlers if applicable

This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f436b20429b55ada2a9e8936a5e80fc672a397de
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Sun Jul 31 22:11:46 2022 +0800

    [FLINK-28588][rest] Acquire information of all current executions in REST handlers if applicable
    
    This closes #20296.
---
 .../src/test/resources/rest_api_v1.snapshot        |  31 +++
 .../handler/job/AbstractSubtaskAttemptHandler.java |  15 +-
 .../rest/handler/job/JobDetailsHandler.java        |   2 +
 .../rest/handler/job/JobExceptionsHandler.java     |  33 ++--
 .../handler/job/JobVertexBackPressureHandler.java  |  87 +++++++--
 .../rest/handler/job/JobVertexDetailsHandler.java  |  17 +-
 .../handler/job/JobVertexTaskManagersHandler.java  |  73 +++++---
 .../job/SubtaskCurrentAttemptDetailsHandler.java   |  19 +-
 ...SubtaskExecutionAttemptAccumulatorsHandler.java |  38 ++--
 .../job/SubtaskExecutionAttemptDetailsHandler.java |  52 +++---
 .../job/SubtasksAllAccumulatorsHandler.java        |  32 ++--
 .../rest/handler/job/SubtasksTimesHandler.java     |   3 +-
 .../rest/messages/JobVertexBackPressureInfo.java   |  45 ++++-
 .../job/SubtaskExecutionAttemptDetailsInfo.java    | 105 +++++++----
 .../threadinfo/JobVertexThreadInfoTracker.java     |  24 +--
 .../job/JobVertexBackPressureHandlerTest.java      | 207 +++++++++++++++++++++
 .../SubtaskCurrentAttemptDetailsHandlerTest.java   |   3 +-
 .../SubtaskExecutionAttemptDetailsHandlerTest.java |   3 +-
 .../messages/AggregatedTaskDetailsInfoTest.java    |   3 +-
 .../messages/JobVertexBackPressureInfoTest.java    |  24 ++-
 .../rest/messages/JobVertexDetailsInfoTest.java    |  24 ++-
 .../SubtaskExecutionAttemptDetailsInfoTest.java    |   3 +-
 22 files changed, 651 insertions(+), 192 deletions(-)
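
For context: the schema additions in rest_api_v1.snapshot surface per-attempt information when a subtask has several current (for example speculative) executions. A minimal sketch of a SubtaskBackPressureInfo entry carrying one concurrent attempt, with field names taken from the diff below and values invented purely for illustration:

    {
      "subtask" : 0,
      "attempt-number" : 1,
      "backpressure-level" : "ok",
      "ratio" : 0.0,
      "idleRatio" : 0.1,
      "busyRatio" : 0.9,
      "other-concurrent-attempts" : [ {
        "subtask" : 0,
        "attempt-number" : 0,
        "backpressure-level" : "high",
        "ratio" : 1.0,
        "idleRatio" : 0.0,
        "busyRatio" : 0.0
      } ]
    }

The nested entries themselves omit "other-concurrent-attempts", matching the Jackson NON_EMPTY/NON_NULL inclusion settings added in JobVertexBackPressureInfo.java.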

diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index 873e5062d7d..85337fa8795 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -2403,6 +2403,13 @@
                   "type" : "integer"
                 }
               },
+              "other-concurrent-attempts" : {
+                "type" : "array",
+                "items" : {
+                  "type" : "object",
+                  "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:SubtaskExecutionAttemptDetailsInfo"
+                }
+              },
               "start_time" : {
                 "type" : "integer"
               }
@@ -2522,6 +2529,9 @@
               "subtask" : {
                 "type" : "integer"
               },
+              "attempt-number" : {
+                "type" : "integer"
+              },
               "backpressure-level" : {
                 "type" : "string",
                 "enum" : [ "ok", "low", "high" ]
@@ -2534,6 +2544,13 @@
               },
               "busyRatio" : {
                 "type" : "number"
+              },
+              "other-concurrent-attempts" : {
+                "type" : "array",
+                "items" : {
+                  "type" : "object",
+                  "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobVertexBackPressureInfo:SubtaskBackPressureInfo"
+                }
               }
             }
           }
@@ -2803,6 +2820,13 @@
             "type" : "integer"
           }
         },
+        "other-concurrent-attempts" : {
+          "type" : "array",
+          "items" : {
+            "type" : "object",
+            "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:SubtaskExecutionAttemptDetailsInfo"
+          }
+        },
         "start_time" : {
           "type" : "integer"
         }
@@ -2904,6 +2928,13 @@
             "type" : "integer"
           }
         },
+        "other-concurrent-attempts" : {
+          "type" : "array",
+          "items" : {
+            "type" : "object",
+            "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:SubtaskExecutionAttemptDetailsInfo"
+          }
+        },
         "start_time" : {
           "type" : "integer"
         }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractSubtaskAttemptHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractSubtaskAttemptHandler.java
index 15d128e43e2..04b066860ac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractSubtaskAttemptHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractSubtaskAttemptHandler.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
+import java.util.Collection;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.Executor;
@@ -88,11 +89,17 @@ public abstract class AbstractSubtaskAttemptHandler<
             throws RestHandlerException {
         final Integer attemptNumber = request.getPathParameter(SubtaskAttemptPathParameter.class);
 
-        final AccessExecution currentAttempt = executionVertex.getCurrentExecutionAttempt();
+        final Collection<AccessExecution> currentExecutions =
+                executionVertex.getCurrentExecutions();
         final ExecutionHistory executionHistory = executionVertex.getExecutionHistory();
-        if (attemptNumber == currentAttempt.getAttemptNumber()) {
-            return handleRequest(request, currentAttempt);
-        } else if (executionHistory.isValidAttemptNumber(attemptNumber)) {
+
+        for (AccessExecution currentExecution : currentExecutions) {
+            if (attemptNumber == currentExecution.getAttemptNumber()) {
+                return handleRequest(request, currentExecution);
+            }
+        }
+
+        if (executionHistory.isValidAttemptNumber(attemptNumber)) {
             final Optional<? extends AccessExecution> execution =
                     executionHistory.getHistoricalExecution(attemptNumber);
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
index 4f850bc0712..f38a7688ab9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
@@ -202,6 +202,8 @@ public class JobDetailsHandler
         MutableIOMetrics counts = new MutableIOMetrics();
 
         for (AccessExecutionVertex vertex : ejv.getTaskVertices()) {
+            // Here we use the metrics of one of the current attempts to represent the subtask,
+            // rather than the aggregation of all attempts.
             counts.addIOMetrics(
                     vertex.getCurrentExecutionAttempt(),
                     metricFetcher,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
index c500792c6aa..4ae57f6bc28 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest.handler.job;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ErrorInfo;
@@ -127,22 +128,24 @@ public class JobExceptionsHandler
         List<JobExceptionsInfo.ExecutionExceptionInfo> taskExceptionList = new ArrayList<>();
         boolean truncated = false;
         for (AccessExecutionVertex task : executionGraph.getAllExecutionVertices()) {
-            Optional<ErrorInfo> failure = task.getFailureInfo();
-            if (failure.isPresent()) {
-                if (taskExceptionList.size() >= exceptionToReportMaxSize) {
-                    truncated = true;
-                    break;
+            for (AccessExecution execution : task.getCurrentExecutions()) {
+                Optional<ErrorInfo> failure = execution.getFailureInfo();
+                if (failure.isPresent()) {
+                    if (taskExceptionList.size() >= exceptionToReportMaxSize) {
+                        truncated = true;
+                        break;
+                    }
+
+                    TaskManagerLocation location = execution.getAssignedResourceLocation();
+                    String locationString = toString(location);
+                    long timestamp = execution.getStateTimestamp(ExecutionState.FAILED);
+                    taskExceptionList.add(
+                            new JobExceptionsInfo.ExecutionExceptionInfo(
+                                    failure.get().getExceptionAsString(),
+                                    task.getTaskNameWithSubtaskIndex(),
+                                    locationString,
+                                    timestamp == 0 ? -1 : timestamp));
                 }
-
-                TaskManagerLocation location = task.getCurrentAssignedResourceLocation();
-                String locationString = toString(location);
-                long timestamp = task.getStateTimestamp(ExecutionState.FAILED);
-                taskExceptionList.add(
-                        new JobExceptionsInfo.ExecutionExceptionInfo(
-                                failure.get().getExceptionAsString(),
-                                task.getTaskNameWithSubtaskIndex(),
-                                locationString,
-                                timestamp == 0 ? -1 : timestamp));
             }
         }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java
index 895fe7011ba..898b74ba0d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.ComponentMetricStore;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.SubtaskMetricStore;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.TaskMetricStore;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
@@ -39,6 +40,7 @@ import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Comparator;
@@ -80,18 +82,23 @@ public class JobVertexBackPressureHandler
                 metricFetcher
                         .getMetricStore()
                         .getTaskMetricStore(jobId.toString(), jobVertexId.toString());
+        Map<String, Map<Integer, Integer>> jobCurrentExecutions =
+                metricFetcher.getMetricStore().getCurrentExecutionAttempts().get(jobId.toString());
+        Map<Integer, Integer> currentExecutionAttempts =
+                jobCurrentExecutions != null
+                        ? jobCurrentExecutions.get(jobVertexId.toString())
+                        : null;
 
         return CompletableFuture.completedFuture(
                 taskMetricStore != null
-                        ? createJobVertexBackPressureInfo(
-                                taskMetricStore.getAllSubtaskMetricStores())
+                        ? createJobVertexBackPressureInfo(taskMetricStore, currentExecutionAttempts)
                         : JobVertexBackPressureInfo.deprecated());
     }
 
     private JobVertexBackPressureInfo createJobVertexBackPressureInfo(
-            Map<Integer, ComponentMetricStore> allSubtaskMetricStores) {
+            TaskMetricStore taskMetricStore, Map<Integer, Integer> currentExecutionAttempts) {
         List<SubtaskBackPressureInfo> subtaskBackPressureInfos =
-                createSubtaskBackPressureInfo(allSubtaskMetricStores);
+                createSubtaskBackPressureInfo(taskMetricStore, currentExecutionAttempts);
         return new JobVertexBackPressureInfo(
                 JobVertexBackPressureInfo.VertexBackPressureStatus.OK,
                 getBackPressureLevel(getMaxBackPressureRatio(subtaskBackPressureInfos)),
@@ -100,26 +107,72 @@ public class JobVertexBackPressureHandler
     }
 
     private List<SubtaskBackPressureInfo> createSubtaskBackPressureInfo(
-            Map<Integer, ComponentMetricStore> subtaskMetricStores) {
+            TaskMetricStore taskMetricStore, Map<Integer, Integer> currentExecutionAttempts) {
+        Map<Integer, SubtaskMetricStore> subtaskMetricStores =
+                taskMetricStore.getAllSubtaskMetricStores();
         List<SubtaskBackPressureInfo> result = new ArrayList<>(subtaskMetricStores.size());
-        for (Map.Entry<Integer, ComponentMetricStore> entry : subtaskMetricStores.entrySet()) {
+        for (Map.Entry<Integer, SubtaskMetricStore> entry : subtaskMetricStores.entrySet()) {
             int subtaskIndex = entry.getKey();
-            ComponentMetricStore subtaskMetricStore = entry.getValue();
-            double backPressureRatio = getBackPressureRatio(subtaskMetricStore);
-            double idleRatio = getIdleRatio(subtaskMetricStore);
-            double busyRatio = getBusyRatio(subtaskMetricStore);
-            result.add(
-                    new SubtaskBackPressureInfo(
-                            subtaskIndex,
-                            getBackPressureLevel(backPressureRatio),
-                            backPressureRatio,
-                            idleRatio,
-                            busyRatio));
+            SubtaskMetricStore subtaskMetricStore = entry.getValue();
+            Map<Integer, ComponentMetricStore> allAttemptsMetricStores =
+                    subtaskMetricStore.getAllAttemptsMetricStores();
+            if (allAttemptsMetricStores.isEmpty() || allAttemptsMetricStores.size() == 1) {
+                result.add(
+                        createSubtaskAttemptBackpressureInfo(
+                                subtaskIndex, null, subtaskMetricStore, null));
+            } else {
+                int currentAttempt =
+                        currentExecutionAttempts == null
+                                ? -1
+                                : currentExecutionAttempts.getOrDefault(subtaskIndex, -1);
+                if (!allAttemptsMetricStores.containsKey(currentAttempt)) {
+                    // allAttemptsMetricStores is not empty here
+                    currentAttempt = allAttemptsMetricStores.keySet().iterator().next();
+                }
+                List<SubtaskBackPressureInfo> otherConcurrentAttempts =
+                        new ArrayList<>(allAttemptsMetricStores.size() - 1);
+                for (Map.Entry<Integer, ComponentMetricStore> attemptStore :
+                        allAttemptsMetricStores.entrySet()) {
+                    if (attemptStore.getKey() == currentAttempt) {
+                        continue;
+                    }
+                    otherConcurrentAttempts.add(
+                            createSubtaskAttemptBackpressureInfo(
+                                    subtaskIndex,
+                                    attemptStore.getKey(),
+                                    attemptStore.getValue(),
+                                    null));
+                }
+                result.add(
+                        createSubtaskAttemptBackpressureInfo(
+                                subtaskIndex,
+                                currentAttempt,
+                                allAttemptsMetricStores.get(currentAttempt),
+                                otherConcurrentAttempts));
+            }
         }
         result.sort(Comparator.comparingInt(SubtaskBackPressureInfo::getSubtask));
         return result;
     }
 
+    private SubtaskBackPressureInfo createSubtaskAttemptBackpressureInfo(
+            int subtaskIndex,
+            @Nullable Integer attemptNumber,
+            ComponentMetricStore metricStore,
+            @Nullable List<SubtaskBackPressureInfo> otherConcurrentAttempts) {
+        double backPressureRatio = getBackPressureRatio(metricStore);
+        double idleRatio = getIdleRatio(metricStore);
+        double busyRatio = getBusyRatio(metricStore);
+        return new SubtaskBackPressureInfo(
+                subtaskIndex,
+                attemptNumber,
+                getBackPressureLevel(backPressureRatio),
+                backPressureRatio,
+                idleRatio,
+                busyRatio,
+                otherConcurrentAttempts);
+    }
+
     private double getMaxBackPressureRatio(List<SubtaskBackPressureInfo> subtaskBackPressureInfos) {
         return subtaskBackPressureInfos.stream()
                 .mapToDouble(backPressureInfo -> backPressureInfo.getBackPressuredRatio())
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java
index e4ac7708489..5877567db77 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java
@@ -120,9 +120,24 @@ public class JobVertexDetailsHandler
         for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
             final AccessExecution execution = vertex.getCurrentExecutionAttempt();
             final JobVertexID jobVertexID = jobVertex.getJobVertexId();
+
+            final Collection<AccessExecution> attempts = vertex.getCurrentExecutions();
+            List<SubtaskExecutionAttemptDetailsInfo> otherConcurrentAttempts = null;
+
+            if (attempts.size() > 1) {
+                otherConcurrentAttempts = new ArrayList<>();
+                for (AccessExecution attempt : attempts) {
+                    if (attempt.getAttemptNumber() != execution.getAttemptNumber()) {
+                        otherConcurrentAttempts.add(
+                                SubtaskExecutionAttemptDetailsInfo.create(
+                                        attempt, metricFetcher, jobID, jobVertexID, null));
+                    }
+                }
+            }
+
             subtasks.add(
                     SubtaskExecutionAttemptDetailsInfo.create(
-                            execution, metricFetcher, jobID, jobVertexID));
+                            execution, metricFetcher, jobID, jobVertexID, otherConcurrentAttempts));
         }
 
         return new JobVertexDetailsInfo(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
index 3feea441d2f..450535f8c7f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest.handler.job;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
@@ -55,8 +56,10 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.stream.Collectors;
 
@@ -127,40 +130,48 @@ public class JobVertexTaskManagersHandler
             AccessExecutionJobVertex jobVertex,
             JobID jobID,
             @Nullable MetricFetcher metricFetcher) {
-        // Build a map that groups tasks by TaskManager
+        // Build a map that groups task executions by TaskManager
         Map<String, String> taskManagerId2Host = new HashMap<>();
-        Map<String, List<AccessExecutionVertex>> taskManagerVertices = new HashMap<>();
+        Map<String, List<AccessExecution>> taskManagerExecutions = new HashMap<>();
+        Set<AccessExecution> representativeExecutions = new HashSet<>();
         for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
-            TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
-            String taskManagerHost =
-                    location == null
-                            ? "(unassigned)"
-                            : location.getHostname() + ':' + location.dataPort();
-            String taskmanagerId =
-                    location == null ? "(unassigned)" : location.getResourceID().toString();
-            taskManagerId2Host.put(taskmanagerId, taskManagerHost);
-            List<AccessExecutionVertex> vertices =
-                    taskManagerVertices.computeIfAbsent(
-                            taskmanagerId, ignored -> new ArrayList<>(4));
-            vertices.add(vertex);
+            AccessExecution representativeAttempt = vertex.getCurrentExecutionAttempt();
+            representativeExecutions.add(representativeAttempt);
+
+            for (AccessExecution execution : vertex.getCurrentExecutions()) {
+                TaskManagerLocation location = execution.getAssignedResourceLocation();
+                String taskManagerHost =
+                        location == null
+                                ? "(unassigned)"
+                                : location.getHostname() + ':' + location.dataPort();
+                String taskmanagerId =
+                        location == null ? "(unassigned)" : location.getResourceID().toString();
+                taskManagerId2Host.put(taskmanagerId, taskManagerHost);
+                List<AccessExecution> executions =
+                        taskManagerExecutions.computeIfAbsent(
+                                taskmanagerId, ignored -> new ArrayList<>());
+                executions.add(execution);
+            }
         }
 
         final long now = System.currentTimeMillis();
 
         List<JobVertexTaskManagersInfo.TaskManagersInfo> taskManagersInfoList = new ArrayList<>(4);
-        for (Map.Entry<String, List<AccessExecutionVertex>> entry :
-                taskManagerVertices.entrySet()) {
+        for (Map.Entry<String, List<AccessExecution>> entry : taskManagerExecutions.entrySet()) {
             String taskmanagerId = entry.getKey();
             String host = taskManagerId2Host.get(taskmanagerId);
-            List<AccessExecutionVertex> taskVertices = entry.getValue();
+            List<AccessExecution> executions = entry.getValue();
 
             List<IOMetricsInfo> ioMetricsInfos = new ArrayList<>();
             List<Map<ExecutionState, Long>> status =
-                    taskVertices.stream()
-                            .map(AccessExecutionVertex::getCurrentExecutionAttempt)
+                    executions.stream()
                             .map(StatusDurationUtils::getExecutionStateDuration)
                             .collect(Collectors.toList());
 
+            // executionsPerState counts attempts of a subtask separately
+            int[] executionsPerState = new int[ExecutionState.values().length];
+            // tasksPerState counts only the representative attempts, and is used to aggregate the
+            // task manager state
             int[] tasksPerState = new int[ExecutionState.values().length];
 
             long startTime = Long.MAX_VALUE;
@@ -169,27 +180,32 @@ public class JobVertexTaskManagersHandler
 
             MutableIOMetrics counts = new MutableIOMetrics();
 
-            for (AccessExecutionVertex vertex : taskVertices) {
-                final ExecutionState state = vertex.getExecutionState();
-                tasksPerState[state.ordinal()]++;
+            int representativeAttemptsCount = 0;
+            for (AccessExecution execution : executions) {
+                final ExecutionState state = execution.getState();
+                executionsPerState[state.ordinal()]++;
+                if (representativeExecutions.contains(execution)) {
+                    tasksPerState[state.ordinal()]++;
+                    representativeAttemptsCount++;
+                }
 
                 // take the earliest start time
-                long started = vertex.getStateTimestamp(ExecutionState.DEPLOYING);
+                long started = execution.getStateTimestamp(ExecutionState.DEPLOYING);
                 if (started > 0) {
                     startTime = Math.min(startTime, started);
                 }
 
                 allFinished &= state.isTerminal();
-                endTime = Math.max(endTime, vertex.getStateTimestamp(state));
+                endTime = Math.max(endTime, execution.getStateTimestamp(state));
 
                 counts.addIOMetrics(
-                        vertex.getCurrentExecutionAttempt(),
+                        execution,
                         metricFetcher,
                         jobID.toString(),
                         jobVertex.getJobVertexId().toString());
                 MutableIOMetrics current = new MutableIOMetrics();
                 current.addIOMetrics(
-                        vertex.getCurrentExecutionAttempt(),
+                        execution,
                         metricFetcher,
                         jobID.toString(),
                         jobVertex.getJobVertexId().toString());
@@ -222,9 +238,10 @@ public class JobVertexTaskManagersHandler
                 duration = -1L;
             }
 
+            // Safe when tasksPerState are all zero and representativeAttemptsCount is zero
             ExecutionState jobVertexState =
                     ExecutionJobVertex.getAggregateJobVertexState(
-                            tasksPerState, taskVertices.size());
+                            tasksPerState, representativeAttemptsCount);
 
             final IOMetricsInfo jobVertexMetrics =
                     new IOMetricsInfo(
@@ -243,7 +260,7 @@ public class JobVertexTaskManagersHandler
             Map<ExecutionState, Integer> statusCounts =
                     new HashMap<>(ExecutionState.values().length);
             for (ExecutionState state : ExecutionState.values()) {
-                statusCounts.put(state, tasksPerState[state.ordinal()]);
+                statusCounts.put(state, executionsPerState[state.ordinal()]);
             }
             taskManagersInfoList.add(
                     new JobVertexTaskManagersInfo.TaskManagersInfo(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java
index ba25ac5c24f..0e52d088158 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java
@@ -37,6 +37,9 @@ import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.Preconditions;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executor;
 
@@ -81,7 +84,21 @@ public class SubtaskCurrentAttemptDetailsHandler
         final JobID jobID = request.getPathParameter(JobIDPathParameter.class);
         final JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class);
 
+        final Collection<AccessExecution> attempts = executionVertex.getCurrentExecutions();
+        List<SubtaskExecutionAttemptDetailsInfo> otherConcurrentAttempts = null;
+
+        if (attempts.size() > 1) {
+            otherConcurrentAttempts = new ArrayList<>();
+            for (AccessExecution attempt : attempts) {
+                if (attempt.getAttemptNumber() != execution.getAttemptNumber()) {
+                    otherConcurrentAttempts.add(
+                            SubtaskExecutionAttemptDetailsInfo.create(
+                                    attempt, metricFetcher, jobID, jobVertexID, null));
+                }
+            }
+        }
+
         return SubtaskExecutionAttemptDetailsInfo.create(
-                execution, metricFetcher, jobID, jobVertexID);
+                execution, metricFetcher, jobID, jobVertexID, otherConcurrentAttempts);
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java
index 7093308bf3b..a188a19cb5f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -99,25 +99,25 @@ public class SubtaskExecutionAttemptAccumulatorsHandler
         List<ArchivedJson> archive = new ArrayList<>(16);
         for (AccessExecutionJobVertex task : graph.getAllVertices().values()) {
             for (AccessExecutionVertex subtask : task.getTaskVertices()) {
-                ResponseBody curAttemptJson =
-                        createAccumulatorInfo(subtask.getCurrentExecutionAttempt());
-                String curAttemptPath =
-                        getMessageHeaders()
-                                .getTargetRestEndpointURL()
-                                .replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString())
-                                .replace(
-                                        ':' + JobVertexIdPathParameter.KEY,
-                                        task.getJobVertexId().toString())
-                                .replace(
-                                        ':' + SubtaskIndexPathParameter.KEY,
-                                        String.valueOf(subtask.getParallelSubtaskIndex()))
-                                .replace(
-                                        ':' + SubtaskAttemptPathParameter.KEY,
-                                        String.valueOf(
-                                                subtask.getCurrentExecutionAttempt()
-                                                        .getAttemptNumber()));
-
-                archive.add(new ArchivedJson(curAttemptPath, curAttemptJson));
+                for (AccessExecution attempt : subtask.getCurrentExecutions()) {
+                    ResponseBody curAttemptJson = createAccumulatorInfo(attempt);
+                    String curAttemptPath =
+                            getMessageHeaders()
+                                    .getTargetRestEndpointURL()
+                                    .replace(
+                                            ':' + JobIDPathParameter.KEY,
+                                            graph.getJobID().toString())
+                                    .replace(
+                                            ':' + JobVertexIdPathParameter.KEY,
+                                            task.getJobVertexId().toString())
+                                    .replace(
+                                            ':' + SubtaskIndexPathParameter.KEY,
+                                            String.valueOf(subtask.getParallelSubtaskIndex()))
+                                    .replace(
+                                            ':' + SubtaskAttemptPathParameter.KEY,
+                                            String.valueOf(attempt.getAttemptNumber()));
+                    archive.add(new ArchivedJson(curAttemptPath, curAttemptJson));
+                }
 
                 for (AccessExecution attempt :
                         subtask.getExecutionHistory().getHistoricalExecutions()) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
index 398a10c57b4..d4eaa00906e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
@@ -102,7 +102,7 @@ public class SubtaskExecutionAttemptDetailsHandler
         final JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class);
 
         return SubtaskExecutionAttemptDetailsInfo.create(
-                execution, metricFetcher, jobID, jobVertexID);
+                execution, metricFetcher, jobID, jobVertexID, null);
     }
 
     @Override
@@ -111,36 +111,38 @@ public class SubtaskExecutionAttemptDetailsHandler
         List<ArchivedJson> archive = new ArrayList<>(16);
         for (AccessExecutionJobVertex task : graph.getAllVertices().values()) {
             for (AccessExecutionVertex subtask : task.getTaskVertices()) {
-                ResponseBody curAttemptJson =
-                        SubtaskExecutionAttemptDetailsInfo.create(
-                                subtask.getCurrentExecutionAttempt(),
-                                null,
-                                graph.getJobID(),
-                                task.getJobVertexId());
-                String curAttemptPath =
-                        getMessageHeaders()
-                                .getTargetRestEndpointURL()
-                                .replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString())
-                                .replace(
-                                        ':' + JobVertexIdPathParameter.KEY,
-                                        task.getJobVertexId().toString())
-                                .replace(
-                                        ':' + SubtaskIndexPathParameter.KEY,
-                                        String.valueOf(subtask.getParallelSubtaskIndex()))
-                                .replace(
-                                        ':' + SubtaskAttemptPathParameter.KEY,
-                                        String.valueOf(
-                                                subtask.getCurrentExecutionAttempt()
-                                                        .getAttemptNumber()));
-
-                archive.add(new ArchivedJson(curAttemptPath, curAttemptJson));
+                for (AccessExecution attempt : subtask.getCurrentExecutions()) {
+                    ResponseBody curAttemptJson =
+                            SubtaskExecutionAttemptDetailsInfo.create(
+                                    attempt, null, graph.getJobID(), task.getJobVertexId(), null);
+                    String curAttemptPath =
+                            getMessageHeaders()
+                                    .getTargetRestEndpointURL()
+                                    .replace(
+                                            ':' + JobIDPathParameter.KEY,
+                                            graph.getJobID().toString())
+                                    .replace(
+                                            ':' + JobVertexIdPathParameter.KEY,
+                                            task.getJobVertexId().toString())
+                                    .replace(
+                                            ':' + SubtaskIndexPathParameter.KEY,
+                                            String.valueOf(subtask.getParallelSubtaskIndex()))
+                                    .replace(
+                                            ':' + SubtaskAttemptPathParameter.KEY,
+                                            String.valueOf(attempt.getAttemptNumber()));
 
+                    archive.add(new ArchivedJson(curAttemptPath, curAttemptJson));
+                }
                 for (AccessExecution attempt :
                         subtask.getExecutionHistory().getHistoricalExecutions()) {
                     if (attempt != null) {
                         ResponseBody json =
                                 SubtaskExecutionAttemptDetailsInfo.create(
-                                        attempt, null, graph.getJobID(), task.getJobVertexId());
+                                        attempt,
+                                        null,
+                                        graph.getJobID(),
+                                        task.getJobVertexId(),
+                                        null);
                         String path =
                                 getMessageHeaders()
                                         .getTargetRestEndpointURL()
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksAllAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksAllAccumulatorsHandler.java
index 88c3cc851ad..a47ad6a46be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksAllAccumulatorsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksAllAccumulatorsHandler.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.rest.handler.job;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -75,23 +76,24 @@ public class SubtasksAllAccumulatorsHandler
                 new ArrayList<>();
 
         for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
-            TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
-            String locationString = location == null ? "(unassigned)" : location.getHostname();
+            for (AccessExecution execution : vertex.getCurrentExecutions()) {
+                TaskManagerLocation location = execution.getAssignedResourceLocation();
+                String locationString = location == null ? "(unassigned)" : location.getHostname();
 
-            StringifiedAccumulatorResult[] accs =
-                    vertex.getCurrentExecutionAttempt().getUserAccumulatorsStringified();
-            List<UserAccumulator> userAccumulators = new ArrayList<>(accs.length);
-            for (StringifiedAccumulatorResult acc : accs) {
-                userAccumulators.add(
-                        new UserAccumulator(acc.getName(), acc.getType(), acc.getValue()));
-            }
+                StringifiedAccumulatorResult[] accs = execution.getUserAccumulatorsStringified();
+                List<UserAccumulator> userAccumulators = new ArrayList<>(accs.length);
+                for (StringifiedAccumulatorResult acc : accs) {
+                    userAccumulators.add(
+                            new UserAccumulator(acc.getName(), acc.getType(), acc.getValue()));
+                }
 
-            subtaskAccumulatorsInfos.add(
-                    new SubtasksAllAccumulatorsInfo.SubtaskAccumulatorsInfo(
-                            vertex.getCurrentExecutionAttempt().getParallelSubtaskIndex(),
-                            vertex.getCurrentExecutionAttempt().getAttemptNumber(),
-                            locationString,
-                            userAccumulators));
+                subtaskAccumulatorsInfos.add(
+                        new SubtasksAllAccumulatorsInfo.SubtaskAccumulatorsInfo(
+                                execution.getParallelSubtaskIndex(),
+                                execution.getAttemptNumber(),
+                                locationString,
+                                userAccumulators));
+            }
         }
 
         return new SubtasksAllAccumulatorsInfo(jobVertexId, parallelism, subtaskAccumulatorsInfos);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
index 32f04899810..ac9c8322c9c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
@@ -101,7 +101,8 @@ public class SubtasksTimesHandler
 
         int num = 0;
         for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
-
+            // Use one of the current execution attempts to represent the subtask, rather than
+            // adding times info of all attempts.
             long[] timestamps = vertex.getCurrentExecutionAttempt().getStateTimestamps();
             ExecutionState status = vertex.getExecutionState();
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java
index 5415c297cbf..c475b6d7cd5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonValue;
 
@@ -121,14 +122,22 @@ public class JobVertexBackPressureInfo implements ResponseBody {
     public static final class SubtaskBackPressureInfo {
 
         public static final String FIELD_NAME_SUBTASK = "subtask";
+        public static final String FIELD_NAME_ATTEMPT_NUMBER = "attempt-number";
         public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level";
         public static final String FIELD_NAME_BACK_PRESSURED_RATIO = "ratio";
         public static final String FIELD_NAME_IDLE_RATIO = "idleRatio";
         public static final String FIELD_NAME_BUSY_RATIO = "busyRatio";
+        public static final String FIELD_NAME_OTHER_CONCURRENT_ATTEMPTS =
+                "other-concurrent-attempts";
 
         @JsonProperty(FIELD_NAME_SUBTASK)
         private final int subtask;
 
+        @JsonProperty(FIELD_NAME_ATTEMPT_NUMBER)
+        @JsonInclude(Include.NON_NULL)
+        @Nullable
+        private final Integer attemptNumber;
+
         @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
         private final VertexBackPressureLevel backpressureLevel;
 
@@ -141,18 +150,30 @@ public class JobVertexBackPressureInfo implements ResponseBody {
         @JsonProperty(FIELD_NAME_BUSY_RATIO)
         private final double busyRatio;
 
+        @JsonProperty(FIELD_NAME_OTHER_CONCURRENT_ATTEMPTS)
+        @JsonInclude(Include.NON_EMPTY)
+        @Nullable
+        private final List<SubtaskBackPressureInfo> otherConcurrentAttempts;
+
+        // otherConcurrentAttempts and attemptNumber are Nullable since Jackson will assign null if
+        // the fields are absent while parsing
         public SubtaskBackPressureInfo(
                 @JsonProperty(FIELD_NAME_SUBTASK) int subtask,
+                @JsonProperty(FIELD_NAME_ATTEMPT_NUMBER) @Nullable Integer attemptNumber,
                 @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
                         VertexBackPressureLevel backpressureLevel,
                 @JsonProperty(FIELD_NAME_BACK_PRESSURED_RATIO) double backPressuredRatio,
                 @JsonProperty(FIELD_NAME_IDLE_RATIO) double idleRatio,
-                @JsonProperty(FIELD_NAME_BUSY_RATIO) double busyRatio) {
+                @JsonProperty(FIELD_NAME_BUSY_RATIO) double busyRatio,
+                @JsonProperty(FIELD_NAME_OTHER_CONCURRENT_ATTEMPTS) @Nullable
+                        List<SubtaskBackPressureInfo> otherConcurrentAttempts) {
             this.subtask = subtask;
+            this.attemptNumber = attemptNumber;
             this.backpressureLevel = checkNotNull(backpressureLevel);
             this.backPressuredRatio = backPressuredRatio;
             this.idleRatio = idleRatio;
             this.busyRatio = busyRatio;
+            this.otherConcurrentAttempts = otherConcurrentAttempts;
         }
 
         @Override
@@ -165,16 +186,24 @@ public class JobVertexBackPressureInfo implements ResponseBody {
             }
             SubtaskBackPressureInfo that = (SubtaskBackPressureInfo) o;
             return subtask == that.subtask
+                    && Objects.equals(attemptNumber, that.attemptNumber)
                     && backPressuredRatio == that.backPressuredRatio
                     && idleRatio == that.idleRatio
                     && busyRatio == that.busyRatio
-                    && Objects.equals(backpressureLevel, that.backpressureLevel);
+                    && Objects.equals(backpressureLevel, that.backpressureLevel)
+                    && Objects.equals(otherConcurrentAttempts, that.otherConcurrentAttempts);
         }
 
         @Override
         public int hashCode() {
             return Objects.hash(
-                    subtask, backpressureLevel, backPressuredRatio, idleRatio, busyRatio);
+                    subtask,
+                    attemptNumber,
+                    backpressureLevel,
+                    backPressuredRatio,
+                    idleRatio,
+                    busyRatio,
+                    otherConcurrentAttempts);
         }
 
         public int getSubtask() {
@@ -196,6 +225,16 @@ public class JobVertexBackPressureInfo implements ResponseBody {
         public double getBusyRatio() {
             return busyRatio;
         }
+
+        @Nullable
+        public Integer getAttemptNumber() {
+            return attemptNumber;
+        }
+
+        @Nullable
+        public List<SubtaskBackPressureInfo> getOtherConcurrentAttempts() {
+            return otherConcurrentAttempts;
+        }
     }
 
     /** Status of vertex back-pressure. */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java
index 56577fb260b..1a8463ab559 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java
@@ -30,12 +30,16 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
 import io.swagger.v3.oas.annotations.Hidden;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
@@ -66,6 +70,8 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
 
     public static final String FIELD_NAME_STATUS_DURATION = "status-duration";
 
+    public static final String FIELD_NAME_OTHER_CONCURRENT_ATTEMPTS = "other-concurrent-attempts";
+
     @JsonProperty(FIELD_NAME_SUBTASK_INDEX)
     private final int subtaskIndex;
 
@@ -100,7 +106,13 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
     @JsonProperty(FIELD_NAME_STATUS_DURATION)
     private final Map<ExecutionState, Long> statusDuration;
 
+    @JsonProperty(FIELD_NAME_OTHER_CONCURRENT_ATTEMPTS)
+    @JsonInclude(Include.NON_EMPTY)
+    @Nullable
+    private final List<SubtaskExecutionAttemptDetailsInfo> otherConcurrentAttempts;
+
     @JsonCreator
+    // blocked is Nullable since Jackson will assign null if the field is absent while parsing
     public SubtaskExecutionAttemptDetailsInfo(
             @JsonProperty(FIELD_NAME_SUBTASK_INDEX) int subtaskIndex,
             @JsonProperty(FIELD_NAME_STATUS) ExecutionState status,
@@ -111,7 +123,9 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
             @JsonProperty(FIELD_NAME_DURATION) long duration,
             @JsonProperty(FIELD_NAME_METRICS) IOMetricsInfo ioMetricsInfo,
             @JsonProperty(FIELD_NAME_TASKMANAGER_ID) String taskmanagerId,
-            @JsonProperty(FIELD_NAME_STATUS_DURATION) Map<ExecutionState, Long> statusDuration) {
+            @JsonProperty(FIELD_NAME_STATUS_DURATION) Map<ExecutionState, Long> statusDuration,
+            @JsonProperty(FIELD_NAME_OTHER_CONCURRENT_ATTEMPTS) @Nullable
+                    List<SubtaskExecutionAttemptDetailsInfo> otherConcurrentAttempts) {
 
         this.subtaskIndex = subtaskIndex;
         this.status = Preconditions.checkNotNull(status);
@@ -124,6 +138,7 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
         this.ioMetricsInfo = Preconditions.checkNotNull(ioMetricsInfo);
         this.taskmanagerId = Preconditions.checkNotNull(taskmanagerId);
         this.statusDuration = Preconditions.checkNotNull(statusDuration);
+        this.otherConcurrentAttempts = otherConcurrentAttempts;
     }
 
     public int getSubtaskIndex() {
@@ -174,51 +189,16 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
         return taskmanagerId;
     }
 
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-
-        SubtaskExecutionAttemptDetailsInfo that = (SubtaskExecutionAttemptDetailsInfo) o;
-
-        return subtaskIndex == that.subtaskIndex
-                && status == that.status
-                && attempt == that.attempt
-                && Objects.equals(host, that.host)
-                && startTime == that.startTime
-                && startTimeCompatible == that.startTimeCompatible
-                && endTime == that.endTime
-                && duration == that.duration
-                && Objects.equals(ioMetricsInfo, that.ioMetricsInfo)
-                && Objects.equals(taskmanagerId, that.taskmanagerId)
-                && Objects.equals(statusDuration, that.statusDuration);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(
-                subtaskIndex,
-                status,
-                attempt,
-                host,
-                startTime,
-                startTimeCompatible,
-                endTime,
-                duration,
-                ioMetricsInfo,
-                taskmanagerId,
-                statusDuration);
+    public List<SubtaskExecutionAttemptDetailsInfo> getOtherConcurrentAttempts() {
+        return otherConcurrentAttempts == null ? new ArrayList<>() : otherConcurrentAttempts;
     }
 
     public static SubtaskExecutionAttemptDetailsInfo create(
             AccessExecution execution,
             @Nullable MetricFetcher metricFetcher,
             JobID jobID,
-            JobVertexID jobVertexID) {
+            JobVertexID jobVertexID,
+            @Nullable List<SubtaskExecutionAttemptDetailsInfo> otherConcurrentAttempts) {
         final ExecutionState status = execution.getState();
         final long now = System.currentTimeMillis();
 
@@ -261,6 +241,49 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
                 duration,
                 ioMetricsInfo,
                 taskmanagerId,
-                getExecutionStateDuration(execution));
+                getExecutionStateDuration(execution),
+                otherConcurrentAttempts);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        SubtaskExecutionAttemptDetailsInfo that = (SubtaskExecutionAttemptDetailsInfo) o;
+
+        return subtaskIndex == that.subtaskIndex
+                && status == that.status
+                && attempt == that.attempt
+                && Objects.equals(host, that.host)
+                && startTime == that.startTime
+                && startTimeCompatible == that.startTimeCompatible
+                && endTime == that.endTime
+                && duration == that.duration
+                && Objects.equals(ioMetricsInfo, that.ioMetricsInfo)
+                && Objects.equals(taskmanagerId, that.taskmanagerId)
+                && Objects.equals(statusDuration, that.statusDuration)
+                && Objects.equals(otherConcurrentAttempts, that.otherConcurrentAttempts);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                subtaskIndex,
+                status,
+                attempt,
+                host,
+                startTime,
+                startTimeCompatible,
+                endTime,
+                duration,
+                ioMetricsInfo,
+                taskmanagerId,
+                statusDuration,
+                otherConcurrentAttempts);
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java
index 76ec84509ec..45b469632b1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -257,18 +258,19 @@ public class JobVertexThreadInfoTracker<T extends Statistics> implements JobVert
                         executionVertex.getExecutionState());
                 continue;
             }
-            TaskManagerLocation tmLocation = executionVertex.getCurrentAssignedResourceLocation();
-            if (tmLocation == null) {
-                LOG.trace("ExecutionVertex {} is currently not assigned", executionVertex);
-                continue;
-            }
-            Set<ExecutionAttemptID> groupedAttemptIds =
-                    executionAttemptsByLocation.getOrDefault(tmLocation, new HashSet<>());
+            for (AccessExecution execution : executionVertex.getCurrentExecutions()) {
+                TaskManagerLocation tmLocation = execution.getAssignedResourceLocation();
+                if (tmLocation == null) {
+                    LOG.trace("ExecutionVertex {} is currently not assigned", executionVertex);
+                    continue;
+                }
+                Set<ExecutionAttemptID> groupedAttemptIds =
+                        executionAttemptsByLocation.getOrDefault(tmLocation, new HashSet<>());
 
-            ExecutionAttemptID attemptId =
-                    executionVertex.getCurrentExecutionAttempt().getAttemptId();
-            groupedAttemptIds.add(attemptId);
-            executionAttemptsByLocation.put(tmLocation, ImmutableSet.copyOf(groupedAttemptIds));
+                ExecutionAttemptID attemptId = execution.getAttemptId();
+                groupedAttemptIds.add(attemptId);
+                executionAttemptsByLocation.put(tmLocation, ImmutableSet.copyOf(groupedAttemptIds));
+            }
         }
 
         return executionAttemptsByLocation.entrySet().stream()
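
The tracker change above switches from sampling only the single current attempt of each vertex to sampling every current execution, bucketing attempt IDs by the TaskManager location each execution is assigned to, so speculative attempts running on other TaskManagers are sampled as well. A self-contained sketch of that grouping pattern in plain Java (the string identifiers are hypothetical stand-ins for TaskManagerLocation and ExecutionAttemptID; the real code above additionally stores immutable copies via ImmutableSet.copyOf):

    // Sketch only: group current execution attempts by the TaskManager they run on.
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    class GroupByLocationSketch {
        public static void main(String[] args) {
            Map<String, Set<String>> attemptsByLocation = new HashMap<>();
            String[][] executions = {
                {"tm-1", "subtask0-attempt0"},
                {"tm-2", "subtask0-attempt1"}, // speculative attempt on another TaskManager
                {"tm-1", "subtask1-attempt0"},
            };
            for (String[] execution : executions) {
                // execution[0] = assigned location, execution[1] = attempt id
                attemptsByLocation
                        .computeIfAbsent(execution[0], ignored -> new HashSet<>())
                        .add(execution[1]);
            }
            System.out.println(attemptsByLocation);
        }
    }
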
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java
index 85bf3a44cd5..93714132681 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java
@@ -46,6 +46,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 
@@ -141,6 +142,63 @@ class JobVertexBackPressureHandlerTest {
                         });
     }
 
+    private static Collection<MetricDump> getMultipleAttemptsMetricDumps() {
+        Collection<MetricDump> dumps = new ArrayList<>();
+        TaskQueryScopeInfo task0 =
+                new TaskQueryScopeInfo(
+                        TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(),
+                        TEST_JOB_VERTEX_ID.toString(),
+                        0,
+                        0);
+        dumps.add(new GaugeDump(task0, MetricNames.TASK_BACK_PRESSURED_TIME, "1000"));
+        dumps.add(new GaugeDump(task0, MetricNames.TASK_IDLE_TIME, "0"));
+        dumps.add(new GaugeDump(task0, MetricNames.TASK_BUSY_TIME, "0"));
+
+        TaskQueryScopeInfo speculativeTask0 =
+                new TaskQueryScopeInfo(
+                        TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(),
+                        TEST_JOB_VERTEX_ID.toString(),
+                        0,
+                        1);
+        dumps.add(new GaugeDump(speculativeTask0, MetricNames.TASK_BACK_PRESSURED_TIME, "200"));
+        dumps.add(new GaugeDump(speculativeTask0, MetricNames.TASK_IDLE_TIME, "100"));
+        dumps.add(new GaugeDump(speculativeTask0, MetricNames.TASK_BUSY_TIME, "800"));
+
+        TaskQueryScopeInfo task1 =
+                new TaskQueryScopeInfo(
+                        TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(),
+                        TEST_JOB_VERTEX_ID.toString(),
+                        1,
+                        0);
+        dumps.add(new GaugeDump(task1, MetricNames.TASK_BACK_PRESSURED_TIME, "500"));
+        dumps.add(new GaugeDump(task1, MetricNames.TASK_IDLE_TIME, "100"));
+        dumps.add(new GaugeDump(task1, MetricNames.TASK_BUSY_TIME, "900"));
+
+        TaskQueryScopeInfo speculativeTask1 =
+                new TaskQueryScopeInfo(
+                        TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(),
+                        TEST_JOB_VERTEX_ID.toString(),
+                        1,
+                        1);
+        dumps.add(new GaugeDump(speculativeTask1, MetricNames.TASK_BACK_PRESSURED_TIME, "900"));
+        dumps.add(new GaugeDump(speculativeTask1, MetricNames.TASK_IDLE_TIME, "0"));
+        dumps.add(new GaugeDump(speculativeTask1, MetricNames.TASK_BUSY_TIME, "100"));
+
+        // missing task2
+
+        TaskQueryScopeInfo task3 =
+                new TaskQueryScopeInfo(
+                        TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(),
+                        TEST_JOB_VERTEX_ID.toString(),
+                        3,
+                        0);
+        dumps.add(new GaugeDump(task3, MetricNames.TASK_BACK_PRESSURED_TIME, "100"));
+        dumps.add(new GaugeDump(task3, MetricNames.TASK_IDLE_TIME, "200"));
+        dumps.add(new GaugeDump(task3, MetricNames.TASK_BUSY_TIME, "700"));
+
+        return dumps;
+    }
+
     @Test
     void testGetBackPressure() throws Exception {
         final Map<String, String> pathParameters = new HashMap<>();
@@ -220,4 +278,153 @@ class JobVertexBackPressureHandlerTest {
         assertThat(jobVertexBackPressureInfo.getStatus())
                 .isEqualTo(VertexBackPressureStatus.DEPRECATED);
     }
+
+    @Test
+    void testGetBackPressureFromMultipleCurrentAttempts() throws Exception {
+        MetricStore multipleAttemptsMetricStore = new MetricStore();
+        for (MetricDump metricDump : getMultipleAttemptsMetricDumps()) {
+            multipleAttemptsMetricStore.add(metricDump);
+        }
+        // Update currentExecutionAttempts directly without JobDetails.
+        Map<Integer, Integer> currentExecutionAttempts = new HashMap<>();
+        currentExecutionAttempts.put(0, 1);
+        currentExecutionAttempts.put(1, 0);
+        multipleAttemptsMetricStore
+                .getCurrentExecutionAttempts()
+                .put(
+                        TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(),
+                        Collections.singletonMap(
+                                TEST_JOB_VERTEX_ID.toString(), currentExecutionAttempts));
+
+        JobVertexBackPressureHandler jobVertexBackPressureHandler =
+                new JobVertexBackPressureHandler(
+                        () -> CompletableFuture.completedFuture(restfulGateway),
+                        Time.seconds(10),
+                        Collections.emptyMap(),
+                        JobVertexBackPressureHeaders.getInstance(),
+                        new MetricFetcher() {
+                            private long updateCount = 0;
+
+                            @Override
+                            public MetricStore getMetricStore() {
+                                return multipleAttemptsMetricStore;
+                            }
+
+                            @Override
+                            public void update() {
+                                updateCount++;
+                            }
+
+                            @Override
+                            public long getLastUpdateTime() {
+                                return updateCount;
+                            }
+                        });
+
+        final Map<String, String> pathParameters = new HashMap<>();
+        pathParameters.put(
+                JobIDPathParameter.KEY, TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString());
+        pathParameters.put(JobVertexIdPathParameter.KEY, TEST_JOB_VERTEX_ID.toString());
+
+        final HandlerRequest<EmptyRequestBody> request =
+                HandlerRequest.resolveParametersAndCreate(
+                        EmptyRequestBody.getInstance(),
+                        new JobVertexMessageParameters(),
+                        pathParameters,
+                        Collections.emptyMap(),
+                        Collections.emptyList());
+
+        final CompletableFuture<JobVertexBackPressureInfo>
+                jobVertexBackPressureInfoCompletableFuture =
+                        jobVertexBackPressureHandler.handleRequest(request, restfulGateway);
+        final JobVertexBackPressureInfo jobVertexBackPressureInfo =
+                jobVertexBackPressureInfoCompletableFuture.get();
+
+        assertThat(jobVertexBackPressureInfo.getStatus()).isEqualTo(VertexBackPressureStatus.OK);
+        assertThat(jobVertexBackPressureInfo.getBackpressureLevel()).isEqualTo(LOW);
+
+        assertThat(
+                        jobVertexBackPressureInfo.getSubtasks().stream()
+                                .map(SubtaskBackPressureInfo::getAttemptNumber)
+                                .collect(Collectors.toList()))
+                .containsExactly(1, 0, null);
+        assertThat(
+                        jobVertexBackPressureInfo.getSubtasks().stream()
+                                .map(SubtaskBackPressureInfo::getOtherConcurrentAttempts)
+                                .filter(Objects::nonNull)
+                                .flatMap(Collection::stream)
+                                .map(SubtaskBackPressureInfo::getAttemptNumber)
+                                .collect(Collectors.toList()))
+                .containsExactly(0, 1);
+
+        assertThat(
+                        jobVertexBackPressureInfo.getSubtasks().stream()
+                                .map(SubtaskBackPressureInfo::getBackPressuredRatio)
+                                .collect(Collectors.toList()))
+                .containsExactly(0.2, 0.5, 0.1);
+        assertThat(
+                        jobVertexBackPressureInfo.getSubtasks().stream()
+                                .map(SubtaskBackPressureInfo::getOtherConcurrentAttempts)
+                                .filter(Objects::nonNull)
+                                .flatMap(Collection::stream)
+                                .map(SubtaskBackPressureInfo::getBackPressuredRatio)
+                                .collect(Collectors.toList()))
+                .containsExactly(1.0, 0.9);
+
+        assertThat(
+                        jobVertexBackPressureInfo.getSubtasks().stream()
+                                .map(SubtaskBackPressureInfo::getIdleRatio)
+                                .collect(Collectors.toList()))
+                .containsExactly(0.1, 0.1, 0.2);
+        assertThat(
+                        jobVertexBackPressureInfo.getSubtasks().stream()
+                                .map(SubtaskBackPressureInfo::getOtherConcurrentAttempts)
+                                .filter(Objects::nonNull)
+                                .flatMap(Collection::stream)
+                                .map(SubtaskBackPressureInfo::getIdleRatio)
+                                .collect(Collectors.toList()))
+                .containsExactly(0.0, 0.0);
+
+        assertThat(
+                        jobVertexBackPressureInfo.getSubtasks().stream()
+                                .map(SubtaskBackPressureInfo::getBusyRatio)
+                                .collect(Collectors.toList()))
+                .containsExactly(0.8, 0.9, 0.7);
+        assertThat(
+                        jobVertexBackPressureInfo.getSubtasks().stream()
+                                .map(SubtaskBackPressureInfo::getOtherConcurrentAttempts)
+                                .filter(Objects::nonNull)
+                                .flatMap(Collection::stream)
+                                .map(SubtaskBackPressureInfo::getBusyRatio)
+                                .collect(Collectors.toList()))
+                .containsExactly(0.0, 0.1);
+
+        assertThat(
+                        jobVertexBackPressureInfo.getSubtasks().stream()
+                                .map(SubtaskBackPressureInfo::getBackpressureLevel)
+                                .collect(Collectors.toList()))
+                .containsExactly(LOW, LOW, OK);
+        assertThat(
+                        jobVertexBackPressureInfo.getSubtasks().stream()
+                                .map(SubtaskBackPressureInfo::getOtherConcurrentAttempts)
+                                .filter(Objects::nonNull)
+                                .flatMap(Collection::stream)
+                                .map(SubtaskBackPressureInfo::getBackpressureLevel)
+                                .collect(Collectors.toList()))
+                .containsExactly(HIGH, HIGH);
+
+        assertThat(
+                        jobVertexBackPressureInfo.getSubtasks().stream()
+                                .map(SubtaskBackPressureInfo::getSubtask)
+                                .collect(Collectors.toList()))
+                .containsExactly(0, 1, 3);
+        assertThat(
+                        jobVertexBackPressureInfo.getSubtasks().stream()
+                                .map(SubtaskBackPressureInfo::getOtherConcurrentAttempts)
+                                .filter(Objects::nonNull)
+                                .flatMap(Collection::stream)
+                                .map(SubtaskBackPressureInfo::getSubtask)
+                                .collect(Collectors.toList()))
+                .containsExactly(0, 1);
+    }
 }
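
The expected ratios in the test above follow from how the three task gauges are read: TASK_BACK_PRESSURED_TIME, TASK_IDLE_TIME and TASK_BUSY_TIME report milliseconds spent per second, so each ratio is the gauge value divided by 1000 (subtask 0's representative attempt 1 reports 200/100/800 and therefore yields 0.2/0.1/0.8). A minimal sketch of that arithmetic, independent of the handler code:

    // Sketch only: ratio = milliseconds-per-second gauge value / 1000.
    class BackPressureRatioSketch {
        static double ratio(long millisPerSecond) {
            return millisPerSecond / 1000.0;
        }

        public static void main(String[] args) {
            // Representative attempt 1 of subtask 0 from the test data above.
            System.out.println(ratio(200)); // back-pressured ratio -> 0.2
            System.out.println(ratio(100)); // idle ratio           -> 0.1
            System.out.println(ratio(800)); // busy ratio           -> 0.8
        }
    }
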
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
index 4c267329a74..70f46c1d302 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
@@ -192,7 +192,8 @@ public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger {
                         finishedTs - deployingTs,
                         ioMetricsInfo,
                         assignedResourceLocation.getResourceID().getResourceIdString(),
-                        statusDuration);
+                        statusDuration,
+                        null);
 
         assertEquals(expectedDetailsInfo, detailsInfo);
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
index a044f3f87ff..40f2286d722 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
@@ -195,7 +195,8 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger {
                         -1L,
                         ioMetricsInfo,
                         "(unassigned)",
-                        statusDuration);
+                        statusDuration,
+                        null);
 
         assertEquals(expectedDetailsInfo, detailsInfo);
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/AggregatedTaskDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/AggregatedTaskDetailsInfoTest.java
index 3490a05d029..df3e3fef220 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/AggregatedTaskDetailsInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/AggregatedTaskDetailsInfoTest.java
@@ -77,7 +77,8 @@ public class AggregatedTaskDetailsInfoTest
                                 Math.abs(random.nextLong()),
                                 ioMetricsInfo,
                                 "taskmanagerId",
-                                statusDuration)));
+                                statusDuration,
+                                null)));
     }
 
     @Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfoTest.java
index 0dc78e48a54..566f668837a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfoTest.java
@@ -34,13 +34,31 @@ public class JobVertexBackPressureInfoTest
         List<JobVertexBackPressureInfo.SubtaskBackPressureInfo> subtaskList = new ArrayList<>();
         subtaskList.add(
                 new JobVertexBackPressureInfo.SubtaskBackPressureInfo(
-                        0, JobVertexBackPressureInfo.VertexBackPressureLevel.LOW, 0.1, 0.5, 0.4));
+                        0,
+                        0,
+                        JobVertexBackPressureInfo.VertexBackPressureLevel.LOW,
+                        0.1,
+                        0.5,
+                        0.4,
+                        null));
         subtaskList.add(
                 new JobVertexBackPressureInfo.SubtaskBackPressureInfo(
-                        1, JobVertexBackPressureInfo.VertexBackPressureLevel.OK, 0.4, 0.3, 0.3));
+                        1,
+                        0,
+                        JobVertexBackPressureInfo.VertexBackPressureLevel.OK,
+                        0.4,
+                        0.3,
+                        0.3,
+                        null));
         subtaskList.add(
                 new JobVertexBackPressureInfo.SubtaskBackPressureInfo(
-                        2, JobVertexBackPressureInfo.VertexBackPressureLevel.HIGH, 0.9, 0.0, 0.1));
+                        2,
+                        0,
+                        JobVertexBackPressureInfo.VertexBackPressureLevel.HIGH,
+                        0.9,
+                        0.0,
+                        0.1,
+                        null));
         return new JobVertexBackPressureInfo(
                 JobVertexBackPressureInfo.VertexBackPressureStatus.OK,
                 JobVertexBackPressureInfo.VertexBackPressureLevel.LOW,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java
index 11682f37ceb..00b12f7c68d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetails
 import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -71,11 +72,12 @@ public class JobVertexDetailsInfoTest
                         1L,
                         jobVertexMetrics,
                         "taskmanagerId1",
-                        statusDuration));
+                        statusDuration,
+                        null));
         vertexTaskDetailList.add(
                 new SubtaskExecutionAttemptDetailsInfo(
                         1,
-                        ExecutionState.FAILED,
+                        ExecutionState.RUNNING,
                         random.nextInt(),
                         "local2",
                         System.currentTimeMillis(),
@@ -83,7 +85,20 @@ public class JobVertexDetailsInfoTest
                         1L,
                         jobVertexMetrics,
                         "taskmanagerId2",
-                        statusDuration));
+                        statusDuration,
+                        Collections.singletonList(
+                                new SubtaskExecutionAttemptDetailsInfo(
+                                        1,
+                                        ExecutionState.FAILED,
+                                        random.nextInt(),
+                                        "local2",
+                                        System.currentTimeMillis(),
+                                        System.currentTimeMillis(),
+                                        1L,
+                                        jobVertexMetrics,
+                                        "taskmanagerId2",
+                                        statusDuration,
+                                        null))));
         vertexTaskDetailList.add(
                 new SubtaskExecutionAttemptDetailsInfo(
                         2,
@@ -95,7 +110,8 @@ public class JobVertexDetailsInfoTest
                         1L,
                         jobVertexMetrics,
                         "taskmanagerId3",
-                        statusDuration));
+                        statusDuration,
+                        null));
 
         int parallelism = 1 + (random.nextInt() / 3);
         return new JobVertexDetailsInfo(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java
index 48158c05b56..c4110811e0c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java
@@ -70,6 +70,7 @@ public class SubtaskExecutionAttemptDetailsInfoTest
                 Math.abs(random.nextLong()),
                 ioMetricsInfo,
                 "taskmanagerId",
-                statusDuration);
+                statusDuration,
+                null);
     }
 }