You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2022/08/01 10:30:58 UTC

[flink] 05/06: [FLINK-28588][rest] MetricStore supports to store and query metrics of multiple execution attempts of a subtask.

This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1a48fd53bd317ac2102adb00c1209350b57a687e
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Tue Jul 26 16:54:10 2022 +0800

    [FLINK-28588][rest] MetricStore supports to store and query metrics of multiple execution attempts of a subtask.
---
 .../history/HistoryServerArchiveFetcher.java       |   3 +-
 .../runtime/messages/webmonitor/JobDetails.java    |  98 ++++++++++-
 .../metrics/dump/MetricDumpSerialization.java      |  10 +-
 .../flink/runtime/metrics/dump/QueryScopeInfo.java |  28 +++-
 .../groups/InternalOperatorMetricGroup.java        |   1 +
 .../runtime/metrics/groups/TaskMetricGroup.java    |   5 +-
 .../handler/legacy/metrics/MetricFetcherImpl.java  |   1 +
 .../rest/handler/legacy/metrics/MetricStore.java   | 186 +++++++++++++++++----
 .../rest/handler/util/MutableIOMetrics.java        |   7 +-
 .../messages/webmonitor/JobDetailsTest.java        |  31 ++++
 .../metrics/dump/MetricDumpSerializerTest.java     |   5 +-
 .../runtime/metrics/dump/QueryScopeInfoTest.java   |  10 +-
 .../job/JobVertexBackPressureHandlerTest.java      |   7 +-
 .../AggregatingSubtasksMetricsHandlerTest.java     |   6 +-
 .../job/metrics/JobVertexMetricsHandlerTest.java   |   4 +-
 .../job/metrics/SubtaskMetricsHandlerTest.java     |   4 +-
 .../handler/legacy/metrics/MetricFetcherTest.java  |   5 +-
 .../handler/legacy/metrics/MetricStoreTest.java    |  61 ++++++-
 18 files changed, 408 insertions(+), 64 deletions(-)

diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index eb5a34b2c2d..8e5aee9c633 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -425,7 +425,8 @@ class HistoryServerArchiveFetcher {
                         state,
                         lastMod,
                         tasksPerState,
-                        numTasks);
+                        numTasks,
+                        new HashMap<>());
         MultipleJobsDetails multipleJobsDetails =
                 new MultipleJobsDetails(Collections.singleton(jobDetails));
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
index f9b609ac1cf..74b2964228c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.messages.webmonitor;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -39,6 +40,8 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.S
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -57,6 +60,8 @@ public class JobDetails implements Serializable {
     private static final String FIELD_NAME_STATUS = "state";
     private static final String FIELD_NAME_LAST_MODIFICATION = "last-modification";
     private static final String FIELD_NAME_TOTAL_NUMBER_TASKS = "total";
+    private static final String FIELD_NAME_CURRENT_EXECUTION_ATTEMPTS =
+            "current-execution-attempts";
 
     private final JobID jobId;
 
@@ -76,6 +81,15 @@ public class JobDetails implements Serializable {
 
     private final int numTasks;
 
+    /**
+     * Holds, for each subtask, the attempt number of its current execution attempt, which is
+     * treated as the representative execution of that subtask. The nesting is JobVertexID ->
+     * SubtaskIndex -> CurrentExecutionAttemptNumber. The map is used in MetricFetcher to
+     * accumulate the metrics of a subtask.
+     */
+    private final Map<String, Map<Integer, Integer>> currentExecutionAttempts;
+
+    @VisibleForTesting
     public JobDetails(
             JobID jobId,
             String jobName,
@@ -86,7 +100,30 @@ public class JobDetails implements Serializable {
             long lastUpdateTime,
             int[] tasksPerState,
             int numTasks) {
+        this(
+                jobId,
+                jobName,
+                startTime,
+                endTime,
+                duration,
+                status,
+                lastUpdateTime,
+                tasksPerState,
+                numTasks,
+                new HashMap<>());
+    }
 
+    public JobDetails(
+            JobID jobId,
+            String jobName,
+            long startTime,
+            long endTime,
+            long duration,
+            JobStatus status,
+            long lastUpdateTime,
+            int[] tasksPerState,
+            int numTasks,
+            Map<String, Map<Integer, Integer>> currentExecutionAttempts) {
         this.jobId = checkNotNull(jobId);
         this.jobName = checkNotNull(jobName);
         this.startTime = startTime;
@@ -100,6 +137,7 @@ public class JobDetails implements Serializable {
                 ExecutionState.values().length);
         this.tasksPerState = checkNotNull(tasksPerState);
         this.numTasks = numTasks;
+        this.currentExecutionAttempts = checkNotNull(currentExecutionAttempts);
     }
 
     public static JobDetails createDetailsForJob(AccessExecutionGraph job) {
@@ -112,16 +150,27 @@ public class JobDetails implements Serializable {
         int[] countsPerStatus = new int[ExecutionState.values().length];
         long lastChanged = 0;
         int numTotalTasks = 0;
+        Map<String, Map<Integer, Integer>> currentExecutionAttempts = new HashMap<>();
 
         for (AccessExecutionJobVertex ejv : job.getVerticesTopologically()) {
             AccessExecutionVertex[] taskVertices = ejv.getTaskVertices();
             numTotalTasks += taskVertices.length;
+            Map<Integer, Integer> vertexAttempts = new HashMap<>();
 
             for (AccessExecutionVertex taskVertex : taskVertices) {
+                if (taskVertex.getCurrentExecutions().size() > 1) {
+                    vertexAttempts.put(
+                            taskVertex.getParallelSubtaskIndex(),
+                            taskVertex.getCurrentExecutionAttempt().getAttemptNumber());
+                }
                 ExecutionState state = taskVertex.getExecutionState();
                 countsPerStatus[state.ordinal()]++;
                 lastChanged = Math.max(lastChanged, taskVertex.getStateTimestamp(state));
             }
+
+            if (!vertexAttempts.isEmpty()) {
+                currentExecutionAttempts.put(String.valueOf(ejv.getJobVertexId()), vertexAttempts);
+            }
         }
 
         lastChanged = Math.max(lastChanged, finished);
@@ -135,7 +184,8 @@ public class JobDetails implements Serializable {
                 status,
                 lastChanged,
                 countsPerStatus,
-                numTotalTasks);
+                numTotalTasks,
+                currentExecutionAttempts);
     }
 
     // ------------------------------------------------------------------------
@@ -176,6 +226,9 @@ public class JobDetails implements Serializable {
         return tasksPerState;
     }
 
+    public Map<String, Map<Integer, Integer>> getCurrentExecutionAttempts() {
+        return currentExecutionAttempts;
+    }
     // ------------------------------------------------------------------------
 
     @Override
@@ -192,7 +245,8 @@ public class JobDetails implements Serializable {
                     && this.status == that.status
                     && this.jobId.equals(that.jobId)
                     && this.jobName.equals(that.jobName)
-                    && Arrays.equals(this.tasksPerState, that.tasksPerState);
+                    && Arrays.equals(this.tasksPerState, that.tasksPerState)
+                    && this.currentExecutionAttempts.equals(that.currentExecutionAttempts);
         } else {
             return false;
         }
@@ -208,6 +262,7 @@ public class JobDetails implements Serializable {
         result = 31 * result + (int) (lastUpdateTime ^ (lastUpdateTime >>> 32));
         result = 31 * result + Arrays.hashCode(tasksPerState);
         result = 31 * result + numTasks;
+        result = 31 * result + currentExecutionAttempts.hashCode();
         return result;
     }
 
@@ -271,6 +326,20 @@ public class JobDetails implements Serializable {
 
             jsonGenerator.writeEndObject();
 
+            if (!jobDetails.currentExecutionAttempts.isEmpty()) {
+                jsonGenerator.writeObjectFieldStart(FIELD_NAME_CURRENT_EXECUTION_ATTEMPTS);
+                for (Map.Entry<String, Map<Integer, Integer>> vertex :
+                        jobDetails.currentExecutionAttempts.entrySet()) {
+                    jsonGenerator.writeObjectFieldStart(vertex.getKey());
+                    for (Map.Entry<Integer, Integer> attempt : vertex.getValue().entrySet()) {
+                        jsonGenerator.writeNumberField(
+                                String.valueOf(attempt.getKey()), attempt.getValue());
+                    }
+                    jsonGenerator.writeEndObject();
+                }
+                jsonGenerator.writeEndObject();
+            }
+
             jsonGenerator.writeEndObject();
         }
     }
@@ -310,6 +379,28 @@ public class JobDetails implements Serializable {
                         jsonNode == null ? 0 : jsonNode.intValue();
             }
 
+            Map<String, Map<Integer, Integer>> attempts = new HashMap<>();
+            JsonNode attemptsNode = rootNode.get(FIELD_NAME_CURRENT_EXECUTION_ATTEMPTS);
+            if (attemptsNode != null) {
+                attemptsNode
+                        .fields()
+                        .forEachRemaining(
+                                vertex -> {
+                                    String vertexId = vertex.getKey();
+                                    Map<Integer, Integer> vertexAttempts =
+                                            attempts.computeIfAbsent(
+                                                    vertexId, k -> new HashMap<>());
+                                    vertex.getValue()
+                                            .fields()
+                                            .forEachRemaining(
+                                                    attempt ->
+                                                            vertexAttempts.put(
+                                                                    Integer.parseInt(
+                                                                            attempt.getKey()),
+                                                                    attempt.getValue().intValue()));
+                                });
+            }
+
             return new JobDetails(
                     jobId,
                     jobName,
@@ -319,7 +410,8 @@ public class JobDetails implements Serializable {
                     jobStatus,
                     lastUpdateTime,
                     numVerticesPerExecutionState,
-                    numTasks);
+                    numTasks,
+                    attempts);
         }
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
index de423dadf9b..90187fed27f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
@@ -240,6 +240,7 @@ public class MetricDumpSerialization {
                 out.writeUTF(taskInfo.jobID);
                 out.writeUTF(taskInfo.vertexID);
                 out.writeInt(taskInfo.subtaskIndex);
+                out.writeInt(taskInfo.attemptNumber);
                 break;
             case INFO_CATEGORY_OPERATOR:
                 QueryScopeInfo.OperatorQueryScopeInfo operatorInfo =
@@ -247,6 +248,7 @@ public class MetricDumpSerialization {
                 out.writeUTF(operatorInfo.jobID);
                 out.writeUTF(operatorInfo.vertexID);
                 out.writeInt(operatorInfo.subtaskIndex);
+                out.writeInt(operatorInfo.attemptNumber);
                 out.writeUTF(operatorInfo.operatorName);
                 break;
             default:
@@ -436,6 +438,7 @@ public class MetricDumpSerialization {
         String jobID;
         String vertexID;
         int subtaskIndex;
+        int attemptNumber;
 
         String scope = dis.readUTF();
         byte cat = dis.readByte();
@@ -452,14 +455,17 @@ public class MetricDumpSerialization {
                 jobID = dis.readUTF();
                 vertexID = dis.readUTF();
                 subtaskIndex = dis.readInt();
-                return new QueryScopeInfo.TaskQueryScopeInfo(jobID, vertexID, subtaskIndex, scope);
+                attemptNumber = dis.readInt();
+                return new QueryScopeInfo.TaskQueryScopeInfo(
+                        jobID, vertexID, subtaskIndex, attemptNumber, scope);
             case INFO_CATEGORY_OPERATOR:
                 jobID = dis.readUTF();
                 vertexID = dis.readUTF();
                 subtaskIndex = dis.readInt();
+                attemptNumber = dis.readInt();
                 String operatorName = dis.readUTF();
                 return new QueryScopeInfo.OperatorQueryScopeInfo(
-                        jobID, vertexID, subtaskIndex, operatorName, scope);
+                        jobID, vertexID, subtaskIndex, attemptNumber, operatorName, scope);
             default:
                 throw new IOException("Unknown scope category: " + cat);
         }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
index c61a9d76412..d0d9652f627 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
@@ -142,22 +142,30 @@ public abstract class QueryScopeInfo {
         public final String jobID;
         public final String vertexID;
         public final int subtaskIndex;
+        public final int attemptNumber;
 
-        public TaskQueryScopeInfo(String jobID, String vertexid, int subtaskIndex) {
-            this(jobID, vertexid, subtaskIndex, "");
+        public TaskQueryScopeInfo(
+                String jobID, String vertexid, int subtaskIndex, int attemptNumber) {
+            this(jobID, vertexid, subtaskIndex, attemptNumber, "");
         }
 
-        public TaskQueryScopeInfo(String jobID, String vertexid, int subtaskIndex, String scope) {
+        public TaskQueryScopeInfo(
+                String jobID, String vertexid, int subtaskIndex, int attemptNumber, String scope) {
             super(scope);
             this.jobID = jobID;
             this.vertexID = vertexid;
             this.subtaskIndex = subtaskIndex;
+            this.attemptNumber = attemptNumber;
         }
 
         @Override
         public TaskQueryScopeInfo copy(String additionalScope) {
             return new TaskQueryScopeInfo(
-                    this.jobID, this.vertexID, this.subtaskIndex, concatScopes(additionalScope));
+                    this.jobID,
+                    this.vertexID,
+                    this.subtaskIndex,
+                    this.attemptNumber,
+                    concatScopes(additionalScope));
         }
 
         @Override
@@ -174,23 +182,30 @@ public abstract class QueryScopeInfo {
         public final String jobID;
         public final String vertexID;
         public final int subtaskIndex;
+        public final int attemptNumber;
         public final String operatorName;
 
         public OperatorQueryScopeInfo(
-                String jobID, String vertexid, int subtaskIndex, String operatorName) {
-            this(jobID, vertexid, subtaskIndex, operatorName, "");
+                String jobID,
+                String vertexid,
+                int subtaskIndex,
+                int attemptNumber,
+                String operatorName) {
+            this(jobID, vertexid, subtaskIndex, attemptNumber, operatorName, "");
         }
 
         public OperatorQueryScopeInfo(
                 String jobID,
                 String vertexid,
                 int subtaskIndex,
+                int attemptNumber,
                 String operatorName,
                 String scope) {
             super(scope);
             this.jobID = jobID;
             this.vertexID = vertexid;
             this.subtaskIndex = subtaskIndex;
+            this.attemptNumber = attemptNumber;
             this.operatorName = operatorName;
         }
 
@@ -200,6 +215,7 @@ public abstract class QueryScopeInfo {
                     this.jobID,
                     this.vertexID,
                     this.subtaskIndex,
+                    this.attemptNumber,
                     this.operatorName,
                     concatScopes(additionalScope));
         }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorMetricGroup.java
index 68a85ba5f2f..d075675b80c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorMetricGroup.java
@@ -75,6 +75,7 @@ public class InternalOperatorMetricGroup extends ComponentMetricGroup<TaskMetric
                 this.parent.parent.jobId.toString(),
                 this.parent.vertexId.toString(),
                 this.parent.subtaskIndex,
+                this.parent.attemptNumber(),
                 filter.filterCharacters(this.operatorName));
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
index 0dc0ff9b6c5..afcbbaa44bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
@@ -134,7 +134,10 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr
     protected QueryScopeInfo.TaskQueryScopeInfo createQueryServiceMetricInfo(
             CharacterFilter filter) {
         return new QueryScopeInfo.TaskQueryScopeInfo(
-                this.parent.jobId.toString(), String.valueOf(this.vertexId), this.subtaskIndex);
+                this.parent.jobId.toString(),
+                String.valueOf(this.vertexId),
+                this.subtaskIndex,
+                this.attemptNumber);
     }
 
     // ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java
index beb652f97c7..65d2e162532 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java
@@ -143,6 +143,7 @@ public class MetricFetcherImpl<T extends RestfulGateway> implements MetricFetche
                                     toRetain.add(job.getJobId().toString());
                                 }
                                 metrics.retainJobs(toRetain);
+                                metrics.updateCurrentExecutionAttempts(jobDetails.getJobs());
                             }
                         },
                         executor);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
index 93f289fa036..c86ac763f9c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.rest.handler.legacy.metrics;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.metrics.dump.MetricDump;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 
@@ -27,8 +28,10 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.concurrent.ThreadSafe;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -54,6 +57,15 @@ public class MetricStore {
     private final Map<String, TaskManagerMetricStore> taskManagers = new ConcurrentHashMap<>();
     private final Map<String, JobMetricStore> jobs = new ConcurrentHashMap<>();
 
+    /**
+     * Holds the attempt number of the representative execution for each subtask of each vertex.
+     * The nesting is JobID -> JobVertexID -> SubtaskIndex -> CurrentExecutionAttemptNumber. When a
+     * metric of an execution attempt is added, it is also added to the SubtaskMetricStore if the
+     * attempt is the representative execution.
+     */
+    private final Map<String, Map<String, Map<Integer, Integer>>> currentExecutionAttempts =
+            new ConcurrentHashMap<>();
+
     /**
      * Remove inactive task managers.
      *
@@ -70,6 +82,18 @@ public class MetricStore {
      */
     synchronized void retainJobs(List<String> activeJobs) {
         jobs.keySet().retainAll(activeJobs);
+        currentExecutionAttempts.keySet().retainAll(activeJobs);
+    }
+
+    public synchronized void updateCurrentExecutionAttempts(Collection<JobDetails> jobs) {
+        jobs.forEach(
+                job ->
+                        currentExecutionAttempts.put(
+                                job.getJobId().toString(), job.getCurrentExecutionAttempts()));
+    }
+
+    public Map<String, Map<String, Map<Integer, Integer>>> getCurrentExecutionAttempts() {
+        return currentExecutionAttempts;
     }
 
     /**
@@ -153,7 +177,24 @@ public class MetricStore {
         if (task == null) {
             return null;
         }
-        return ComponentMetricStore.unmodifiable(task.getSubtaskMetricStore(subtaskIndex));
+        return SubtaskMetricStore.unmodifiable(task.getSubtaskMetricStore(subtaskIndex));
+    }
+
+    public synchronized ComponentMetricStore getSubtaskAttemptMetricStore(
+            String jobID, String taskID, int subtaskIndex, int attemptNumber) {
+        JobMetricStore job = jobID == null ? null : jobs.get(jobID);
+        if (job == null) {
+            return null;
+        }
+        TaskMetricStore task = job.getTaskMetricStore(taskID);
+        if (task == null) {
+            return null;
+        }
+        SubtaskMetricStore subtask = task.getSubtaskMetricStore(subtaskIndex);
+        if (subtask == null) {
+            return null;
+        }
+        return ComponentMetricStore.unmodifiable(subtask.getAttemptsMetricStore(attemptNumber));
     }
 
     public synchronized Map<String, JobMetricStore> getJobs() {
@@ -177,7 +218,9 @@ public class MetricStore {
             TaskManagerMetricStore tm;
             JobMetricStore job;
             TaskMetricStore task;
-            ComponentMetricStore subtask;
+            SubtaskMetricStore subtask;
+            ComponentMetricStore attempt;
+            boolean isRepresentativeAttempt;
 
             String name = info.scope.isEmpty() ? metric.name : info.scope + "." + metric.name;
 
@@ -214,15 +257,34 @@ public class MetricStore {
                     task = job.tasks.computeIfAbsent(taskInfo.vertexID, k -> new TaskMetricStore());
                     subtask =
                             task.subtasks.computeIfAbsent(
-                                    taskInfo.subtaskIndex, k -> new ComponentMetricStore());
-                    /**
-                     * The duplication is intended. Metrics scoped by subtask are useful for several
-                     * job/task handlers, while the WebInterface task metric queries currently do
-                     * not account for subtasks, so we don't divide by subtask and instead use the
-                     * concatenation of subtask index and metric name as the name for those.
-                     */
-                    addMetric(subtask.metrics, name, metric);
-                    addMetric(task.metrics, taskInfo.subtaskIndex + "." + name, metric);
+                                    taskInfo.subtaskIndex, k -> new SubtaskMetricStore());
+
+                    // The attempt is representative if it matches the current execution attempt
+                    // number recorded for the subtask, or if no number is recorded in
+                    // currentExecutionAttempts, which means there should be only one execution.
+                    isRepresentativeAttempt =
+                            isRepresentativeAttempt(
+                                    taskInfo.jobID,
+                                    taskInfo.vertexID,
+                                    taskInfo.subtaskIndex,
+                                    taskInfo.attemptNumber);
+                    attempt =
+                            subtask.attempts.computeIfAbsent(
+                                    taskInfo.attemptNumber, k -> new ComponentMetricStore());
+                    addMetric(attempt.metrics, name, metric);
+                    // If the attempt is the representative one, its metrics are also written to
+                    // the subtask and task metric stores.
+                    if (isRepresentativeAttempt) {
+                        /**
+                         * The duplication is intended. Metrics scoped by subtask are useful for
+                         * several job/task handlers, while the WebInterface task metric queries
+                         * currently do not account for subtasks, so we don't divide by subtask and
+                         * instead use the concatenation of subtask index and metric name as the
+                         * name for those.
+                         */
+                        addMetric(subtask.metrics, name, metric);
+                        addMetric(task.metrics, taskInfo.subtaskIndex + "." + name, metric);
+                    }
                     break;
                 case INFO_CATEGORY_OPERATOR:
                     QueryScopeInfo.OperatorQueryScopeInfo operatorInfo =
@@ -233,21 +295,38 @@ public class MetricStore {
                                     operatorInfo.vertexID, k -> new TaskMetricStore());
                     subtask =
                             task.subtasks.computeIfAbsent(
-                                    operatorInfo.subtaskIndex, k -> new ComponentMetricStore());
-                    /**
-                     * As the WebInterface does not account for operators (because it can't) we
-                     * don't divide by operator and instead use the concatenation of subtask index,
-                     * operator name and metric name as the name.
-                     */
-                    addMetric(subtask.metrics, operatorInfo.operatorName + "." + name, metric);
-                    addMetric(
-                            task.metrics,
-                            operatorInfo.subtaskIndex
-                                    + "."
-                                    + operatorInfo.operatorName
-                                    + "."
-                                    + name,
-                            metric);
+                                    operatorInfo.subtaskIndex, k -> new SubtaskMetricStore());
+
+                    isRepresentativeAttempt =
+                            isRepresentativeAttempt(
+                                    operatorInfo.jobID,
+                                    operatorInfo.vertexID,
+                                    operatorInfo.subtaskIndex,
+                                    operatorInfo.attemptNumber);
+
+                    attempt =
+                            subtask.attempts.computeIfAbsent(
+                                    operatorInfo.attemptNumber, k -> new ComponentMetricStore());
+                    addMetric(attempt.metrics, operatorInfo.operatorName + "." + name, metric);
+
+                    // If the attempt is the representative one, its metrics are also written to
+                    // the subtask and task metric stores.
+                    if (isRepresentativeAttempt) {
+                        /**
+                         * As the WebInterface does not account for operators (because it can't) we
+                         * don't divide by operator and instead use the concatenation of subtask
+                         * index, operator name and metric name as the name.
+                         */
+                        addMetric(subtask.metrics, operatorInfo.operatorName + "." + name, metric);
+                        addMetric(
+                                task.metrics,
+                                operatorInfo.subtaskIndex
+                                        + "."
+                                        + operatorInfo.operatorName
+                                        + "."
+                                        + name,
+                                metric);
+                    }
                     break;
                 default:
                     LOG.debug("Invalid metric dump category: " + info.getCategory());
@@ -257,6 +336,19 @@ public class MetricStore {
         }
     }
 
+    // Returns whether the attempt is the representative one. It's also true if the current
+    // execution attempt number for the subtask is not present in the currentExecutionAttempts,
+    // which means there should be only one execution
+    private boolean isRepresentativeAttempt(
+            String jobID, String vertexID, int subtaskIndex, int attemptNumber) {
+        return Optional.of(currentExecutionAttempts)
+                        .map(m -> m.get(jobID))
+                        .map(m -> m.get(vertexID))
+                        .map(m -> m.get(subtaskIndex))
+                        .orElse(attemptNumber)
+                == attemptNumber;
+    }
+
     private void addMetric(Map<String, String> target, String name, MetricDump metric) {
         switch (metric.getCategory()) {
             case METRIC_CATEGORY_COUNTER:
@@ -363,24 +455,24 @@ public class MetricStore {
     /** Sub-structure containing metrics of a single Task. */
     @ThreadSafe
     public static class TaskMetricStore extends ComponentMetricStore {
-        private final Map<Integer, ComponentMetricStore> subtasks;
+        private final Map<Integer, SubtaskMetricStore> subtasks;
 
         private TaskMetricStore() {
             this(new ConcurrentHashMap<>(), new ConcurrentHashMap<>());
         }
 
         private TaskMetricStore(
-                Map<String, String> metrics, Map<Integer, ComponentMetricStore> subtasks) {
+                Map<String, String> metrics, Map<Integer, SubtaskMetricStore> subtasks) {
             super(metrics);
             this.subtasks = checkNotNull(subtasks);
         }
 
-        public ComponentMetricStore getSubtaskMetricStore(int subtaskIndex) {
+        public SubtaskMetricStore getSubtaskMetricStore(int subtaskIndex) {
             return subtasks.get(subtaskIndex);
         }
 
-        public Map<Integer, ComponentMetricStore> getAllSubtaskMetricStores() {
-            return subtasks;
+        public Map<Integer, SubtaskMetricStore> getAllSubtaskMetricStores() {
+            return unmodifiableMap(subtasks);
         }
 
         private static TaskMetricStore unmodifiable(TaskMetricStore source) {
@@ -391,4 +483,36 @@ public class MetricStore {
                     unmodifiableMap(source.metrics), unmodifiableMap(source.subtasks));
         }
     }
+
+    /** Sub-structure containing the metrics of a single subtask and of each of its attempts. */
+    @ThreadSafe
+    public static class SubtaskMetricStore extends ComponentMetricStore {
+        private final Map<Integer, ComponentMetricStore> attempts;
+
+        private SubtaskMetricStore() {
+            this(new ConcurrentHashMap<>(), new ConcurrentHashMap<>());
+        }
+
+        private SubtaskMetricStore(
+                Map<String, String> metrics, Map<Integer, ComponentMetricStore> attempts) {
+            super(metrics);
+            this.attempts = checkNotNull(attempts);
+        }
+
+        public ComponentMetricStore getAttemptsMetricStore(int attemptNumber) {
+            return attempts.get(attemptNumber);
+        }
+
+        public Map<Integer, ComponentMetricStore> getAllAttemptsMetricStores() {
+            return unmodifiableMap(attempts);
+        }
+
+        private static SubtaskMetricStore unmodifiable(SubtaskMetricStore source) {
+            if (source == null) {
+                return null;
+            }
+            return new SubtaskMetricStore(
+                    unmodifiableMap(source.metrics), unmodifiableMap(source.attempts));
+        }
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
index 7da9061a090..8ba34a10d85 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
@@ -99,8 +99,11 @@ public class MutableIOMetrics extends IOMetrics {
                 fetcher.update();
                 MetricStore.ComponentMetricStore metrics =
                         fetcher.getMetricStore()
-                                .getSubtaskMetricStore(
-                                        jobID, taskID, attempt.getParallelSubtaskIndex());
+                                .getSubtaskAttemptMetricStore(
+                                        jobID,
+                                        taskID,
+                                        attempt.getParallelSubtaskIndex(),
+                                        attempt.getAttemptNumber());
                 if (metrics != null) {
                     /**
                      * We want to keep track of missing metrics to be able to make a difference
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java
index 790ca43ce7d..609ef1f8e0d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java
@@ -29,6 +29,8 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -102,4 +104,33 @@ class JobDetailsTest {
 
         assertThat(unmarshalled).isEqualTo(expected);
     }
+
+    @Test
+    void testJobDetailsWithExecutionAttemptsMarshalling() throws JsonProcessingException {
+        Map<String, Map<Integer, Integer>> currentExecutionAttempts = new HashMap<>();
+        currentExecutionAttempts.computeIfAbsent("a", k -> new HashMap<>()).put(1, 2);
+        currentExecutionAttempts.computeIfAbsent("a", k -> new HashMap<>()).put(2, 4);
+        currentExecutionAttempts.computeIfAbsent("b", k -> new HashMap<>()).put(3, 1);
+
+        final JobDetails expected =
+                new JobDetails(
+                        new JobID(),
+                        "foobar",
+                        1L,
+                        10L,
+                        9L,
+                        JobStatus.RUNNING,
+                        8L,
+                        new int[] {1, 3, 3, 4, 7, 4, 2, 7, 3, 3},
+                        42,
+                        currentExecutionAttempts);
+
+        final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
+
+        final JsonNode marshalled = objectMapper.valueToTree(expected);
+
+        final JobDetails unmarshalled = objectMapper.treeToValue(marshalled, JobDetails.class);
+
+        assertThat(unmarshalled).isEqualTo(expected);
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
index 52eec21ab5d..2876cf1635a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
@@ -159,11 +159,12 @@ class MetricDumpSerializerTest {
         gauges.put(
                 g1,
                 new Tuple2<QueryScopeInfo, String>(
-                        new QueryScopeInfo.TaskQueryScopeInfo("jid", "vid", 2, "D"), "g1"));
+                        new QueryScopeInfo.TaskQueryScopeInfo("jid", "vid", 2, 0, "D"), "g1"));
         histograms.put(
                 h1,
                 new Tuple2<QueryScopeInfo, String>(
-                        new QueryScopeInfo.OperatorQueryScopeInfo("jid", "vid", 2, "opname", "E"),
+                        new QueryScopeInfo.OperatorQueryScopeInfo(
+                                "jid", "vid", 2, 0, "opname", "E"),
                         "h1"));
 
         MetricDumpSerialization.MetricSerializationResult serialized =
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java
index 0355d01fcf0..7380cd004b8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java
@@ -94,7 +94,7 @@ class QueryScopeInfoTest {
     @Test
     void testTaskQueryScopeInfo() {
         QueryScopeInfo.TaskQueryScopeInfo info =
-                new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 2);
+                new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 2, 0);
         assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_TASK);
         assertThat(info.scope).isEmpty();
         assertThat(info.jobID).isEqualTo("jobid");
@@ -108,7 +108,7 @@ class QueryScopeInfoTest {
         assertThat(info.vertexID).isEqualTo("taskid");
         assertThat(info.subtaskIndex).isEqualTo(2);
 
-        info = new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 2, "hello");
+        info = new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 2, 0, "hello");
         assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_TASK);
         assertThat(info.scope).isEqualTo("hello");
         assertThat(info.jobID).isEqualTo("jobid");
@@ -126,7 +126,7 @@ class QueryScopeInfoTest {
     @Test
     void testOperatorQueryScopeInfo() {
         QueryScopeInfo.OperatorQueryScopeInfo info =
-                new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 2, "opname");
+                new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 2, 0, "opname");
         assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_OPERATOR);
         assertThat(info.scope).isEmpty();
         assertThat(info.jobID).isEqualTo("jobid");
@@ -142,7 +142,9 @@ class QueryScopeInfoTest {
         assertThat(info.operatorName).isEqualTo("opname");
         assertThat(info.subtaskIndex).isEqualTo(2);
 
-        info = new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 2, "opname", "hello");
+        info =
+                new QueryScopeInfo.OperatorQueryScopeInfo(
+                        "jobid", "taskid", 2, 0, "opname", "hello");
         assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_OPERATOR);
         assertThat(info.scope).isEqualTo("hello");
         assertThat(info.jobID).isEqualTo("jobid");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java
index 77cd5047d3e..85bf3a44cd5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java
@@ -77,6 +77,7 @@ class JobVertexBackPressureHandlerTest {
                 new TaskQueryScopeInfo(
                         TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(),
                         TEST_JOB_VERTEX_ID.toString(),
+                        0,
                         0);
         dumps.add(new GaugeDump(task0, MetricNames.TASK_BACK_PRESSURED_TIME, "1000"));
         dumps.add(new GaugeDump(task0, MetricNames.TASK_IDLE_TIME, "0"));
@@ -86,7 +87,8 @@ class JobVertexBackPressureHandlerTest {
                 new TaskQueryScopeInfo(
                         TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(),
                         TEST_JOB_VERTEX_ID.toString(),
-                        1);
+                        1,
+                        0);
         dumps.add(new GaugeDump(task1, MetricNames.TASK_BACK_PRESSURED_TIME, "500"));
         dumps.add(new GaugeDump(task1, MetricNames.TASK_IDLE_TIME, "100"));
         dumps.add(new GaugeDump(task1, MetricNames.TASK_BUSY_TIME, "900"));
@@ -97,7 +99,8 @@ class JobVertexBackPressureHandlerTest {
                 new TaskQueryScopeInfo(
                         TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(),
                         TEST_JOB_VERTEX_ID.toString(),
-                        3);
+                        3,
+                        0);
         dumps.add(new GaugeDump(task3, MetricNames.TASK_BACK_PRESSURED_TIME, "100"));
         dumps.add(new GaugeDump(task3, MetricNames.TASK_IDLE_TIME, "200"));
         dumps.add(new GaugeDump(task3, MetricNames.TASK_BUSY_TIME, "700"));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java
index b04020ab411..ed13c7da040 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java
@@ -65,19 +65,19 @@ public class AggregatingSubtasksMetricsHandlerTest
         Collection<MetricDump> dumps = new ArrayList<>(3);
         QueryScopeInfo.TaskQueryScopeInfo task1 =
                 new QueryScopeInfo.TaskQueryScopeInfo(
-                        JOB_ID.toString(), TASK_ID.toString(), 1, "abc");
+                        JOB_ID.toString(), TASK_ID.toString(), 1, 0, "abc");
         MetricDump.CounterDump cd1 = new MetricDump.CounterDump(task1, "metric1", 1);
         dumps.add(cd1);
 
         QueryScopeInfo.TaskQueryScopeInfo task2 =
                 new QueryScopeInfo.TaskQueryScopeInfo(
-                        JOB_ID.toString(), TASK_ID.toString(), 2, "abc");
+                        JOB_ID.toString(), TASK_ID.toString(), 2, 0, "abc");
         MetricDump.CounterDump cd2 = new MetricDump.CounterDump(task2, "metric1", 3);
         dumps.add(cd2);
 
         QueryScopeInfo.TaskQueryScopeInfo task3 =
                 new QueryScopeInfo.TaskQueryScopeInfo(
-                        JOB_ID.toString(), TASK_ID.toString(), 3, "abc");
+                        JOB_ID.toString(), TASK_ID.toString(), 3, 0, "abc");
         MetricDump.CounterDump cd3 = new MetricDump.CounterDump(task3, "metric2", 5);
         dumps.add(cd3);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandlerTest.java
index 66ea8c22c00..b48dbbac38f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandlerTest.java
@@ -34,6 +34,8 @@ public class JobVertexMetricsHandlerTest extends MetricsHandlerTestBase<JobVerte
 
     private static final int TEST_SUBTASK_INDEX = 1;
 
+    private static final int TEST_ATTEMPT_NUMBER = 0;
+
     @Override
     JobVertexMetricsHandler getMetricsHandler() {
         return new JobVertexMetricsHandler(
@@ -43,7 +45,7 @@ public class JobVertexMetricsHandlerTest extends MetricsHandlerTestBase<JobVerte
     @Override
     QueryScopeInfo getQueryScopeInfo() {
         return new QueryScopeInfo.TaskQueryScopeInfo(
-                TEST_JOB_ID, TEST_VERTEX_ID, TEST_SUBTASK_INDEX);
+                TEST_JOB_ID, TEST_VERTEX_ID, TEST_SUBTASK_INDEX, TEST_ATTEMPT_NUMBER);
     }
 
     @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandlerTest.java
index f59569c1d12..30559e56aee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandlerTest.java
@@ -37,6 +37,8 @@ public class SubtaskMetricsHandlerTest extends MetricsHandlerTestBase<SubtaskMet
 
     private static final int TEST_SUBTASK_INDEX = 0;
 
+    private static final int TEST_ATTEMPT_NUMBER = 0;
+
     @Override
     SubtaskMetricsHandler getMetricsHandler() {
         return new SubtaskMetricsHandler(leaderRetriever, TIMEOUT, TEST_HEADERS, mockMetricFetcher);
@@ -45,7 +47,7 @@ public class SubtaskMetricsHandlerTest extends MetricsHandlerTestBase<SubtaskMet
     @Override
     QueryScopeInfo getQueryScopeInfo() {
         return new QueryScopeInfo.TaskQueryScopeInfo(
-                TEST_JOB_ID, TEST_VERTEX_ID, TEST_SUBTASK_INDEX);
+                TEST_JOB_ID, TEST_VERTEX_ID, TEST_SUBTASK_INDEX, TEST_ATTEMPT_NUMBER);
     }
 
     @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
index 9dc014d8b50..a7259988672 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
@@ -177,12 +177,13 @@ public class MetricFetcherTest extends TestLogger {
                 c1,
                 new Tuple2<>(
                         new QueryScopeInfo.OperatorQueryScopeInfo(
-                                jobID.toString(), "taskid", 2, "opname", "abc"),
+                                jobID.toString(), "taskid", 2, 0, "opname", "abc"),
                         "oc"));
         counters.put(
                 c2,
                 new Tuple2<>(
-                        new QueryScopeInfo.TaskQueryScopeInfo(jobID.toString(), "taskid", 2, "abc"),
+                        new QueryScopeInfo.TaskQueryScopeInfo(
+                                jobID.toString(), "taskid", 2, 0, "abc"),
                         "tc"));
         meters.put(
                 new Meter() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
index 97c739df224..c54f50291fc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
@@ -24,6 +24,9 @@ import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -41,7 +44,18 @@ class MetricStoreTest {
         assertThat(store.getJobMetricStore("jobid").getMetric("abc.metric4", "-1")).isEqualTo("3");
 
         assertThat(store.getTaskMetricStore("jobid", "taskid").getMetric("8.abc.metric5", "-1"))
+                .isEqualTo("14");
+        assertThat(store.getSubtaskMetricStore("jobid", "taskid", 8).getMetric("abc.metric5", "-1"))
+                .isEqualTo("14");
+        assertThat(
+                        store.getSubtaskAttemptMetricStore("jobid", "taskid", 8, 1)
+                                .getMetric("abc.metric5", "-1"))
                 .isEqualTo("4");
+        assertThat(
+                        store.getSubtaskAttemptMetricStore("jobid", "taskid", 8, 2)
+                                .getMetric("abc.metric5", "-1"))
+                .isEqualTo("14");
+
         assertThat(
                         store.getTaskMetricStore("jobid", "taskid")
                                 .getMetric("8.opname.abc.metric6", "-1"))
@@ -50,6 +64,27 @@ class MetricStoreTest {
                         store.getTaskMetricStore("jobid", "taskid")
                                 .getMetric("8.opname.abc.metric7", "-1"))
                 .isEqualTo("6");
+        assertThat(
+                        store.getTaskMetricStore("jobid", "taskid")
+                                .getMetric("1.opname.abc.metric7", "-1"))
+                .isEqualTo("6");
+        assertThat(
+                        store.getSubtaskMetricStore("jobid", "taskid", 1)
+                                .getMetric("opname.abc.metric7", "-1"))
+                .isEqualTo("6");
+        assertThat(store.getSubtaskAttemptMetricStore("jobid", "taskid", 1, 2)).isNull();
+        assertThat(
+                        store.getSubtaskAttemptMetricStore("jobid", "taskid", 1, 3)
+                                .getMetric("opname.abc.metric7", "-1"))
+                .isEqualTo("6");
+        assertThat(
+                        store.getSubtaskAttemptMetricStore("jobid", "taskid", 8, 2)
+                                .getMetric("opname.abc.metric7", "-1"))
+                .isEqualTo("6");
+        assertThat(
+                        store.getSubtaskAttemptMetricStore("jobid", "taskid", 8, 4)
+                                .getMetric("opname.abc.metric7", "-1"))
+                .isEqualTo("16");
     }
 
     @Test
@@ -72,6 +107,11 @@ class MetricStoreTest {
     }
 
     static MetricStore setupStore(MetricStore store) {
+        Map<Integer, Integer> currentExecutionAttempts = new HashMap<>();
+        currentExecutionAttempts.put(8, 2);
+        store.getCurrentExecutionAttempts()
+                .put("jobid", Collections.singletonMap("taskid", currentExecutionAttempts));
+
         QueryScopeInfo.JobManagerQueryScopeInfo jm =
                 new QueryScopeInfo.JobManagerQueryScopeInfo("abc");
         MetricDump.CounterDump cd1 = new MetricDump.CounterDump(jm, "metric1", 0);
@@ -96,19 +136,30 @@ class MetricStoreTest {
         MetricDump.CounterDump cd42 = new MetricDump.CounterDump(job2, "metric4", 3);
 
         QueryScopeInfo.TaskQueryScopeInfo task =
-                new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 8, "abc");
+                new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 8, 1, "abc");
         MetricDump.CounterDump cd5 = new MetricDump.CounterDump(task, "metric5", 4);
 
+        QueryScopeInfo.TaskQueryScopeInfo speculativeTask =
+                new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 8, 2, "abc");
+        MetricDump.CounterDump cd52 = new MetricDump.CounterDump(speculativeTask, "metric5", 14);
+
         QueryScopeInfo.OperatorQueryScopeInfo operator =
-                new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 8, "opname", "abc");
+                new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 8, 2, "opname", "abc");
         MetricDump.CounterDump cd6 = new MetricDump.CounterDump(operator, "metric6", 5);
         MetricDump.CounterDump cd7 = new MetricDump.CounterDump(operator, "metric7", 6);
 
         QueryScopeInfo.OperatorQueryScopeInfo operator2 =
-                new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 1, "opname", "abc");
+                new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 1, 3, "opname", "abc");
         MetricDump.CounterDump cd62 = new MetricDump.CounterDump(operator2, "metric6", 5);
         MetricDump.CounterDump cd72 = new MetricDump.CounterDump(operator2, "metric7", 6);
 
+        QueryScopeInfo.OperatorQueryScopeInfo speculativeOperator2 =
+                new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 8, 4, "opname", "abc");
+        MetricDump.CounterDump cd63 =
+                new MetricDump.CounterDump(speculativeOperator2, "metric6", 15);
+        MetricDump.CounterDump cd73 =
+                new MetricDump.CounterDump(speculativeOperator2, "metric7", 16);
+
         store.add(cd1);
         store.add(cd2);
         store.add(cd2a);
@@ -125,6 +176,10 @@ class MetricStoreTest {
         store.add(cd32);
         store.add(cd42);
 
+        store.add(cd52);
+        store.add(cd63);
+        store.add(cd73);
+
         return store;
     }
 }