Posted to commits@flink.apache.org by ga...@apache.org on 2022/08/01 10:30:53 UTC

[flink] branch master updated (aae0462cb39 -> f436b20429b)

This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from aae0462cb39 [FLINK-24787][docs] Add more details of state latency tracking documentation
     new 247d4263c1a [hotfix][runtime][tests] Migrates some tests to Junit5
     new 53938680abe [FLINK-28588][rest] Add blocked flag in TaskManagerInfo and TaskManagerDetailsInfo
     new 0263b55288b [FLINK-28588][rest] Add blocked task manager count and blocked slot count in ResourceOverview and ClusterOverview
     new 44c00dbb4a0 [FLINK-28588][rest] Archive all current executions in ArchivedExecutionVertex.
     new 1a48fd53bd3 [FLINK-28588][rest] MetricStore supports to store and query metrics of multiple execution attempts of a subtask.
     new f436b20429b [FLINK-28588][rest] Acquire information of all current executions in REST handlers if applicable

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../history/HistoryServerArchiveFetcher.java       |   3 +-
 .../src/test/resources/rest_api_v1.snapshot        |  43 +++
 .../executiongraph/AccessExecutionVertex.java      |   9 +
 .../executiongraph/ArchivedExecutionVertex.java    |  26 +-
 .../ArchivedSpeculativeExecutionVertex.java        |  52 ---
 .../executiongraph/SpeculativeExecutionVertex.java |   4 +-
 .../messages/webmonitor/ClusterOverview.java       |  52 ++-
 .../runtime/messages/webmonitor/JobDetails.java    |  98 +++++-
 .../metrics/dump/MetricDumpSerialization.java      |  10 +-
 .../flink/runtime/metrics/dump/QueryScopeInfo.java |  28 +-
 .../groups/InternalOperatorMetricGroup.java        |   1 +
 .../runtime/metrics/groups/TaskMetricGroup.java    |   5 +-
 .../runtime/resourcemanager/ResourceManager.java   |  33 +-
 .../runtime/resourcemanager/ResourceOverview.java  |  18 +-
 .../handler/job/AbstractSubtaskAttemptHandler.java |  15 +-
 .../rest/handler/job/JobDetailsHandler.java        |   2 +
 .../rest/handler/job/JobExceptionsHandler.java     |  33 +-
 .../handler/job/JobVertexBackPressureHandler.java  |  87 ++++-
 .../rest/handler/job/JobVertexDetailsHandler.java  |  17 +-
 .../handler/job/JobVertexTaskManagersHandler.java  |  73 +++--
 .../job/SubtaskCurrentAttemptDetailsHandler.java   |  19 +-
 ...SubtaskExecutionAttemptAccumulatorsHandler.java |  38 +--
 .../job/SubtaskExecutionAttemptDetailsHandler.java |  52 +--
 .../job/SubtasksAllAccumulatorsHandler.java        |  32 +-
 .../rest/handler/job/SubtasksTimesHandler.java     |   3 +-
 .../messages/ClusterOverviewWithVersion.java       |  26 +-
 .../handler/legacy/metrics/MetricFetcherImpl.java  |   1 +
 .../rest/handler/legacy/metrics/MetricStore.java   | 186 +++++++++--
 .../rest/handler/util/MutableIOMetrics.java        |   7 +-
 .../rest/messages/JobVertexBackPressureInfo.java   |  45 ++-
 .../job/SubtaskExecutionAttemptDetailsInfo.java    | 105 +++---
 .../taskmanager/TaskManagerDetailsInfo.java        |   7 +-
 .../rest/messages/taskmanager/TaskManagerInfo.java |  32 +-
 .../threadinfo/JobVertexThreadInfoTracker.java     |  24 +-
 .../executiongraph/ArchivedExecutionGraphTest.java | 356 +++++----------------
 .../ArchivedExecutionGraphTestUtils.java           | 191 +++++++++++
 ...ecutionVertexWithSpeculativeExecutionTest.java} | 185 +++--------
 .../messages/webmonitor/JobDetailsTest.java        |  46 ++-
 .../metrics/dump/MetricDumpSerializerTest.java     | 140 ++++----
 .../runtime/metrics/dump/QueryScopeInfoTest.java   | 182 +++++------
 .../resourcemanager/ResourceManagerTest.java       |  96 ++++++
 .../utils/TestingResourceManagerGateway.java       |   2 +-
 .../job/JobVertexBackPressureHandlerTest.java      | 279 +++++++++++++---
 .../SubtaskCurrentAttemptDetailsHandlerTest.java   |   3 +-
 .../SubtaskExecutionAttemptDetailsHandlerTest.java |   3 +-
 .../AggregatingSubtasksMetricsHandlerTest.java     |   6 +-
 .../job/metrics/JobVertexMetricsHandlerTest.java   |   4 +-
 .../job/metrics/SubtaskMetricsHandlerTest.java     |   4 +-
 .../messages/ClusterOverviewWithVersionTest.java   |   2 +-
 .../handler/legacy/metrics/MetricFetcherTest.java  |   5 +-
 .../handler/legacy/metrics/MetricStoreTest.java    | 111 +++++--
 .../taskmanager/TaskManagerDetailsHandlerTest.java |  21 +-
 .../messages/AggregatedTaskDetailsInfoTest.java    |   3 +-
 .../messages/JobVertexBackPressureInfoTest.java    |  24 +-
 .../rest/messages/JobVertexDetailsInfoTest.java    |  24 +-
 .../SubtaskExecutionAttemptDetailsInfoTest.java    |   3 +-
 .../messages/taskmanager/TaskManagerInfoTest.java  |   3 +-
 .../runtime/webmonitor/TestingRestfulGateway.java  |   2 +-
 58 files changed, 1879 insertions(+), 1002 deletions(-)
 delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedSpeculativeExecutionVertex.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTestUtils.java
 copy flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/{SpeculativeExecutionVertexTest.java => ArchivedExecutionVertexWithSpeculativeExecutionTest.java} (52%)


[flink] 02/06: [FLINK-28588][rest] Add blocked flag in TaskManagerInfo and TaskManagerDetailsInfo

Posted by ga...@apache.org.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 53938680abe846e1947243809b4b43d5f67610d3
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Tue Jul 26 16:06:10 2022 +0800

    [FLINK-28588][rest] Add blocked flag in TaskManagerInfo and TaskManagerDetailsInfo
---
 .../src/test/resources/rest_api_v1.snapshot        |  6 ++++
 .../runtime/resourcemanager/ResourceManager.java   |  7 +++--
 .../taskmanager/TaskManagerDetailsInfo.java        |  7 ++++-
 .../rest/messages/taskmanager/TaskManagerInfo.java | 32 ++++++++++++++++++----
 .../taskmanager/TaskManagerDetailsHandlerTest.java |  3 +-
 .../messages/taskmanager/TaskManagerInfoTest.java  |  3 +-
 6 files changed, 48 insertions(+), 10 deletions(-)
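
Note on the null handling in this patch: the JSON constructor of TaskManagerInfo takes a nullable Boolean (the in-code comment says Jackson assigns null when the field is absent while parsing), and the value is then stored as a primitive boolean. The following is a minimal sketch of that normalization only. The class name BlockedFlagSketch and its single resourceId field are hypothetical stand-ins, not Flink code, and the Jackson annotations are left out so the snippet runs without dependencies.

```java
/**
 * Hypothetical, dependency-free stand-in for a REST message with an optional flag.
 * It is not the Flink TaskManagerInfo class and carries no Jackson annotations.
 */
final class BlockedFlagSketch {

    private final String resourceId;

    // Stored as a primitive so the field itself can never be null.
    private final boolean blocked;

    BlockedFlagSketch(String resourceId, Boolean blocked) {
        this.resourceId = java.util.Objects.requireNonNull(resourceId);
        // Same expression as in the patch: a missing (null) value is treated as false.
        this.blocked = (blocked != null) && blocked;
    }

    boolean getBlocked() {
        return blocked;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        BlockedFlagSketch that = (BlockedFlagSketch) o;
        // The flag takes part in equality, mirroring the equals() change in the patch.
        return resourceId.equals(that.resourceId) && blocked == that.blocked;
    }

    @Override
    public int hashCode() {
        return java.util.Objects.hash(resourceId, blocked);
    }

    public static void main(String[] args) {
        System.out.println(new BlockedFlagSketch("tm-1", null).getBlocked());
        System.out.println(new BlockedFlagSketch("tm-1", Boolean.TRUE).getBlocked());
    }
}
```

Running main prints false and then true. In this sketch an absent flag and an explicit false produce equal objects, which is the behaviour one would expect if a payload written without the field is read back and compared with an unblocked instance.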

diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index f0e069087af..427f59c2723 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -3439,6 +3439,9 @@
                     "type" : "integer"
                   }
                 }
+              },
+              "blocked" : {
+                "type" : "boolean"
               }
             }
           }
@@ -3597,6 +3600,9 @@
             }
           }
         },
+        "blocked" : {
+          "type" : "boolean"
+        },
         "allocatedSlots" : {
           "type" : "array",
           "items" : {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 9a9a17b4cbc..fb85cf88231 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -646,7 +646,8 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
                             slotManager.getRegisteredResourceOf(taskExecutor.getInstanceID()),
                             slotManager.getFreeResourceOf(taskExecutor.getInstanceID()),
                             taskExecutor.getHardwareDescription(),
-                            taskExecutor.getMemoryConfiguration()));
+                            taskExecutor.getMemoryConfiguration(),
+                            blocklistHandler.isBlockedTaskManager(taskExecutor.getResourceID())));
         }
 
         return CompletableFuture.completedFuture(taskManagerInfos);
@@ -675,7 +676,9 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
                                     slotManager.getRegisteredResourceOf(instanceId),
                                     slotManager.getFreeResourceOf(instanceId),
                                     taskExecutor.getHardwareDescription(),
-                                    taskExecutor.getMemoryConfiguration()),
+                                    taskExecutor.getMemoryConfiguration(),
+                                    blocklistHandler.isBlockedTaskManager(
+                                            taskExecutor.getResourceID())),
                             slotManager.getAllocatedSlotsOf(instanceId));
 
             return CompletableFuture.completedFuture(taskManagerInfoWithSlots);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java
index 657dfe7af8c..c9df8325e10 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java
@@ -33,6 +33,8 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgn
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 
+import javax.annotation.Nullable;
+
 import java.util.Collection;
 import java.util.Objects;
 
@@ -64,6 +66,7 @@ public class TaskManagerDetailsInfo extends TaskManagerInfo {
             @JsonProperty(FIELD_NAME_AVAILABLE_RESOURCE) ResourceProfileInfo freeResource,
             @JsonProperty(FIELD_NAME_HARDWARE) HardwareDescription hardwareDescription,
             @JsonProperty(FIELD_NAME_MEMORY) TaskExecutorMemoryConfiguration memoryConfiguration,
+            @JsonProperty(FIELD_NAME_BLOCKED) @Nullable Boolean blocked,
             @JsonProperty(FIELD_NAME_ALLOCATED_SLOTS) Collection<SlotInfo> allocatedSlots,
             @JsonProperty(FIELD_NAME_METRICS) TaskManagerMetricsInfo taskManagerMetrics) {
         super(
@@ -77,7 +80,8 @@ public class TaskManagerDetailsInfo extends TaskManagerInfo {
                 totalResource,
                 freeResource,
                 hardwareDescription,
-                memoryConfiguration);
+                memoryConfiguration,
+                blocked);
 
         this.taskManagerMetrics = Preconditions.checkNotNull(taskManagerMetrics);
         this.allocatedSlots = Preconditions.checkNotNull(allocatedSlots);
@@ -98,6 +102,7 @@ public class TaskManagerDetailsInfo extends TaskManagerInfo {
                 taskManagerInfoWithSlots.getTaskManagerInfo().getFreeResource(),
                 taskManagerInfoWithSlots.getTaskManagerInfo().getHardwareDescription(),
                 taskManagerInfoWithSlots.getTaskManagerInfo().getMemoryConfiguration(),
+                taskManagerInfoWithSlots.getTaskManagerInfo().getBlocked(),
                 taskManagerInfoWithSlots.getAllocatedSlots(),
                 taskManagerMetrics);
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
index b8d93ccd08d..308e49b504d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
@@ -31,10 +31,14 @@ import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
+import javax.annotation.Nullable;
+
 import java.io.Serializable;
 import java.util.Objects;
 
@@ -63,6 +67,8 @@ public class TaskManagerInfo implements ResponseBody, Serializable {
 
     public static final String FIELD_NAME_MEMORY = "memoryConfiguration";
 
+    public static final String FIELD_NAME_BLOCKED = "blocked";
+
     private static final long serialVersionUID = 1L;
 
     @JsonProperty(FIELD_NAME_RESOURCE_ID)
@@ -99,7 +105,12 @@ public class TaskManagerInfo implements ResponseBody, Serializable {
     @JsonProperty(FIELD_NAME_MEMORY)
     private final TaskExecutorMemoryConfiguration memoryConfiguration;
 
+    @JsonProperty(FIELD_NAME_BLOCKED)
+    @JsonInclude(Include.NON_DEFAULT)
+    private final boolean blocked;
+
     @JsonCreator
+    // blocked is Nullable since Jackson will assign null if the field is absent while parsing
     public TaskManagerInfo(
             @JsonDeserialize(using = ResourceIDDeserializer.class)
                     @JsonProperty(FIELD_NAME_RESOURCE_ID)
@@ -113,7 +124,8 @@ public class TaskManagerInfo implements ResponseBody, Serializable {
             @JsonProperty(FIELD_NAME_TOTAL_RESOURCE) ResourceProfileInfo totalResource,
             @JsonProperty(FIELD_NAME_AVAILABLE_RESOURCE) ResourceProfileInfo freeResource,
             @JsonProperty(FIELD_NAME_HARDWARE) HardwareDescription hardwareDescription,
-            @JsonProperty(FIELD_NAME_MEMORY) TaskExecutorMemoryConfiguration memoryConfiguration) {
+            @JsonProperty(FIELD_NAME_MEMORY) TaskExecutorMemoryConfiguration memoryConfiguration,
+            @JsonProperty(FIELD_NAME_BLOCKED) @Nullable Boolean blocked) {
         this.resourceId = Preconditions.checkNotNull(resourceId);
         this.address = Preconditions.checkNotNull(address);
         this.dataPort = dataPort;
@@ -125,6 +137,7 @@ public class TaskManagerInfo implements ResponseBody, Serializable {
         this.freeResource = freeResource;
         this.hardwareDescription = Preconditions.checkNotNull(hardwareDescription);
         this.memoryConfiguration = Preconditions.checkNotNull(memoryConfiguration);
+        this.blocked = (blocked != null) && blocked;
     }
 
     public TaskManagerInfo(
@@ -138,7 +151,8 @@ public class TaskManagerInfo implements ResponseBody, Serializable {
             ResourceProfile totalResource,
             ResourceProfile freeResource,
             HardwareDescription hardwareDescription,
-            TaskExecutorMemoryConfiguration memoryConfiguration) {
+            TaskExecutorMemoryConfiguration memoryConfiguration,
+            @Nullable Boolean blocked) {
         this(
                 resourceId,
                 address,
@@ -150,7 +164,8 @@ public class TaskManagerInfo implements ResponseBody, Serializable {
                 ResourceProfileInfo.fromResrouceProfile(totalResource),
                 ResourceProfileInfo.fromResrouceProfile(freeResource),
                 hardwareDescription,
-                memoryConfiguration);
+                memoryConfiguration,
+                blocked);
     }
 
     @JsonIgnore
@@ -208,6 +223,11 @@ public class TaskManagerInfo implements ResponseBody, Serializable {
         return memoryConfiguration;
     }
 
+    @JsonIgnore
+    public boolean getBlocked() {
+        return blocked;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -227,7 +247,8 @@ public class TaskManagerInfo implements ResponseBody, Serializable {
                 && Objects.equals(resourceId, that.resourceId)
                 && Objects.equals(address, that.address)
                 && Objects.equals(hardwareDescription, that.hardwareDescription)
-                && Objects.equals(memoryConfiguration, that.memoryConfiguration);
+                && Objects.equals(memoryConfiguration, that.memoryConfiguration)
+                && blocked == that.blocked;
     }
 
     @Override
@@ -243,6 +264,7 @@ public class TaskManagerInfo implements ResponseBody, Serializable {
                 totalResource,
                 freeResource,
                 hardwareDescription,
-                memoryConfiguration);
+                memoryConfiguration,
+                blocked);
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java
index 95a64901af9..5e2e14864d8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java
@@ -172,7 +172,8 @@ class TaskManagerDetailsHandlerTest {
                 ResourceProfile.ZERO,
                 ResourceProfile.ZERO,
                 new HardwareDescription(0, 0L, 0L, 0L),
-                new TaskExecutorMemoryConfiguration(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L));
+                new TaskExecutorMemoryConfiguration(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L),
+                false);
     }
 
     private static HandlerRequest<EmptyRequestBody> createRequest() throws HandlerRequestException {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfoTest.java
index 7231c94070b..140134ee953 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfoTest.java
@@ -65,6 +65,7 @@ public class TaskManagerInfoTest extends RestResponseMarshallingTestBase<TaskMan
                         random.nextLong(),
                         random.nextLong(),
                         random.nextLong(),
-                        random.nextLong()));
+                        random.nextLong()),
+                random.nextBoolean());
     }
 }


[flink] 01/06: [hotfix][runtime][tests] Migrates some tests to Junit5

Posted by ga...@apache.org.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 247d4263c1a484ba12ce5c7826ff938b83285d11
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Tue Jul 26 15:56:13 2022 +0800

    [hotfix][runtime][tests] Migrates some tests to Junit5
---
 .../executiongraph/ArchivedExecutionGraphTest.java | 356 +++++----------------
 .../ArchivedExecutionGraphTestUtils.java           | 170 ++++++++++
 .../messages/webmonitor/JobDetailsTest.java        |  15 +-
 .../metrics/dump/MetricDumpSerializerTest.java     | 135 ++++----
 .../runtime/metrics/dump/QueryScopeInfoTest.java   | 168 +++++-----
 .../job/JobVertexBackPressureHandlerTest.java      |  67 ++--
 .../handler/legacy/metrics/MetricStoreTest.java    |  50 +--
 .../taskmanager/TaskManagerDetailsHandlerTest.java |  18 +-
 8 files changed, 475 insertions(+), 504 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index b6e9cefc2b4..4427c5b3042 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.core.testutils.CommonTestUtils;
-import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummarySnapshot;
@@ -42,44 +41,34 @@ import org.apache.flink.runtime.scheduler.SchedulerBase;
 import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
-import org.apache.flink.util.OptionalFailure;
-import org.apache.flink.util.SerializedValue;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
 
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.Function;
 
 import static java.util.Arrays.asList;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link ArchivedExecutionGraph}. */
-public class ArchivedExecutionGraphTest extends TestLogger {
+public class ArchivedExecutionGraphTest {
 
-    @ClassRule
-    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
-            TestingUtils.defaultExecutorResource();
+    @RegisterExtension
+    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
 
     private static ExecutionGraph runtimeGraph;
 
-    @BeforeClass
-    public static void setupExecutionGraph() throws Exception {
+    @BeforeAll
+    static void setupExecutionGraph() throws Exception {
         // -------------------------------------------------------------------------------------------------------------
         // Setup
         // -------------------------------------------------------------------------------------------------------------
@@ -146,21 +135,21 @@ public class ArchivedExecutionGraphTest extends TestLogger {
     }
 
     @Test
-    public void testArchive() throws IOException, ClassNotFoundException {
+    void testArchive() throws IOException, ClassNotFoundException {
         ArchivedExecutionGraph archivedGraph = ArchivedExecutionGraph.createFrom(runtimeGraph);
 
         compareExecutionGraph(runtimeGraph, archivedGraph);
     }
 
     @Test
-    public void testSerialization() throws IOException, ClassNotFoundException {
+    void testSerialization() throws IOException, ClassNotFoundException {
         ArchivedExecutionGraph archivedGraph = ArchivedExecutionGraph.createFrom(runtimeGraph);
 
         verifySerializability(archivedGraph);
     }
 
     @Test
-    public void testCreateFromInitializingJobForSuspendedJob() {
+    void testCreateFromInitializingJobForSuspendedJob() {
         final ArchivedExecutionGraph suspendedExecutionGraph =
                 ArchivedExecutionGraph.createSparseArchivedExecutionGraph(
                         new JobID(),
@@ -170,12 +159,12 @@ public class ArchivedExecutionGraphTest extends TestLogger {
                         null,
                         System.currentTimeMillis());
 
-        assertThat(suspendedExecutionGraph.getState(), is(JobStatus.SUSPENDED));
-        assertThat(suspendedExecutionGraph.getFailureInfo(), notNullValue());
+        assertThat(suspendedExecutionGraph.getState()).isEqualTo(JobStatus.SUSPENDED);
+        assertThat(suspendedExecutionGraph.getFailureInfo()).isNotNull();
     }
 
     @Test
-    public void testCheckpointSettingsArchiving() {
+    void testCheckpointSettingsArchiving() {
         final CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration =
                 CheckpointCoordinatorConfiguration.builder().build();
 
@@ -192,19 +181,19 @@ public class ArchivedExecutionGraphTest extends TestLogger {
     }
 
     public static void assertContainsCheckpointSettings(ArchivedExecutionGraph archivedGraph) {
-        assertThat(archivedGraph.getCheckpointCoordinatorConfiguration(), notNullValue());
-        assertThat(archivedGraph.getCheckpointStatsSnapshot(), notNullValue());
-        assertThat(archivedGraph.getCheckpointStorageName().get(), is("Unknown"));
-        assertThat(archivedGraph.getStateBackendName().get(), is("Unknown"));
+        assertThat(archivedGraph.getCheckpointCoordinatorConfiguration()).isNotNull();
+        assertThat(archivedGraph.getCheckpointStatsSnapshot()).isNotNull();
+        assertThat(archivedGraph.getCheckpointStorageName().get()).isEqualTo("Unknown");
+        assertThat(archivedGraph.getStateBackendName().get()).isEqualTo("Unknown");
     }
 
     @Test
-    public void testArchiveWithStatusOverride() throws IOException, ClassNotFoundException {
+    void testArchiveWithStatusOverride() throws IOException, ClassNotFoundException {
         ArchivedExecutionGraph archivedGraph =
                 ArchivedExecutionGraph.createFrom(runtimeGraph, JobStatus.RESTARTING);
 
-        assertThat(archivedGraph.getState(), is(JobStatus.RESTARTING));
-        assertThat(archivedGraph.getStatusTimestamp(JobStatus.FAILED), is(0L));
+        assertThat(archivedGraph.getState()).isEqualTo(JobStatus.RESTARTING);
+        assertThat(archivedGraph.getStatusTimestamp(JobStatus.FAILED)).isEqualTo(0L);
     }
 
     private static void compareExecutionGraph(
@@ -213,41 +202,31 @@ public class ArchivedExecutionGraphTest extends TestLogger {
         // -------------------------------------------------------------------------------------------------------------
         // ExecutionGraph
         // -------------------------------------------------------------------------------------------------------------
-        assertEquals(runtimeGraph.getJsonPlan(), archivedGraph.getJsonPlan());
-        assertEquals(runtimeGraph.getJobID(), archivedGraph.getJobID());
-        assertEquals(runtimeGraph.getJobName(), archivedGraph.getJobName());
-        assertEquals(runtimeGraph.getState(), archivedGraph.getState());
-        assertEquals(
-                runtimeGraph.getFailureInfo().getExceptionAsString(),
-                archivedGraph.getFailureInfo().getExceptionAsString());
-        assertEquals(
-                runtimeGraph.getStatusTimestamp(JobStatus.CREATED),
-                archivedGraph.getStatusTimestamp(JobStatus.CREATED));
-        assertEquals(
-                runtimeGraph.getStatusTimestamp(JobStatus.RUNNING),
-                archivedGraph.getStatusTimestamp(JobStatus.RUNNING));
-        assertEquals(
-                runtimeGraph.getStatusTimestamp(JobStatus.FAILING),
-                archivedGraph.getStatusTimestamp(JobStatus.FAILING));
-        assertEquals(
-                runtimeGraph.getStatusTimestamp(JobStatus.FAILED),
-                archivedGraph.getStatusTimestamp(JobStatus.FAILED));
-        assertEquals(
-                runtimeGraph.getStatusTimestamp(JobStatus.CANCELLING),
-                archivedGraph.getStatusTimestamp(JobStatus.CANCELLING));
-        assertEquals(
-                runtimeGraph.getStatusTimestamp(JobStatus.CANCELED),
-                archivedGraph.getStatusTimestamp(JobStatus.CANCELED));
-        assertEquals(
-                runtimeGraph.getStatusTimestamp(JobStatus.FINISHED),
-                archivedGraph.getStatusTimestamp(JobStatus.FINISHED));
-        assertEquals(
-                runtimeGraph.getStatusTimestamp(JobStatus.RESTARTING),
-                archivedGraph.getStatusTimestamp(JobStatus.RESTARTING));
-        assertEquals(
-                runtimeGraph.getStatusTimestamp(JobStatus.SUSPENDED),
-                archivedGraph.getStatusTimestamp(JobStatus.SUSPENDED));
-        assertEquals(runtimeGraph.isStoppable(), archivedGraph.isStoppable());
+        assertThat(runtimeGraph.getJsonPlan()).isEqualTo(archivedGraph.getJsonPlan());
+        assertThat(runtimeGraph.getJobID()).isEqualTo(archivedGraph.getJobID());
+        assertThat(runtimeGraph.getJobName()).isEqualTo(archivedGraph.getJobName());
+        assertThat(runtimeGraph.getState()).isEqualTo(archivedGraph.getState());
+        assertThat(runtimeGraph.getFailureInfo().getExceptionAsString())
+                .isEqualTo(archivedGraph.getFailureInfo().getExceptionAsString());
+        assertThat(runtimeGraph.getStatusTimestamp(JobStatus.CREATED))
+                .isEqualTo(archivedGraph.getStatusTimestamp(JobStatus.CREATED));
+        assertThat(runtimeGraph.getStatusTimestamp(JobStatus.RUNNING))
+                .isEqualTo(archivedGraph.getStatusTimestamp(JobStatus.RUNNING));
+        assertThat(runtimeGraph.getStatusTimestamp(JobStatus.FAILING))
+                .isEqualTo(archivedGraph.getStatusTimestamp(JobStatus.FAILING));
+        assertThat(runtimeGraph.getStatusTimestamp(JobStatus.FAILED))
+                .isEqualTo(archivedGraph.getStatusTimestamp(JobStatus.FAILED));
+        assertThat(runtimeGraph.getStatusTimestamp(JobStatus.CANCELLING))
+                .isEqualTo(archivedGraph.getStatusTimestamp(JobStatus.CANCELLING));
+        assertThat(runtimeGraph.getStatusTimestamp(JobStatus.CANCELED))
+                .isEqualTo(archivedGraph.getStatusTimestamp(JobStatus.CANCELED));
+        assertThat(runtimeGraph.getStatusTimestamp(JobStatus.FINISHED))
+                .isEqualTo(archivedGraph.getStatusTimestamp(JobStatus.FINISHED));
+        assertThat(runtimeGraph.getStatusTimestamp(JobStatus.RESTARTING))
+                .isEqualTo(archivedGraph.getStatusTimestamp(JobStatus.RESTARTING));
+        assertThat(runtimeGraph.getStatusTimestamp(JobStatus.SUSPENDED))
+                .isEqualTo(archivedGraph.getStatusTimestamp(JobStatus.SUSPENDED));
+        assertThat(runtimeGraph.isStoppable()).isEqualTo(archivedGraph.isStoppable());
 
         // -------------------------------------------------------------------------------------------------------------
         // CheckpointStats
@@ -279,19 +258,16 @@ public class ArchivedExecutionGraphTest extends TestLogger {
             StatsSummarySnapshot runtime = meter.apply(runtimeSnapshot.getSummaryStats());
             StatsSummarySnapshot archived = meter.apply(runtimeSnapshot.getSummaryStats());
             for (Function<StatsSummarySnapshot, Object> agg : aggs) {
-                assertEquals(agg.apply(runtime), agg.apply(archived));
+                assertThat(agg.apply(runtime)).isEqualTo(agg.apply(archived));
             }
         }
 
-        assertEquals(
-                runtimeSnapshot.getCounts().getTotalNumberOfCheckpoints(),
-                archivedSnapshot.getCounts().getTotalNumberOfCheckpoints());
-        assertEquals(
-                runtimeSnapshot.getCounts().getNumberOfCompletedCheckpoints(),
-                archivedSnapshot.getCounts().getNumberOfCompletedCheckpoints());
-        assertEquals(
-                runtimeSnapshot.getCounts().getNumberOfInProgressCheckpoints(),
-                archivedSnapshot.getCounts().getNumberOfInProgressCheckpoints());
+        assertThat(runtimeSnapshot.getCounts().getTotalNumberOfCheckpoints())
+                .isEqualTo(archivedSnapshot.getCounts().getTotalNumberOfCheckpoints());
+        assertThat(runtimeSnapshot.getCounts().getNumberOfCompletedCheckpoints())
+                .isEqualTo(archivedSnapshot.getCounts().getNumberOfCompletedCheckpoints());
+        assertThat(runtimeSnapshot.getCounts().getNumberOfInProgressCheckpoints())
+                .isEqualTo(archivedSnapshot.getCounts().getNumberOfInProgressCheckpoints());
 
         // -------------------------------------------------------------------------------------------------------------
         // ArchivedExecutionConfig
@@ -299,24 +275,23 @@ public class ArchivedExecutionGraphTest extends TestLogger {
         ArchivedExecutionConfig runtimeConfig = runtimeGraph.getArchivedExecutionConfig();
         ArchivedExecutionConfig archivedConfig = archivedGraph.getArchivedExecutionConfig();
 
-        assertEquals(runtimeConfig.getExecutionMode(), archivedConfig.getExecutionMode());
-        assertEquals(runtimeConfig.getParallelism(), archivedConfig.getParallelism());
-        assertEquals(runtimeConfig.getObjectReuseEnabled(), archivedConfig.getObjectReuseEnabled());
-        assertEquals(
-                runtimeConfig.getRestartStrategyDescription(),
-                archivedConfig.getRestartStrategyDescription());
-        assertNotNull(archivedConfig.getGlobalJobParameters().get("hello"));
-        assertEquals(
-                runtimeConfig.getGlobalJobParameters().get("hello"),
-                archivedConfig.getGlobalJobParameters().get("hello"));
+        assertThat(runtimeConfig.getExecutionMode()).isEqualTo(archivedConfig.getExecutionMode());
+        assertThat(runtimeConfig.getParallelism()).isEqualTo(archivedConfig.getParallelism());
+        assertThat(runtimeConfig.getObjectReuseEnabled())
+                .isEqualTo(archivedConfig.getObjectReuseEnabled());
+        assertThat(runtimeConfig.getRestartStrategyDescription())
+                .isEqualTo(archivedConfig.getRestartStrategyDescription());
+        assertThat(archivedConfig.getGlobalJobParameters().get("hello")).isNotNull();
+        assertThat(runtimeConfig.getGlobalJobParameters().get("hello"))
+                .isEqualTo(archivedConfig.getGlobalJobParameters().get("hello"));
 
         // -------------------------------------------------------------------------------------------------------------
         // StringifiedAccumulators
         // -------------------------------------------------------------------------------------------------------------
-        compareStringifiedAccumulators(
+        ArchivedExecutionGraphTestUtils.compareStringifiedAccumulators(
                 runtimeGraph.getAccumulatorResultsStringified(),
                 archivedGraph.getAccumulatorResultsStringified());
-        compareSerializedAccumulators(
+        ArchivedExecutionGraphTestUtils.compareSerializedAccumulators(
                 runtimeGraph.getAccumulatorsSerialized(),
                 archivedGraph.getAccumulatorsSerialized());
 
@@ -339,7 +314,7 @@ public class ArchivedExecutionGraphTest extends TestLogger {
                 archivedGraph.getVerticesTopologically().iterator();
 
         while (runtimeTopologicalVertices.hasNext()) {
-            assertTrue(archiveTopologicaldVertices.hasNext());
+            assertThat(archiveTopologicaldVertices.hasNext()).isTrue();
             compareExecutionJobVertex(
                     runtimeTopologicalVertices.next(), archiveTopologicaldVertices.next());
         }
@@ -353,199 +328,32 @@ public class ArchivedExecutionGraphTest extends TestLogger {
                 archivedGraph.getAllExecutionVertices().iterator();
 
         while (runtimeExecutionVertices.hasNext()) {
-            assertTrue(archivedExecutionVertices.hasNext());
-            compareExecutionVertex(
+            assertThat(archivedExecutionVertices.hasNext()).isTrue();
+            ArchivedExecutionGraphTestUtils.compareExecutionVertex(
                     runtimeExecutionVertices.next(), archivedExecutionVertices.next());
         }
     }
 
     private static void compareExecutionJobVertex(
             AccessExecutionJobVertex runtimeJobVertex, AccessExecutionJobVertex archivedJobVertex) {
-        assertEquals(runtimeJobVertex.getName(), archivedJobVertex.getName());
-        assertEquals(runtimeJobVertex.getParallelism(), archivedJobVertex.getParallelism());
-        assertEquals(runtimeJobVertex.getMaxParallelism(), archivedJobVertex.getMaxParallelism());
-        assertEquals(runtimeJobVertex.getJobVertexId(), archivedJobVertex.getJobVertexId());
-        assertEquals(runtimeJobVertex.getAggregateState(), archivedJobVertex.getAggregateState());
-
-        compareStringifiedAccumulators(
+        assertThat(runtimeJobVertex.getName()).isEqualTo(archivedJobVertex.getName());
+        assertThat(runtimeJobVertex.getParallelism()).isEqualTo(archivedJobVertex.getParallelism());
+        assertThat(runtimeJobVertex.getMaxParallelism())
+                .isEqualTo(archivedJobVertex.getMaxParallelism());
+        assertThat(runtimeJobVertex.getJobVertexId()).isEqualTo(archivedJobVertex.getJobVertexId());
+        assertThat(runtimeJobVertex.getAggregateState())
+                .isEqualTo(archivedJobVertex.getAggregateState());
+
+        ArchivedExecutionGraphTestUtils.compareStringifiedAccumulators(
                 runtimeJobVertex.getAggregatedUserAccumulatorsStringified(),
                 archivedJobVertex.getAggregatedUserAccumulatorsStringified());
 
         AccessExecutionVertex[] runtimeExecutionVertices = runtimeJobVertex.getTaskVertices();
         AccessExecutionVertex[] archivedExecutionVertices = archivedJobVertex.getTaskVertices();
-        assertEquals(runtimeExecutionVertices.length, archivedExecutionVertices.length);
+        assertThat(runtimeExecutionVertices.length).isEqualTo(archivedExecutionVertices.length);
         for (int x = 0; x < runtimeExecutionVertices.length; x++) {
-            compareExecutionVertex(runtimeExecutionVertices[x], archivedExecutionVertices[x]);
-        }
-    }
-
-    private static void compareExecutionVertex(
-            AccessExecutionVertex runtimeVertex, AccessExecutionVertex archivedVertex) {
-        assertEquals(
-                runtimeVertex.getTaskNameWithSubtaskIndex(),
-                archivedVertex.getTaskNameWithSubtaskIndex());
-        assertEquals(
-                runtimeVertex.getParallelSubtaskIndex(), archivedVertex.getParallelSubtaskIndex());
-        assertEquals(runtimeVertex.getExecutionState(), archivedVertex.getExecutionState());
-        assertEquals(
-                runtimeVertex.getStateTimestamp(ExecutionState.CREATED),
-                archivedVertex.getStateTimestamp(ExecutionState.CREATED));
-        assertEquals(
-                runtimeVertex.getStateTimestamp(ExecutionState.SCHEDULED),
-                archivedVertex.getStateTimestamp(ExecutionState.SCHEDULED));
-        assertEquals(
-                runtimeVertex.getStateTimestamp(ExecutionState.DEPLOYING),
-                archivedVertex.getStateTimestamp(ExecutionState.DEPLOYING));
-        assertEquals(
-                runtimeVertex.getStateTimestamp(ExecutionState.INITIALIZING),
-                archivedVertex.getStateTimestamp(ExecutionState.INITIALIZING));
-        assertEquals(
-                runtimeVertex.getStateTimestamp(ExecutionState.RUNNING),
-                archivedVertex.getStateTimestamp(ExecutionState.RUNNING));
-        assertEquals(
-                runtimeVertex.getStateTimestamp(ExecutionState.FINISHED),
-                archivedVertex.getStateTimestamp(ExecutionState.FINISHED));
-        assertEquals(
-                runtimeVertex.getStateTimestamp(ExecutionState.CANCELING),
-                archivedVertex.getStateTimestamp(ExecutionState.CANCELING));
-        assertEquals(
-                runtimeVertex.getStateTimestamp(ExecutionState.CANCELED),
-                archivedVertex.getStateTimestamp(ExecutionState.CANCELED));
-        assertEquals(
-                runtimeVertex.getStateTimestamp(ExecutionState.FAILED),
-                archivedVertex.getStateTimestamp(ExecutionState.FAILED));
-        assertThat(
-                runtimeVertex.getFailureInfo().map(ErrorInfo::getExceptionAsString),
-                is(archivedVertex.getFailureInfo().map(ErrorInfo::getExceptionAsString)));
-        assertThat(
-                runtimeVertex.getFailureInfo().map(ErrorInfo::getTimestamp),
-                is(archivedVertex.getFailureInfo().map(ErrorInfo::getTimestamp)));
-        assertEquals(
-                runtimeVertex.getCurrentAssignedResourceLocation(),
-                archivedVertex.getCurrentAssignedResourceLocation());
-
-        compareExecution(
-                runtimeVertex.getCurrentExecutionAttempt(),
-                archivedVertex.getCurrentExecutionAttempt());
-    }
-
-    private static void compareExecution(
-            AccessExecution runtimeExecution, AccessExecution archivedExecution) {
-        assertEquals(runtimeExecution.getAttemptId(), archivedExecution.getAttemptId());
-        assertEquals(runtimeExecution.getAttemptNumber(), archivedExecution.getAttemptNumber());
-        assertArrayEquals(
-                runtimeExecution.getStateTimestamps(), archivedExecution.getStateTimestamps());
-        assertArrayEquals(
-                runtimeExecution.getStateEndTimestamps(),
-                archivedExecution.getStateEndTimestamps());
-        assertEquals(runtimeExecution.getState(), archivedExecution.getState());
-        assertEquals(
-                runtimeExecution.getAssignedResourceLocation(),
-                archivedExecution.getAssignedResourceLocation());
-        assertThat(
-                runtimeExecution.getFailureInfo().map(ErrorInfo::getExceptionAsString),
-                is(archivedExecution.getFailureInfo().map(ErrorInfo::getExceptionAsString)));
-        assertThat(
-                runtimeExecution.getFailureInfo().map(ErrorInfo::getTimestamp),
-                is(archivedExecution.getFailureInfo().map(ErrorInfo::getTimestamp)));
-        assertEquals(
-                runtimeExecution.getStateTimestamp(ExecutionState.CREATED),
-                archivedExecution.getStateTimestamp(ExecutionState.CREATED));
-        assertEquals(
-                runtimeExecution.getStateTimestamp(ExecutionState.SCHEDULED),
-                archivedExecution.getStateTimestamp(ExecutionState.SCHEDULED));
-        assertEquals(
-                runtimeExecution.getStateTimestamp(ExecutionState.DEPLOYING),
-                archivedExecution.getStateTimestamp(ExecutionState.DEPLOYING));
-        assertEquals(
-                runtimeExecution.getStateTimestamp(ExecutionState.INITIALIZING),
-                archivedExecution.getStateTimestamp(ExecutionState.INITIALIZING));
-        assertEquals(
-                runtimeExecution.getStateTimestamp(ExecutionState.RUNNING),
-                archivedExecution.getStateTimestamp(ExecutionState.RUNNING));
-        assertEquals(
-                runtimeExecution.getStateTimestamp(ExecutionState.FINISHED),
-                archivedExecution.getStateTimestamp(ExecutionState.FINISHED));
-        assertEquals(
-                runtimeExecution.getStateTimestamp(ExecutionState.CANCELING),
-                archivedExecution.getStateTimestamp(ExecutionState.CANCELING));
-        assertEquals(
-                runtimeExecution.getStateTimestamp(ExecutionState.CANCELED),
-                archivedExecution.getStateTimestamp(ExecutionState.CANCELED));
-        assertEquals(
-                runtimeExecution.getStateTimestamp(ExecutionState.FAILED),
-                archivedExecution.getStateTimestamp(ExecutionState.FAILED));
-        assertEquals(
-                runtimeExecution.getStateEndTimestamp(ExecutionState.CREATED),
-                archivedExecution.getStateEndTimestamp(ExecutionState.CREATED));
-        assertEquals(
-                runtimeExecution.getStateEndTimestamp(ExecutionState.SCHEDULED),
-                archivedExecution.getStateEndTimestamp(ExecutionState.SCHEDULED));
-        assertEquals(
-                runtimeExecution.getStateEndTimestamp(ExecutionState.DEPLOYING),
-                archivedExecution.getStateEndTimestamp(ExecutionState.DEPLOYING));
-        assertEquals(
-                runtimeExecution.getStateEndTimestamp(ExecutionState.INITIALIZING),
-                archivedExecution.getStateEndTimestamp(ExecutionState.INITIALIZING));
-        assertEquals(
-                runtimeExecution.getStateEndTimestamp(ExecutionState.RUNNING),
-                archivedExecution.getStateEndTimestamp(ExecutionState.RUNNING));
-        assertEquals(
-                runtimeExecution.getStateEndTimestamp(ExecutionState.FINISHED),
-                archivedExecution.getStateEndTimestamp(ExecutionState.FINISHED));
-        assertEquals(
-                runtimeExecution.getStateEndTimestamp(ExecutionState.CANCELING),
-                archivedExecution.getStateEndTimestamp(ExecutionState.CANCELING));
-        assertEquals(
-                runtimeExecution.getStateEndTimestamp(ExecutionState.CANCELED),
-                archivedExecution.getStateEndTimestamp(ExecutionState.CANCELED));
-        assertEquals(
-                runtimeExecution.getStateEndTimestamp(ExecutionState.FAILED),
-                archivedExecution.getStateEndTimestamp(ExecutionState.FAILED));
-        compareStringifiedAccumulators(
-                runtimeExecution.getUserAccumulatorsStringified(),
-                archivedExecution.getUserAccumulatorsStringified());
-        assertEquals(
-                runtimeExecution.getParallelSubtaskIndex(),
-                archivedExecution.getParallelSubtaskIndex());
-    }
-
-    private static void compareStringifiedAccumulators(
-            StringifiedAccumulatorResult[] runtimeAccs,
-            StringifiedAccumulatorResult[] archivedAccs) {
-        assertEquals(runtimeAccs.length, archivedAccs.length);
-
-        for (int x = 0; x < runtimeAccs.length; x++) {
-            StringifiedAccumulatorResult runtimeResult = runtimeAccs[x];
-            StringifiedAccumulatorResult archivedResult = archivedAccs[x];
-
-            assertEquals(runtimeResult.getName(), archivedResult.getName());
-            assertEquals(runtimeResult.getType(), archivedResult.getType());
-            assertEquals(runtimeResult.getValue(), archivedResult.getValue());
-        }
-    }
-
-    private static void compareSerializedAccumulators(
-            Map<String, SerializedValue<OptionalFailure<Object>>> runtimeAccs,
-            Map<String, SerializedValue<OptionalFailure<Object>>> archivedAccs)
-            throws IOException, ClassNotFoundException {
-        assertEquals(runtimeAccs.size(), archivedAccs.size());
-        for (Entry<String, SerializedValue<OptionalFailure<Object>>> runtimeAcc :
-                runtimeAccs.entrySet()) {
-            long runtimeUserAcc =
-                    (long)
-                            runtimeAcc
-                                    .getValue()
-                                    .deserializeValue(ClassLoader.getSystemClassLoader())
-                                    .getUnchecked();
-            long archivedUserAcc =
-                    (long)
-                            archivedAccs
-                                    .get(runtimeAcc.getKey())
-                                    .deserializeValue(ClassLoader.getSystemClassLoader())
-                                    .getUnchecked();
-
-            assertEquals(runtimeUserAcc, archivedUserAcc);
+            ArchivedExecutionGraphTestUtils.compareExecutionVertex(
+                    runtimeExecutionVertices[x], archivedExecutionVertices[x]);
         }
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTestUtils.java
new file mode 100644
index 00000000000..55a10cd5396
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTestUtils.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.util.OptionalFailure;
+import org.apache.flink.util.SerializedValue;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class ArchivedExecutionGraphTestUtils {
+
+    private ArchivedExecutionGraphTestUtils() {}
+
+    static void compareExecutionVertex(
+            AccessExecutionVertex runtimeVertex, AccessExecutionVertex archivedVertex) {
+        assertThat(runtimeVertex.getTaskNameWithSubtaskIndex())
+                .isEqualTo(archivedVertex.getTaskNameWithSubtaskIndex());
+        assertThat(runtimeVertex.getParallelSubtaskIndex())
+                .isEqualTo(archivedVertex.getParallelSubtaskIndex());
+        assertThat(runtimeVertex.getExecutionState()).isEqualTo(archivedVertex.getExecutionState());
+        assertThat(runtimeVertex.getStateTimestamp(ExecutionState.CREATED))
+                .isEqualTo(archivedVertex.getStateTimestamp(ExecutionState.CREATED));
+        assertThat(runtimeVertex.getStateTimestamp(ExecutionState.SCHEDULED))
+                .isEqualTo(archivedVertex.getStateTimestamp(ExecutionState.SCHEDULED));
+        assertThat(runtimeVertex.getStateTimestamp(ExecutionState.DEPLOYING))
+                .isEqualTo(archivedVertex.getStateTimestamp(ExecutionState.DEPLOYING));
+        assertThat(runtimeVertex.getStateTimestamp(ExecutionState.INITIALIZING))
+                .isEqualTo(archivedVertex.getStateTimestamp(ExecutionState.INITIALIZING));
+        assertThat(runtimeVertex.getStateTimestamp(ExecutionState.RUNNING))
+                .isEqualTo(archivedVertex.getStateTimestamp(ExecutionState.RUNNING));
+        assertThat(runtimeVertex.getStateTimestamp(ExecutionState.FINISHED))
+                .isEqualTo(archivedVertex.getStateTimestamp(ExecutionState.FINISHED));
+        assertThat(runtimeVertex.getStateTimestamp(ExecutionState.CANCELING))
+                .isEqualTo(archivedVertex.getStateTimestamp(ExecutionState.CANCELING));
+        assertThat(runtimeVertex.getStateTimestamp(ExecutionState.CANCELED))
+                .isEqualTo(archivedVertex.getStateTimestamp(ExecutionState.CANCELED));
+        assertThat(runtimeVertex.getStateTimestamp(ExecutionState.FAILED))
+                .isEqualTo(archivedVertex.getStateTimestamp(ExecutionState.FAILED));
+        assertThat(runtimeVertex.getFailureInfo().map(ErrorInfo::getExceptionAsString))
+                .isEqualTo(archivedVertex.getFailureInfo().map(ErrorInfo::getExceptionAsString));
+        assertThat(runtimeVertex.getFailureInfo().map(ErrorInfo::getTimestamp))
+                .isEqualTo(archivedVertex.getFailureInfo().map(ErrorInfo::getTimestamp));
+        assertThat(runtimeVertex.getCurrentAssignedResourceLocation())
+                .isEqualTo(archivedVertex.getCurrentAssignedResourceLocation());
+
+        compareExecution(
+                runtimeVertex.getCurrentExecutionAttempt(),
+                archivedVertex.getCurrentExecutionAttempt());
+    }
+
+    private static void compareExecution(
+            AccessExecution runtimeExecution, AccessExecution archivedExecution) {
+        assertThat(runtimeExecution.getAttemptId()).isEqualTo(archivedExecution.getAttemptId());
+        assertThat(runtimeExecution.getAttemptNumber())
+                .isEqualTo(archivedExecution.getAttemptNumber());
+        assertThat(runtimeExecution.getStateTimestamps())
+                .containsExactly(archivedExecution.getStateTimestamps());
+        assertThat(runtimeExecution.getStateEndTimestamps())
+                .containsExactly(archivedExecution.getStateEndTimestamps());
+        assertThat(runtimeExecution.getState()).isEqualTo(archivedExecution.getState());
+        assertThat(runtimeExecution.getAssignedResourceLocation())
+                .isEqualTo(archivedExecution.getAssignedResourceLocation());
+        assertThat(runtimeExecution.getFailureInfo().map(ErrorInfo::getExceptionAsString))
+                .isEqualTo(archivedExecution.getFailureInfo().map(ErrorInfo::getExceptionAsString));
+        assertThat(runtimeExecution.getFailureInfo().map(ErrorInfo::getTimestamp))
+                .isEqualTo(archivedExecution.getFailureInfo().map(ErrorInfo::getTimestamp));
+        assertThat(runtimeExecution.getStateTimestamp(ExecutionState.CREATED))
+                .isEqualTo(archivedExecution.getStateTimestamp(ExecutionState.CREATED));
+        assertThat(runtimeExecution.getStateTimestamp(ExecutionState.SCHEDULED))
+                .isEqualTo(archivedExecution.getStateTimestamp(ExecutionState.SCHEDULED));
+        assertThat(runtimeExecution.getStateTimestamp(ExecutionState.DEPLOYING))
+                .isEqualTo(archivedExecution.getStateTimestamp(ExecutionState.DEPLOYING));
+        assertThat(runtimeExecution.getStateTimestamp(ExecutionState.INITIALIZING))
+                .isEqualTo(archivedExecution.getStateTimestamp(ExecutionState.INITIALIZING));
+        assertThat(runtimeExecution.getStateTimestamp(ExecutionState.RUNNING))
+                .isEqualTo(archivedExecution.getStateTimestamp(ExecutionState.RUNNING));
+        assertThat(runtimeExecution.getStateTimestamp(ExecutionState.FINISHED))
+                .isEqualTo(archivedExecution.getStateTimestamp(ExecutionState.FINISHED));
+        assertThat(runtimeExecution.getStateTimestamp(ExecutionState.CANCELING))
+                .isEqualTo(archivedExecution.getStateTimestamp(ExecutionState.CANCELING));
+        assertThat(runtimeExecution.getStateTimestamp(ExecutionState.CANCELED))
+                .isEqualTo(archivedExecution.getStateTimestamp(ExecutionState.CANCELED));
+        assertThat(runtimeExecution.getStateTimestamp(ExecutionState.FAILED))
+                .isEqualTo(archivedExecution.getStateTimestamp(ExecutionState.FAILED));
+        assertThat(runtimeExecution.getStateEndTimestamp(ExecutionState.CREATED))
+                .isEqualTo(archivedExecution.getStateEndTimestamp(ExecutionState.CREATED));
+        assertThat(runtimeExecution.getStateEndTimestamp(ExecutionState.SCHEDULED))
+                .isEqualTo(archivedExecution.getStateEndTimestamp(ExecutionState.SCHEDULED));
+        assertThat(runtimeExecution.getStateEndTimestamp(ExecutionState.DEPLOYING))
+                .isEqualTo(archivedExecution.getStateEndTimestamp(ExecutionState.DEPLOYING));
+        assertThat(runtimeExecution.getStateEndTimestamp(ExecutionState.INITIALIZING))
+                .isEqualTo(archivedExecution.getStateEndTimestamp(ExecutionState.INITIALIZING));
+        assertThat(runtimeExecution.getStateEndTimestamp(ExecutionState.RUNNING))
+                .isEqualTo(archivedExecution.getStateEndTimestamp(ExecutionState.RUNNING));
+        assertThat(runtimeExecution.getStateEndTimestamp(ExecutionState.FINISHED))
+                .isEqualTo(archivedExecution.getStateEndTimestamp(ExecutionState.FINISHED));
+        assertThat(runtimeExecution.getStateEndTimestamp(ExecutionState.CANCELING))
+                .isEqualTo(archivedExecution.getStateEndTimestamp(ExecutionState.CANCELING));
+        assertThat(runtimeExecution.getStateEndTimestamp(ExecutionState.CANCELED))
+                .isEqualTo(archivedExecution.getStateEndTimestamp(ExecutionState.CANCELED));
+        assertThat(runtimeExecution.getStateEndTimestamp(ExecutionState.FAILED))
+                .isEqualTo(archivedExecution.getStateEndTimestamp(ExecutionState.FAILED));
+        compareStringifiedAccumulators(
+                runtimeExecution.getUserAccumulatorsStringified(),
+                archivedExecution.getUserAccumulatorsStringified());
+        assertThat(runtimeExecution.getParallelSubtaskIndex())
+                .isEqualTo(archivedExecution.getParallelSubtaskIndex());
+    }
+
+    static void compareStringifiedAccumulators(
+            StringifiedAccumulatorResult[] runtimeAccs,
+            StringifiedAccumulatorResult[] archivedAccs) {
+        assertThat(runtimeAccs.length).isEqualTo(archivedAccs.length);
+
+        for (int x = 0; x < runtimeAccs.length; x++) {
+            StringifiedAccumulatorResult runtimeResult = runtimeAccs[x];
+            StringifiedAccumulatorResult archivedResult = archivedAccs[x];
+
+            assertThat(runtimeResult.getName()).isEqualTo(archivedResult.getName());
+            assertThat(runtimeResult.getType()).isEqualTo(archivedResult.getType());
+            assertThat(runtimeResult.getValue()).isEqualTo(archivedResult.getValue());
+        }
+    }
+
+    static void compareSerializedAccumulators(
+            Map<String, SerializedValue<OptionalFailure<Object>>> runtimeAccs,
+            Map<String, SerializedValue<OptionalFailure<Object>>> archivedAccs)
+            throws IOException, ClassNotFoundException {
+        assertThat(runtimeAccs.size()).isEqualTo(archivedAccs.size());
+        for (Entry<String, SerializedValue<OptionalFailure<Object>>> runtimeAcc :
+                runtimeAccs.entrySet()) {
+            long runtimeUserAcc =
+                    (long)
+                            runtimeAcc
+                                    .getValue()
+                                    .deserializeValue(ClassLoader.getSystemClassLoader())
+                                    .getUnchecked();
+            long archivedUserAcc =
+                    (long)
+                            archivedAccs
+                                    .get(runtimeAcc.getKey())
+                                    .deserializeValue(ClassLoader.getSystemClassLoader())
+                                    .getUnchecked();
+
+            assertThat(runtimeUserAcc).isEqualTo(archivedUserAcc);
+        }
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java
index 0ab6ddcd975..790ca43ce7d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java
@@ -21,20 +21,19 @@ package org.apache.flink.runtime.messages.webmonitor;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.rest.util.RestMapperUtils;
-import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link JobDetails}. */
-public class JobDetailsTest extends TestLogger {
+class JobDetailsTest {
     private static final String COMPATIBLE_JOB_DETAILS =
             "{"
                     + "  \"jid\" : \"7a7c3291accebd10b6be8d4f8c8d8dfc\","
@@ -60,7 +59,7 @@ public class JobDetailsTest extends TestLogger {
 
     /** Tests that we can marshal and unmarshal JobDetails instances. */
     @Test
-    public void testJobDetailsMarshalling() throws JsonProcessingException {
+    void testJobDetailsMarshalling() throws JsonProcessingException {
         final JobDetails expected =
                 new JobDetails(
                         new JobID(),
@@ -79,11 +78,11 @@ public class JobDetailsTest extends TestLogger {
 
         final JobDetails unmarshalled = objectMapper.treeToValue(marshalled, JobDetails.class);
 
-        assertEquals(expected, unmarshalled);
+        assertThat(unmarshalled).isEqualTo(expected);
     }
 
     @Test
-    public void testJobDetailsCompatibleUnmarshalling() throws IOException {
+    void testJobDetailsCompatibleUnmarshalling() throws IOException {
         final JobDetails expected =
                 new JobDetails(
                         JobID.fromHexString("7a7c3291accebd10b6be8d4f8c8d8dfc"),
@@ -101,6 +100,6 @@ public class JobDetailsTest extends TestLogger {
         final JobDetails unmarshalled =
                 objectMapper.readValue(COMPATIBLE_JOB_DETAILS, JobDetails.class);
 
-        assertEquals(expected, unmarshalled);
+        assertThat(unmarshalled).isEqualTo(expected);
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
index cfd7f51af35..52eec21ab5d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
@@ -26,8 +26,8 @@ import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.metrics.util.TestHistogram;
 
-import org.junit.Assert;
-import org.junit.Test;
+import org.assertj.core.data.Offset;
+import org.junit.jupiter.api.Test;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -41,14 +41,13 @@ import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_C
 import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_GAUGE;
 import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_HISTOGRAM;
 import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_METER;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
 
 /** Tests for the {@link MetricDumpSerialization}. */
-public class MetricDumpSerializerTest {
+class MetricDumpSerializerTest {
     @Test
-    public void testNullGaugeHandling() throws IOException {
+    void testNullGaugeHandling() throws IOException {
         MetricDumpSerialization.MetricDumpSerializer serializer =
                 new MetricDumpSerialization.MetricDumpSerializer();
         MetricDumpSerialization.MetricDumpDeserializer deserializer =
@@ -74,17 +73,16 @@ public class MetricDumpSerializerTest {
                         Collections.<Meter, Tuple2<QueryScopeInfo, String>>emptyMap());
 
         // no metrics should be serialized
-        Assert.assertEquals(0, output.serializedCounters.length);
-        Assert.assertEquals(0, output.serializedGauges.length);
-        Assert.assertEquals(0, output.serializedHistograms.length);
-        Assert.assertEquals(0, output.serializedMeters.length);
-
+        assertThat(output.serializedCounters).isEmpty();
+        assertThat(output.serializedGauges).isEmpty();
+        assertThat(output.serializedHistograms).isEmpty();
+        assertThat(output.serializedMeters).isEmpty();
         List<MetricDump> deserialized = deserializer.deserialize(output);
-        Assert.assertEquals(0, deserialized.size());
+        assertThat(deserialized).isEmpty();
     }
 
     @Test
-    public void testJavaSerialization() throws IOException {
+    void testJavaSerialization() throws IOException {
         MetricDumpSerialization.MetricDumpSerializer serializer =
                 new MetricDumpSerialization.MetricDumpSerializer();
 
@@ -100,7 +98,7 @@ public class MetricDumpSerializerTest {
     }
 
     @Test
-    public void testSerialization() throws IOException {
+    void testSerialization() throws IOException {
         MetricDumpSerialization.MetricDumpSerializer serializer =
                 new MetricDumpSerialization.MetricDumpSerializer();
         MetricDumpSerialization.MetricDumpDeserializer deserializer =
@@ -174,7 +172,7 @@ public class MetricDumpSerializerTest {
 
         // ===== Counters
         // ==============================================================================================
-        assertEquals(5, deserialized.size());
+        assertThat(deserialized.size()).isEqualTo(5);
 
         for (MetricDump metric : deserialized) {
             switch (metric.getCategory()) {
@@ -182,87 +180,86 @@ public class MetricDumpSerializerTest {
                     MetricDump.CounterDump counterDump = (MetricDump.CounterDump) metric;
                     switch ((byte) counterDump.count) {
                         case 1:
-                            assertTrue(
-                                    counterDump.scopeInfo
-                                            instanceof QueryScopeInfo.JobManagerQueryScopeInfo);
-                            assertEquals("A", counterDump.scopeInfo.scope);
-                            assertEquals("c1", counterDump.name);
+                            assertThat(counterDump.scopeInfo)
+                                    .isInstanceOf(QueryScopeInfo.JobManagerQueryScopeInfo.class);
+                            assertThat(counterDump.scopeInfo.scope).isEqualTo("A");
+                            assertThat(counterDump.name).isEqualTo("c1");
                             counters.remove(c1);
                             break;
                         case 2:
-                            assertTrue(
-                                    counterDump.scopeInfo
-                                            instanceof QueryScopeInfo.TaskManagerQueryScopeInfo);
-                            assertEquals("B", counterDump.scopeInfo.scope);
-                            assertEquals("c2", counterDump.name);
-                            assertEquals(
-                                    "tmid",
-                                    ((QueryScopeInfo.TaskManagerQueryScopeInfo)
-                                                    counterDump.scopeInfo)
-                                            .taskManagerID);
+                            assertThat(counterDump.scopeInfo)
+                                    .isInstanceOf(QueryScopeInfo.TaskManagerQueryScopeInfo.class);
+                            assertThat(counterDump.scopeInfo.scope).isEqualTo("B");
+                            assertThat(counterDump.name).isEqualTo("c2");
+                            assertThat("tmid")
+                                    .isEqualTo(
+                                            ((QueryScopeInfo.TaskManagerQueryScopeInfo)
+                                                            counterDump.scopeInfo)
+                                                    .taskManagerID);
                             counters.remove(c2);
                             break;
                         default:
-                            fail();
+                            fail("Unexpected counter count.");
                     }
                     break;
                 case METRIC_CATEGORY_GAUGE:
                     MetricDump.GaugeDump gaugeDump = (MetricDump.GaugeDump) metric;
-                    assertEquals("4", gaugeDump.value);
-                    assertEquals("g1", gaugeDump.name);
+                    assertThat(gaugeDump.value).isEqualTo("4");
+                    assertThat(gaugeDump.name).isEqualTo("g1");
 
-                    assertTrue(gaugeDump.scopeInfo instanceof QueryScopeInfo.TaskQueryScopeInfo);
+                    assertThat(gaugeDump.scopeInfo)
+                            .isInstanceOf(QueryScopeInfo.TaskQueryScopeInfo.class);
                     QueryScopeInfo.TaskQueryScopeInfo taskInfo =
                             (QueryScopeInfo.TaskQueryScopeInfo) gaugeDump.scopeInfo;
-                    assertEquals("D", taskInfo.scope);
-                    assertEquals("jid", taskInfo.jobID);
-                    assertEquals("vid", taskInfo.vertexID);
-                    assertEquals(2, taskInfo.subtaskIndex);
+                    assertThat(taskInfo.scope).isEqualTo("D");
+                    assertThat(taskInfo.jobID).isEqualTo("jid");
+                    assertThat(taskInfo.vertexID).isEqualTo("vid");
+                    assertThat(taskInfo.subtaskIndex).isEqualTo(2);
                     gauges.remove(g1);
                     break;
                 case METRIC_CATEGORY_HISTOGRAM:
                     MetricDump.HistogramDump histogramDump = (MetricDump.HistogramDump) metric;
-                    assertEquals("h1", histogramDump.name);
-                    assertEquals(0.5, histogramDump.median, 0.1);
-                    assertEquals(0.75, histogramDump.p75, 0.1);
-                    assertEquals(0.90, histogramDump.p90, 0.1);
-                    assertEquals(0.95, histogramDump.p95, 0.1);
-                    assertEquals(0.98, histogramDump.p98, 0.1);
-                    assertEquals(0.99, histogramDump.p99, 0.1);
-                    assertEquals(0.999, histogramDump.p999, 0.1);
-                    assertEquals(4, histogramDump.mean, 0.1);
-                    assertEquals(5, histogramDump.stddev, 0.1);
-                    assertEquals(6, histogramDump.max);
-                    assertEquals(7, histogramDump.min);
+                    assertThat(histogramDump.name).isEqualTo("h1");
+                    assertThat(histogramDump.median).isCloseTo(0.5, Offset.offset(0.1));
+                    assertThat(histogramDump.p75).isCloseTo(0.75, Offset.offset(0.1));
+                    assertThat(histogramDump.p90).isCloseTo(0.9, Offset.offset(0.1));
+                    assertThat(histogramDump.p95).isCloseTo(0.95, Offset.offset(0.1));
+                    assertThat(histogramDump.p98).isCloseTo(0.98, Offset.offset(0.1));
+                    assertThat(histogramDump.p99).isCloseTo(0.99, Offset.offset(0.1));
+                    assertThat(histogramDump.p999).isCloseTo(0.999, Offset.offset(0.1));
+                    assertThat(histogramDump.mean).isCloseTo(4, Offset.offset(0.1));
+                    assertThat(histogramDump.stddev).isCloseTo(5, Offset.offset(0.1));
+                    assertThat(histogramDump.max).isEqualTo(6);
+                    assertThat(histogramDump.min).isEqualTo(7);
 
-                    assertTrue(
-                            histogramDump.scopeInfo
-                                    instanceof QueryScopeInfo.OperatorQueryScopeInfo);
+                    assertThat(histogramDump.scopeInfo)
+                            .isInstanceOf(QueryScopeInfo.OperatorQueryScopeInfo.class);
                     QueryScopeInfo.OperatorQueryScopeInfo opInfo =
                             (QueryScopeInfo.OperatorQueryScopeInfo) histogramDump.scopeInfo;
-                    assertEquals("E", opInfo.scope);
-                    assertEquals("jid", opInfo.jobID);
-                    assertEquals("vid", opInfo.vertexID);
-                    assertEquals(2, opInfo.subtaskIndex);
-                    assertEquals("opname", opInfo.operatorName);
+                    assertThat(opInfo.scope).isEqualTo("E");
+                    assertThat(opInfo.jobID).isEqualTo("jid");
+                    assertThat(opInfo.vertexID).isEqualTo("vid");
+                    assertThat(opInfo.subtaskIndex).isEqualTo(2);
+                    assertThat(opInfo.operatorName).isEqualTo("opname");
                     histograms.remove(h1);
                     break;
                 case METRIC_CATEGORY_METER:
                     MetricDump.MeterDump meterDump = (MetricDump.MeterDump) metric;
-                    assertEquals(5.0, meterDump.rate, 0.1);
+                    assertThat(meterDump.rate).isCloseTo(5.0, Offset.offset(0.1));
 
-                    assertTrue(meterDump.scopeInfo instanceof QueryScopeInfo.JobQueryScopeInfo);
-                    assertEquals("C", meterDump.scopeInfo.scope);
-                    assertEquals("c3", meterDump.name);
-                    assertEquals(
-                            "jid", ((QueryScopeInfo.JobQueryScopeInfo) meterDump.scopeInfo).jobID);
+                    assertThat(meterDump.scopeInfo)
+                            .isInstanceOf(QueryScopeInfo.JobQueryScopeInfo.class);
+                    assertThat(meterDump.scopeInfo.scope).isEqualTo("C");
+                    assertThat(meterDump.name).isEqualTo("c3");
+                    assertThat(((QueryScopeInfo.JobQueryScopeInfo) meterDump.scopeInfo).jobID)
+                            .isEqualTo("jid");
                     break;
                 default:
-                    fail();
+                    fail("Unexpected metric type: " + metric.getCategory());
             }
         }
-        assertTrue(counters.isEmpty());
-        assertTrue(gauges.isEmpty());
-        assertTrue(histograms.isEmpty());
+        assertThat(counters).isEmpty();
+        assertThat(gauges).isEmpty();
+        assertThat(histograms).isEmpty();
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java
index 029f53e57cc..0355d01fcf0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java
@@ -18,144 +18,144 @@
 
 package org.apache.flink.runtime.metrics.dump;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link QueryScopeInfo} classes. */
-public class QueryScopeInfoTest {
+class QueryScopeInfoTest {
     @Test
-    public void testJobManagerQueryScopeInfo() {
+    void testJobManagerQueryScopeInfo() {
         QueryScopeInfo.JobManagerQueryScopeInfo info =
                 new QueryScopeInfo.JobManagerQueryScopeInfo();
-        assertEquals(QueryScopeInfo.INFO_CATEGORY_JM, info.getCategory());
-        assertEquals("", info.scope);
+        assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_JM);
+        assertThat(info.scope).isEmpty();
 
         info = info.copy("world");
-        assertEquals(QueryScopeInfo.INFO_CATEGORY_JM, info.getCategory());
-        assertEquals("world", info.scope);
+        assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_JM);
+        assertThat(info.scope).isEqualTo("world");
 
         info = new QueryScopeInfo.JobManagerQueryScopeInfo("hello");
-        assertEquals(QueryScopeInfo.INFO_CATEGORY_JM, info.getCategory());
-        assertEquals("hello", info.scope);
+        assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_JM);
+        assertThat(info.scope).isEqualTo("hello");
 
         info = info.copy("world");
-        assertEquals(QueryScopeInfo.INFO_CATEGORY_JM, info.getCategory());
-        assertEquals("hello.world", info.scope);
+        assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_JM);
+        assertThat(info.scope).isEqualTo("hello.world");
     }
 
     @Test
-    public void testTaskManagerQueryScopeInfo() {
+    void testTaskManagerQueryScopeInfo() {
         QueryScopeInfo.TaskManagerQueryScopeInfo info =
                 new QueryScopeInfo.TaskManagerQueryScopeInfo("tmid");
-        assertEquals(QueryScopeInfo.INFO_CATEGORY_TM, info.getCategory());
-        assertEquals("", info.scope);
-        assertEquals("tmid", info.taskManagerID);
+        assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_TM);
+        assertThat(info.scope).isEmpty();
+        assertThat(info.taskManagerID).isEqualTo("tmid");
 
         info = info.copy("world");
-        assertEquals(QueryScopeInfo.INFO_CATEGORY_TM, info.getCategory());
-        assertEquals("world", info.scope);
-        assertEquals("tmid", info.taskManagerID);
+        assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_TM);
+        assertThat(info.scope).isEqualTo("world");
+        assertThat(info.taskManagerID).isEqualTo("tmid");
 
         info = new QueryScopeInfo.TaskManagerQueryScopeInfo("tmid", "hello");
-        assertEquals(QueryScopeInfo.INFO_CATEGORY_TM, info.getCategory());
-        assertEquals("hello", info.scope);
-        assertEquals("tmid", info.taskManagerID);
+        assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_TM);
+        assertThat(info.scope).isEqualTo("hello");
+        assertThat(info.taskManagerID).isEqualTo("tmid");
 
         info = info.copy("world");
-        assertEquals(QueryScopeInfo.INFO_CATEGORY_TM, info.getCategory());
-        assertEquals("hello.world", info.scope);
-        assertEquals("tmid", info.taskManagerID);
+        assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_TM);
+        assertThat(info.scope).isEqualTo("hello.world");
+        assertThat(info.taskManagerID).isEqualTo("tmid");
     }
 
     @Test
-    public void testJobQueryScopeInfo() {
+    void testJobQueryScopeInfo() {
         QueryScopeInfo.JobQueryScopeInfo info = new QueryScopeInfo.JobQueryScopeInfo("jobid");
-        assertEquals(QueryScopeInfo.INFO_CATEGORY_JOB, info.getCategory());
-        assertEquals("", info.scope);
-        assertEquals("jobid", info.jobID);
+        assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_JOB);
+        assertThat(info.scope).isEmpty();
+        assertThat(info.jobID).isEqualTo("jobid");
 
         info = info.copy("world");
-        assertEquals(QueryScopeInfo.INFO_CATEGORY_JOB, info.getCategory());
-        assertEquals("world", info.scope);
-        assertEquals("jobid", info.jobID);
+        assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_JOB);
+        assertThat(info.scope).isEqualTo("world");
+        assertThat(info.jobID).isEqualTo("jobid");
 
         info = new QueryScopeInfo.JobQueryScopeInfo("jobid", "hello");
-        assertEquals(QueryScopeInfo.INFO_CATEGORY_JOB, info.getCategory());
-        assertEquals("hello", info.scope);
-        assertEquals("jobid", info.jobID);
+        assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_JOB);
+        assertThat(info.scope).isEqualTo("hello");
+        assertThat(info.jobID).isEqualTo("jobid");
 
         info = info.copy("world");
-        assertEquals(QueryScopeInfo.INFO_CATEGORY_JOB, info.getCategory());
-        assertEquals("hello.world", info.scope);
-        assertEquals("jobid", info.jobID);
+        assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_JOB);
+        assertThat(info.scope).isEqualTo("hello.world");
+        assertThat(info.jobID).isEqualTo("jobid");
     }
 
     @Test
-    public void testTaskQueryScopeInfo() {
+    void testTaskQueryScopeInfo() {
         QueryScopeInfo.TaskQueryScopeInfo info =
                 new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 2);
-        assertEquals(QueryScopeInfo.INFO_CATEGORY_TASK, info.getCategory());
-        assertEquals("", info.scope);
-        assertEquals("jobid", info.jobID);
-        assertEquals("taskid", info.vertexID);
-        assertEquals(2, info.subtaskIndex);
+        assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_TASK);
+        assertThat(info.scope).isEmpty();
+        assertThat(info.jobID).isEqualTo("jobid");
+        assertThat(info.vertexID).isEqualTo("taskid");
+        assertThat(info.subtaskIndex).isEqualTo(2);
 
         info = info.copy("world");
-        assertEquals(QueryScopeInfo.INFO_CATEGORY_TASK, info.getCategory());
-        assertEquals("world", info.scope);
-        assertEquals("jobid", info.jobID);
-        assertEquals("taskid", info.vertexID);
-        assertEquals(2, info.subtaskIndex);
+        assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_TASK);
+        assertThat(info.scope).isEqualTo("world");
+        assertThat(info.jobID).isEqualTo("jobid");
+        assertThat(info.vertexID).isEqualTo("taskid");
+        assertThat(info.subtaskIndex).isEqualTo(2);
 
         info = new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 2, "hello");
-        assertEquals(QueryScopeInfo.INFO_CATEGORY_TASK, info.getCategory());
-        assertEquals("hello", info.scope);
-        assertEquals("jobid", info.jobID);
-        assertEquals("taskid", info.vertexID);
-        assertEquals(2, info.subtaskIndex);
+        assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_TASK);
+        assertThat(info.scope).isEqualTo("hello");
+        assertThat(info.jobID).isEqualTo("jobid");
+        assertThat(info.vertexID).isEqualTo("taskid");
+        assertThat(info.subtaskIndex).isEqualTo(2);
 
         info = info.copy("world");
-        assertEquals(QueryScopeInfo.INFO_CATEGORY_TASK, info.getCategory());
-        assertEquals("hello.world", info.scope);
-        assertEquals("jobid", info.jobID);
-        assertEquals("taskid", info.vertexID);
-        assertEquals(2, info.subtaskIndex);
+        assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_TASK);
+        assertThat(info.scope).isEqualTo("hello.world");
+        assertThat(info.jobID).isEqualTo("jobid");
+        assertThat(info.vertexID).isEqualTo("taskid");
+        assertThat(info.subtaskIndex).isEqualTo(2);
     }
 
     @Test
-    public void testOperatorQueryScopeInfo() {
+    void testOperatorQueryScopeInfo() {
         QueryScopeInfo.OperatorQueryScopeInfo info =
                 new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 2, "opname");
-        assertEquals(QueryScopeInfo.INFO_CATEGORY_OPERATOR, info.getCategory());
-        assertEquals("", info.scope);
-        assertEquals("jobid", info.jobID);
-        assertEquals("taskid", info.vertexID);
-        assertEquals("opname", info.operatorName);
-        assertEquals(2, info.subtaskIndex);
+        assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_OPERATOR);
+        assertThat(info.scope).isEmpty();
+        assertThat(info.jobID).isEqualTo("jobid");
+        assertThat(info.vertexID).isEqualTo("taskid");
+        assertThat(info.operatorName).isEqualTo("opname");
+        assertThat(info.subtaskIndex).isEqualTo(2);
 
         info = info.copy("world");
-        assertEquals(QueryScopeInfo.INFO_CATEGORY_OPERATOR, info.getCategory());
-        assertEquals("world", info.scope);
-        assertEquals("jobid", info.jobID);
-        assertEquals("taskid", info.vertexID);
-        assertEquals("opname", info.operatorName);
-        assertEquals(2, info.subtaskIndex);
+        assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_OPERATOR);
+        assertThat(info.scope).isEqualTo("world");
+        assertThat(info.jobID).isEqualTo("jobid");
+        assertThat(info.vertexID).isEqualTo("taskid");
+        assertThat(info.operatorName).isEqualTo("opname");
+        assertThat(info.subtaskIndex).isEqualTo(2);
 
         info = new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 2, "opname", "hello");
-        assertEquals(QueryScopeInfo.INFO_CATEGORY_OPERATOR, info.getCategory());
-        assertEquals("hello", info.scope);
-        assertEquals("jobid", info.jobID);
-        assertEquals("taskid", info.vertexID);
-        assertEquals("opname", info.operatorName);
-        assertEquals(2, info.subtaskIndex);
+        assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_OPERATOR);
+        assertThat(info.scope).isEqualTo("hello");
+        assertThat(info.jobID).isEqualTo("jobid");
+        assertThat(info.vertexID).isEqualTo("taskid");
+        assertThat(info.operatorName).isEqualTo("opname");
+        assertThat(info.subtaskIndex).isEqualTo(2);
 
         info = info.copy("world");
-        assertEquals(QueryScopeInfo.INFO_CATEGORY_OPERATOR, info.getCategory());
-        assertEquals("hello.world", info.scope);
-        assertEquals("jobid", info.jobID);
-        assertEquals("taskid", info.vertexID);
-        assertEquals("opname", info.operatorName);
-        assertEquals(2, info.subtaskIndex);
+        assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_OPERATOR);
+        assertThat(info.scope).isEqualTo("hello.world");
+        assertThat(info.jobID).isEqualTo("jobid");
+        assertThat(info.vertexID).isEqualTo("taskid");
+        assertThat(info.operatorName).isEqualTo("opname");
+        assertThat(info.subtaskIndex).isEqualTo(2);
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java
index d38b7c1b8ed..77cd5047d3e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java
@@ -38,8 +38,8 @@ import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
 import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
 import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
 
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -52,12 +52,10 @@ import java.util.stream.Collectors;
 import static org.apache.flink.runtime.rest.messages.JobVertexBackPressureInfo.VertexBackPressureLevel.HIGH;
 import static org.apache.flink.runtime.rest.messages.JobVertexBackPressureInfo.VertexBackPressureLevel.LOW;
 import static org.apache.flink.runtime.rest.messages.JobVertexBackPressureInfo.VertexBackPressureLevel.OK;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link JobVertexBackPressureHandler}. */
-public class JobVertexBackPressureHandlerTest {
+class JobVertexBackPressureHandlerTest {
 
     /** Job ID for which back pressure stats exist. */
     private static final JobID TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE = new JobID();
@@ -107,8 +105,8 @@ public class JobVertexBackPressureHandlerTest {
         return dumps;
     }
 
-    @Before
-    public void setUp() {
+    @BeforeEach
+    void setUp() {
         metricStore = new MetricStore();
         for (MetricDump metricDump : getMetricDumps()) {
             metricStore.add(metricDump);
@@ -141,7 +139,7 @@ public class JobVertexBackPressureHandlerTest {
     }
 
     @Test
-    public void testGetBackPressure() throws Exception {
+    void testGetBackPressure() throws Exception {
         final Map<String, String> pathParameters = new HashMap<>();
         pathParameters.put(
                 JobIDPathParameter.KEY, TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString());
@@ -161,42 +159,42 @@ public class JobVertexBackPressureHandlerTest {
         final JobVertexBackPressureInfo jobVertexBackPressureInfo =
                 jobVertexBackPressureInfoCompletableFuture.get();
 
-        assertThat(jobVertexBackPressureInfo.getStatus(), equalTo(VertexBackPressureStatus.OK));
-        assertThat(jobVertexBackPressureInfo.getBackpressureLevel(), equalTo(HIGH));
+        assertThat(jobVertexBackPressureInfo.getStatus()).isEqualTo(VertexBackPressureStatus.OK);
+        assertThat(jobVertexBackPressureInfo.getBackpressureLevel()).isEqualTo(HIGH);
 
         assertThat(
-                jobVertexBackPressureInfo.getSubtasks().stream()
-                        .map(SubtaskBackPressureInfo::getBackPressuredRatio)
-                        .collect(Collectors.toList()),
-                contains(1.0, 0.5, 0.1));
+                        jobVertexBackPressureInfo.getSubtasks().stream()
+                                .map(SubtaskBackPressureInfo::getBackPressuredRatio)
+                                .collect(Collectors.toList()))
+                .containsExactly(1.0, 0.5, 0.1);
 
         assertThat(
-                jobVertexBackPressureInfo.getSubtasks().stream()
-                        .map(SubtaskBackPressureInfo::getIdleRatio)
-                        .collect(Collectors.toList()),
-                contains(0.0, 0.1, 0.2));
+                        jobVertexBackPressureInfo.getSubtasks().stream()
+                                .map(SubtaskBackPressureInfo::getIdleRatio)
+                                .collect(Collectors.toList()))
+                .containsExactly(0.0, 0.1, 0.2);
 
         assertThat(
-                jobVertexBackPressureInfo.getSubtasks().stream()
-                        .map(SubtaskBackPressureInfo::getBusyRatio)
-                        .collect(Collectors.toList()),
-                contains(0.0, 0.9, 0.7));
+                        jobVertexBackPressureInfo.getSubtasks().stream()
+                                .map(SubtaskBackPressureInfo::getBusyRatio)
+                                .collect(Collectors.toList()))
+                .containsExactly(0.0, 0.9, 0.7);
 
         assertThat(
-                jobVertexBackPressureInfo.getSubtasks().stream()
-                        .map(SubtaskBackPressureInfo::getBackpressureLevel)
-                        .collect(Collectors.toList()),
-                contains(HIGH, LOW, OK));
+                        jobVertexBackPressureInfo.getSubtasks().stream()
+                                .map(SubtaskBackPressureInfo::getBackpressureLevel)
+                                .collect(Collectors.toList()))
+                .containsExactly(HIGH, LOW, OK);
 
         assertThat(
-                jobVertexBackPressureInfo.getSubtasks().stream()
-                        .map(SubtaskBackPressureInfo::getSubtask)
-                        .collect(Collectors.toList()),
-                contains(0, 1, 3));
+                        jobVertexBackPressureInfo.getSubtasks().stream()
+                                .map(SubtaskBackPressureInfo::getSubtask)
+                                .collect(Collectors.toList()))
+                .containsExactly(0, 1, 3);
     }
 
     @Test
-    public void testAbsentBackPressure() throws Exception {
+    void testAbsentBackPressure() throws Exception {
         final Map<String, String> pathParameters = new HashMap<>();
         pathParameters.put(
                 JobIDPathParameter.KEY, TEST_JOB_ID_BACK_PRESSURE_STATS_ABSENT.toString());
@@ -216,8 +214,7 @@ public class JobVertexBackPressureHandlerTest {
         final JobVertexBackPressureInfo jobVertexBackPressureInfo =
                 jobVertexBackPressureInfoCompletableFuture.get();
 
-        assertThat(
-                jobVertexBackPressureInfo.getStatus(),
-                equalTo(VertexBackPressureStatus.DEPRECATED));
+        assertThat(jobVertexBackPressureInfo.getStatus())
+                .isEqualTo(VertexBackPressureStatus.DEPRECATED);
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
index 9bb6d1fde71..97c739df224 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
@@ -20,38 +20,40 @@ package org.apache.flink.runtime.rest.handler.legacy.metrics;
 
 import org.apache.flink.runtime.metrics.dump.MetricDump;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
-import org.apache.flink.util.TestLogger;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the MetricStore. */
-public class MetricStoreTest extends TestLogger {
+class MetricStoreTest {
+
     @Test
-    public void testAdd() throws IOException {
+    void testAdd() throws IOException {
         MetricStore store = setupStore(new MetricStore());
 
-        assertEquals("0", store.getJobManagerMetricStore().getMetric("abc.metric1", "-1"));
-        assertEquals("1", store.getTaskManagerMetricStore("tmid").getMetric("abc.metric2", "-1"));
-        assertEquals("2", store.getJobMetricStore("jobid").getMetric("abc.metric3", "-1"));
-        assertEquals("3", store.getJobMetricStore("jobid").getMetric("abc.metric4", "-1"));
-        assertEquals(
-                "4", store.getTaskMetricStore("jobid", "taskid").getMetric("8.abc.metric5", "-1"));
-        assertEquals(
-                "5",
-                store.getTaskMetricStore("jobid", "taskid")
-                        .getMetric("8.opname.abc.metric6", "-1"));
-        assertEquals(
-                "6",
-                store.getTaskMetricStore("jobid", "taskid")
-                        .getMetric("8.opname.abc.metric7", "-1"));
+        assertThat(store.getJobManagerMetricStore().getMetric("abc.metric1", "-1")).isEqualTo("0");
+        assertThat(store.getTaskManagerMetricStore("tmid").getMetric("abc.metric2", "-1"))
+                .isEqualTo("1");
+        assertThat(store.getJobMetricStore("jobid").getMetric("abc.metric3", "-1")).isEqualTo("2");
+        assertThat(store.getJobMetricStore("jobid").getMetric("abc.metric4", "-1")).isEqualTo("3");
+
+        assertThat(store.getTaskMetricStore("jobid", "taskid").getMetric("8.abc.metric5", "-1"))
+                .isEqualTo("4");
+        assertThat(
+                        store.getTaskMetricStore("jobid", "taskid")
+                                .getMetric("8.opname.abc.metric6", "-1"))
+                .isEqualTo("5");
+        assertThat(
+                        store.getTaskMetricStore("jobid", "taskid")
+                                .getMetric("8.opname.abc.metric7", "-1"))
+                .isEqualTo("6");
     }
 
     @Test
-    public void testMalformedNameHandling() {
+    void testMalformedNameHandling() {
         MetricStore store = new MetricStore();
         // -----verify that no exceptions are thrown
 
@@ -64,12 +66,12 @@ public class MetricStoreTest extends TestLogger {
         store.add(cd);
 
         // -----verify that no side effects occur
-        assertEquals(0, store.getJobManager().metrics.size());
-        assertEquals(0, store.getTaskManagers().size());
-        assertEquals(0, store.getJobs().size());
+        assertThat(store.getJobManager().metrics).isEmpty();
+        assertThat(store.getTaskManagers()).isEmpty();
+        assertThat(store.getJobs()).isEmpty();
     }
 
-    public static MetricStore setupStore(MetricStore store) {
+    static MetricStore setupStore(MetricStore store) {
         QueryScopeInfo.JobManagerQueryScopeInfo jm =
                 new QueryScopeInfo.JobManagerQueryScopeInfo("abc");
         MetricDump.CounterDump cd1 = new MetricDump.CounterDump(jm, "metric1", 0);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java
index ee6f15c2b29..95a64901af9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java
@@ -39,13 +39,12 @@ import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMetricsInfo;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorMemoryConfiguration;
 import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -54,11 +53,10 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests the {@link TaskManagerDetailsHandler} implementation. */
-public class TaskManagerDetailsHandlerTest extends TestLogger {
+class TaskManagerDetailsHandlerTest {
 
     private static final ResourceID TASK_MANAGER_ID = ResourceID.generate();
 
@@ -67,8 +65,8 @@ public class TaskManagerDetailsHandlerTest extends TestLogger {
 
     private TaskManagerDetailsHandler testInstance;
 
-    @Before
-    public void setup() throws HandlerRequestException {
+    @BeforeEach
+    void setup() throws HandlerRequestException {
         resourceManagerGateway = new TestingResourceManagerGateway();
         metricFetcher = new TestingMetricFetcher();
 
@@ -83,7 +81,7 @@ public class TaskManagerDetailsHandlerTest extends TestLogger {
     }
 
     @Test
-    public void testTaskManagerMetricsInfoExtraction()
+    void testTaskManagerMetricsInfoExtraction()
             throws RestHandlerException, ExecutionException, InterruptedException,
                     JsonProcessingException, HandlerRequestException {
         initializeMetricStore(metricFetcher.getMetricStore());
@@ -124,7 +122,7 @@ public class TaskManagerDetailsHandlerTest extends TestLogger {
         String actualJson = objectMapper.writeValueAsString(actual);
         String expectedJson = objectMapper.writeValueAsString(expected);
 
-        assertThat(actualJson, is(expectedJson));
+        assertThat(actualJson).isEqualTo(expectedJson);
     }
 
     private static void initializeMetricStore(MetricStore metricStore) {


[flink] 04/06: [FLINK-28588][rest] Archive all current executions in ArchivedExecutionVertex.

Posted by ga...@apache.org.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 44c00dbb4a083ff197442e2ce6440558be252787
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Tue Jul 26 16:39:40 2022 +0800

    [FLINK-28588][rest] Archive all current executions in ArchivedExecutionVertex.
---
 .../executiongraph/AccessExecutionVertex.java      |   9 +
 .../executiongraph/ArchivedExecutionVertex.java    |  26 ++-
 .../ArchivedSpeculativeExecutionVertex.java        |  52 -----
 .../executiongraph/SpeculativeExecutionVertex.java |   4 +-
 .../ArchivedExecutionGraphTestUtils.java           |  21 ++
 ...xecutionVertexWithSpeculativeExecutionTest.java | 223 +++++++++++++++++++++
 6 files changed, 280 insertions(+), 55 deletions(-)
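
The archiving logic added to ArchivedExecutionVertex in this commit can be
sketched as follows. This is a minimal illustration under stated assumptions,
not the Flink implementation: Attempt, ArchivedAttempt and archiveAll are
hypothetical stand-ins for Execution, ArchivedExecution and the constructor
body. It shows the ordering the patch appears to rely on: the current
execution attempt is archived first, and the remaining concurrent attempts
follow, skipped by reference identity so that the current one is not added
twice.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-ins only; none of these types are Flink classes. */
final class ArchiveCurrentExecutionsSketch {

    /** Stand-in for a runtime execution attempt. */
    static final class Attempt {
        final int attemptNumber;

        Attempt(int attemptNumber) {
            this.attemptNumber = attemptNumber;
        }

        ArchivedAttempt archive() {
            return new ArchivedAttempt(attemptNumber);
        }
    }

    /** Stand-in for the read-only, archived form of an attempt. */
    static final class ArchivedAttempt {
        final int attemptNumber;

        ArchivedAttempt(int attemptNumber) {
            this.attemptNumber = attemptNumber;
        }
    }

    /** Archives the current attempt first, then every other concurrent attempt. */
    static List<ArchivedAttempt> archiveAll(Attempt current, List<Attempt> allCurrent) {
        List<ArchivedAttempt> archived = new ArrayList<>(allCurrent.size());
        archived.add(current.archive());
        for (Attempt attempt : allCurrent) {
            // Identity comparison: skip the attempt that was already archived above.
            if (attempt != current) {
                archived.add(attempt.archive());
            }
        }
        // Callers get a read-only view, as an archived vertex should not change.
        return Collections.unmodifiableList(archived);
    }

    public static void main(String[] args) {
        Attempt original = new Attempt(0);
        Attempt speculative = new Attempt(1);
        List<ArchivedAttempt> archived =
                archiveAll(speculative, Arrays.asList(original, speculative));
        for (ArchivedAttempt attempt : archived) {
            System.out.println("archived attempt " + attempt.attemptNumber);
        }
    }
}
```

With this shape, the first element of the returned list always corresponds to
the current execution attempt, which matches the interface contract that the
collection must contain it.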

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java
index 6775424ba4b..f8d4581c7e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
+import java.util.Collection;
 import java.util.Optional;
 
 /** Common interface for the runtime {@link ExecutionVertex} and {@link ArchivedExecutionVertex}. */
@@ -46,6 +47,14 @@ public interface AccessExecutionVertex {
      */
     AccessExecution getCurrentExecutionAttempt();
 
+    /**
+     * Returns the current executions for this execution vertex. The returned collection must
+     * contain the current execution attempt.
+     *
+     * @return current executions
+     */
+    <T extends AccessExecution> Collection<T> getCurrentExecutions();
+
     /**
      * Returns the current {@link ExecutionState} for this execution vertex.
      *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
index e5feba5445a..d9f8448a702 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
@@ -18,10 +18,14 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Optional;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -40,15 +44,29 @@ public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializa
 
     private final ArchivedExecution currentExecution; // this field must never be null
 
+    private final Collection<AccessExecution> currentExecutions;
+
     // ------------------------------------------------------------------------
 
     public ArchivedExecutionVertex(ExecutionVertex vertex) {
         this.subTaskIndex = vertex.getParallelSubtaskIndex();
         this.executionHistory = getCopyOfExecutionHistory(vertex);
         this.taskNameWithSubtask = vertex.getTaskNameWithSubtaskIndex();
-        this.currentExecution = vertex.getCurrentExecutionAttempt().archive();
+
+        Execution vertexCurrentExecution = vertex.getCurrentExecutionAttempt();
+        ArrayList<AccessExecution> currentExecutionList =
+                new ArrayList<>(vertex.getCurrentExecutions().size());
+        currentExecution = vertexCurrentExecution.archive();
+        currentExecutionList.add(currentExecution);
+        for (Execution execution : vertex.getCurrentExecutions()) {
+            if (execution != vertexCurrentExecution) {
+                currentExecutionList.add(execution.archive());
+            }
+        }
+        currentExecutions = Collections.unmodifiableList(currentExecutionList);
     }
 
+    @VisibleForTesting
     public ArchivedExecutionVertex(
             int subTaskIndex,
             String taskNameWithSubtask,
@@ -58,6 +76,7 @@ public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializa
         this.taskNameWithSubtask = checkNotNull(taskNameWithSubtask);
         this.currentExecution = checkNotNull(currentExecution);
         this.executionHistory = checkNotNull(executionHistory);
+        this.currentExecutions = Collections.singletonList(currentExecution);
     }
 
     // --------------------------------------------------------------------------------------------
@@ -79,6 +98,11 @@ public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializa
         return currentExecution;
     }
 
+    @Override
+    public Collection<AccessExecution> getCurrentExecutions() {
+        return currentExecutions;
+    }
+
     @Override
     public ExecutionState getExecutionState() {
         return currentExecution.getState();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedSpeculativeExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedSpeculativeExecutionVertex.java
deleted file mode 100644
index 263049414c5..00000000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedSpeculativeExecutionVertex.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-/**
- * {@link ArchivedSpeculativeExecutionVertex} is a readonly representation of {@link
- * SpeculativeExecutionVertex}.
- */
-public class ArchivedSpeculativeExecutionVertex extends ArchivedExecutionVertex {
-
-    private static final long serialVersionUID = 1L;
-
-    public ArchivedSpeculativeExecutionVertex(SpeculativeExecutionVertex vertex) {
-        super(
-                vertex.getParallelSubtaskIndex(),
-                vertex.getTaskNameWithSubtaskIndex(),
-                vertex.getCurrentExecutionAttempt().archive(),
-                getCopyOfExecutionHistory(vertex));
-    }
-
-    private static ExecutionHistory getCopyOfExecutionHistory(SpeculativeExecutionVertex vertex) {
-        final ExecutionHistory executionHistory =
-                ArchivedExecutionVertex.getCopyOfExecutionHistory(vertex);
-
-        // add all the executions to the execution history except for the only admitted current
-        // execution
-        final Execution currentAttempt = vertex.getCurrentExecutionAttempt();
-        for (Execution execution : vertex.getCurrentExecutions()) {
-            if (execution.getAttemptNumber() != currentAttempt.getAttemptNumber()) {
-                executionHistory.add(execution.archive());
-            }
-        }
-
-        return executionHistory;
-    }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java
index 41ef1c0ac2f..71e28779637 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java
@@ -285,8 +285,8 @@ public class SpeculativeExecutionVertex extends ExecutionVertex {
     }
 
     @Override
-    public ArchivedSpeculativeExecutionVertex archive() {
-        return new ArchivedSpeculativeExecutionVertex(this);
+    public ArchivedExecutionVertex archive() {
+        return new ArchivedExecutionVertex(this);
     }
 
     @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTestUtils.java
index 55a10cd5396..fc269901b34 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTestUtils.java
@@ -24,6 +24,10 @@ import org.apache.flink.util.OptionalFailure;
 import org.apache.flink.util.SerializedValue;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
@@ -68,6 +72,23 @@ class ArchivedExecutionGraphTestUtils {
         compareExecution(
                 runtimeVertex.getCurrentExecutionAttempt(),
                 archivedVertex.getCurrentExecutionAttempt());
+
+        compareExecutions(
+                runtimeVertex.getCurrentExecutions(), archivedVertex.getCurrentExecutions());
+    }
+
+    private static <RT extends AccessExecution, AT extends AccessExecution> void compareExecutions(
+            Collection<RT> runtimeExecutions, Collection<AT> archivedExecutions) {
+        assertThat(runtimeExecutions).hasSameSizeAs(archivedExecutions);
+
+        List<RT> sortedRuntimeExecutions = new ArrayList<>(runtimeExecutions);
+        List<AT> sortedArchivedExecutions = new ArrayList<>(archivedExecutions);
+        sortedRuntimeExecutions.sort(Comparator.comparingInt(AccessExecution::getAttemptNumber));
+        sortedArchivedExecutions.sort(Comparator.comparingInt(AccessExecution::getAttemptNumber));
+
+        for (int i = 0; i < runtimeExecutions.size(); i++) {
+            compareExecution(sortedRuntimeExecutions.get(i), sortedArchivedExecutions.get(i));
+        }
     }
 
     private static void compareExecution(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertexWithSpeculativeExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertexWithSpeculativeExecutionTest.java
new file mode 100644
index 00000000000..c9e83882b1c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertexWithSpeculativeExecutionTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.TestingInternalFailuresListener;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for the {@link ArchivedExecutionVertex} created from a {@link SpeculativeExecutionVertex}.
+ */
+class ArchivedExecutionVertexWithSpeculativeExecutionTest {
+
+    @RegisterExtension
+    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
+
+    private TestingInternalFailuresListener internalFailuresListener;
+
+    @BeforeEach
+    void setUp() {
+        internalFailuresListener = new TestingInternalFailuresListener();
+    }
+
+    @Test
+    void testCreateSpeculativeExecution() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+        ev.createNewSpeculativeExecution(System.currentTimeMillis());
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testResetExecutionVertex() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis());
+
+        e1.transitionState(ExecutionState.RUNNING);
+        e1.markFinished();
+        e2.cancel();
+        ev.resetForNewExecution();
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testCancel() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+        ev.createNewSpeculativeExecution(System.currentTimeMillis());
+        ev.cancel();
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testSuspend() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+        ev.createNewSpeculativeExecution(System.currentTimeMillis());
+        ev.suspend();
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testFail() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+        ev.createNewSpeculativeExecution(System.currentTimeMillis());
+        ev.fail(new Exception("Forced test failure."));
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testMarkFailed() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+        ev.createNewSpeculativeExecution(System.currentTimeMillis());
+        ev.markFailed(new Exception("Forced test failure."));
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testVertexTerminationAndJobTermination() throws Exception {
+        final JobVertex jobVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
+        final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(jobVertex);
+        final ExecutionGraph eg = createExecutionGraph(jobGraph);
+        eg.transitionToRunning();
+
+        ExecutionJobVertex jv = eg.getJobVertex(jobVertex.getID());
+        assertThat(jv).isNotNull();
+        final SpeculativeExecutionVertex ev = (SpeculativeExecutionVertex) jv.getTaskVertices()[0];
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis());
+
+        e1.transitionState(ExecutionState.RUNNING);
+        e1.markFinished();
+        e2.cancel();
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testArchiveFailedExecutions() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        e1.transitionState(ExecutionState.RUNNING);
+
+        final Execution e2 = ev.createNewSpeculativeExecution(0);
+        e2.transitionState(ExecutionState.FAILED);
+        ev.archiveFailedExecution(e2.getAttemptId());
+
+        final Execution e3 = ev.createNewSpeculativeExecution(0);
+        e3.transitionState(ExecutionState.RUNNING);
+        e1.transitionState(ExecutionState.FAILED);
+        ev.archiveFailedExecution(e1.getAttemptId());
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testArchiveTheOnlyCurrentExecution() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        e1.transitionState(ExecutionState.FAILED);
+
+        ev.archiveFailedExecution(e1.getAttemptId());
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testGetExecutionState() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        e1.transitionState(ExecutionState.CANCELED);
+
+        // states added later are more likely to reach the FINISHED state
+        final List<ExecutionState> statesSortedByPriority = new ArrayList<>();
+        statesSortedByPriority.add(ExecutionState.FAILED);
+        statesSortedByPriority.add(ExecutionState.CANCELING);
+        statesSortedByPriority.add(ExecutionState.CREATED);
+        statesSortedByPriority.add(ExecutionState.SCHEDULED);
+        statesSortedByPriority.add(ExecutionState.DEPLOYING);
+        statesSortedByPriority.add(ExecutionState.INITIALIZING);
+        statesSortedByPriority.add(ExecutionState.RUNNING);
+        statesSortedByPriority.add(ExecutionState.FINISHED);
+
+        for (ExecutionState state : statesSortedByPriority) {
+            final Execution execution = ev.createNewSpeculativeExecution(0);
+            execution.transitionState(state);
+
+            // Check the ArchivedExecutionVertex in each state.
+            ArchivedExecutionVertex aev = ev.archive();
+            ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+        }
+    }
+
+    private SpeculativeExecutionVertex createSpeculativeExecutionVertex() throws Exception {
+        final JobVertex jobVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
+        final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(jobVertex);
+        final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
+        ExecutionJobVertex jv = executionGraph.getJobVertex(jobVertex.getID());
+        assertThat(jv).isNotNull();
+        return (SpeculativeExecutionVertex) jv.getTaskVertices()[0];
+    }
+
+    private ExecutionGraph createExecutionGraph(final JobGraph jobGraph) throws Exception {
+        final ExecutionGraph executionGraph =
+                TestingDefaultExecutionGraphBuilder.newBuilder()
+                        .setJobGraph(jobGraph)
+                        .setExecutionJobVertexFactory(new SpeculativeExecutionJobVertex.Factory())
+                        .build(EXECUTOR_RESOURCE.getExecutor());
+
+        executionGraph.setInternalTaskFailuresListener(internalFailuresListener);
+        executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
+
+        return executionGraph;
+    }
+}


[flink] 06/06: [FLINK-28588][rest] Acquire information of all current executions in REST handlers if applicable

Posted by ga...@apache.org.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f436b20429b55ada2a9e8936a5e80fc672a397de
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Sun Jul 31 22:11:46 2022 +0800

    [FLINK-28588][rest] Acquire information of all current executions in REST handlers if applicable
    
    This closes #20296.
---
 .../src/test/resources/rest_api_v1.snapshot        |  31 +++
 .../handler/job/AbstractSubtaskAttemptHandler.java |  15 +-
 .../rest/handler/job/JobDetailsHandler.java        |   2 +
 .../rest/handler/job/JobExceptionsHandler.java     |  33 ++--
 .../handler/job/JobVertexBackPressureHandler.java  |  87 +++++++--
 .../rest/handler/job/JobVertexDetailsHandler.java  |  17 +-
 .../handler/job/JobVertexTaskManagersHandler.java  |  73 +++++---
 .../job/SubtaskCurrentAttemptDetailsHandler.java   |  19 +-
 ...SubtaskExecutionAttemptAccumulatorsHandler.java |  38 ++--
 .../job/SubtaskExecutionAttemptDetailsHandler.java |  52 +++---
 .../job/SubtasksAllAccumulatorsHandler.java        |  32 ++--
 .../rest/handler/job/SubtasksTimesHandler.java     |   3 +-
 .../rest/messages/JobVertexBackPressureInfo.java   |  45 ++++-
 .../job/SubtaskExecutionAttemptDetailsInfo.java    | 105 +++++++----
 .../threadinfo/JobVertexThreadInfoTracker.java     |  24 +--
 .../job/JobVertexBackPressureHandlerTest.java      | 207 +++++++++++++++++++++
 .../SubtaskCurrentAttemptDetailsHandlerTest.java   |   3 +-
 .../SubtaskExecutionAttemptDetailsHandlerTest.java |   3 +-
 .../messages/AggregatedTaskDetailsInfoTest.java    |   3 +-
 .../messages/JobVertexBackPressureInfoTest.java    |  24 ++-
 .../rest/messages/JobVertexDetailsInfoTest.java    |  24 ++-
 .../SubtaskExecutionAttemptDetailsInfoTest.java    |   3 +-
 22 files changed, 651 insertions(+), 192 deletions(-)
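
The attempt lookup that this commit introduces in AbstractSubtaskAttemptHandler
can be sketched as follows. This is a simplified illustration, not the handler
code: Attempt, findAttempt and the Map-based history are hypothetical
stand-ins for AccessExecution, the handler method and ExecutionHistory, and an
empty Optional stands in for whatever error the real handler reports for an
unknown attempt number. The point is the search order: all current executions
are checked first, and the execution history is consulted only when none of
them matches.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-ins only; none of these types are Flink classes. */
final class AttemptLookupSketch {

    /** Stand-in for an execution attempt visible to a REST handler. */
    static final class Attempt {
        final int attemptNumber;
        final String label;

        Attempt(int attemptNumber, String label) {
            this.attemptNumber = attemptNumber;
            this.label = label;
        }
    }

    /** Looks among the current attempts first, then falls back to the history. */
    static Optional<Attempt> findAttempt(
            int attemptNumber,
            Collection<Attempt> currentAttempts,
            Map<Integer, Attempt> history) {
        for (Attempt attempt : currentAttempts) {
            if (attempt.attemptNumber == attemptNumber) {
                return Optional.of(attempt);
            }
        }
        // Not a current attempt: it may still be a historical one.
        return Optional.ofNullable(history.get(attemptNumber));
    }

    public static void main(String[] args) {
        Map<Integer, Attempt> history = new HashMap<>();
        history.put(0, new Attempt(0, "historical"));
        Collection<Attempt> current =
                Arrays.asList(new Attempt(1, "current"), new Attempt(2, "speculative"));

        for (int number = 0; number <= 3; number++) {
            String result =
                    findAttempt(number, current, history)
                            .map(attempt -> attempt.label)
                            .orElse("not found");
            System.out.println("attempt " + number + " -> " + result);
        }
    }
}
```

Before this change only the single current attempt was compared against the
requested number, so a concurrently running speculative attempt could not be
addressed by its attempt number unless it had already moved into the history.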

diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index 873e5062d7d..85337fa8795 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -2403,6 +2403,13 @@
                   "type" : "integer"
                 }
               },
+              "other-concurrent-attempts" : {
+                "type" : "array",
+                "items" : {
+                  "type" : "object",
+                  "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:SubtaskExecutionAttemptDetailsInfo"
+                }
+              },
               "start_time" : {
                 "type" : "integer"
               }
@@ -2522,6 +2529,9 @@
               "subtask" : {
                 "type" : "integer"
               },
+              "attempt-number" : {
+                "type" : "integer"
+              },
               "backpressure-level" : {
                 "type" : "string",
                 "enum" : [ "ok", "low", "high" ]
@@ -2534,6 +2544,13 @@
               },
               "busyRatio" : {
                 "type" : "number"
+              },
+              "other-concurrent-attempts" : {
+                "type" : "array",
+                "items" : {
+                  "type" : "object",
+                  "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobVertexBackPressureInfo:SubtaskBackPressureInfo"
+                }
               }
             }
           }
@@ -2803,6 +2820,13 @@
             "type" : "integer"
           }
         },
+        "other-concurrent-attempts" : {
+          "type" : "array",
+          "items" : {
+            "type" : "object",
+            "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:SubtaskExecutionAttemptDetailsInfo"
+          }
+        },
         "start_time" : {
           "type" : "integer"
         }
@@ -2904,6 +2928,13 @@
             "type" : "integer"
           }
         },
+        "other-concurrent-attempts" : {
+          "type" : "array",
+          "items" : {
+            "type" : "object",
+            "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:SubtaskExecutionAttemptDetailsInfo"
+          }
+        },
         "start_time" : {
           "type" : "integer"
         }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractSubtaskAttemptHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractSubtaskAttemptHandler.java
index 15d128e43e2..04b066860ac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractSubtaskAttemptHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractSubtaskAttemptHandler.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
+import java.util.Collection;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.Executor;
@@ -88,11 +89,17 @@ public abstract class AbstractSubtaskAttemptHandler<
             throws RestHandlerException {
         final Integer attemptNumber = request.getPathParameter(SubtaskAttemptPathParameter.class);
 
-        final AccessExecution currentAttempt = executionVertex.getCurrentExecutionAttempt();
+        final Collection<AccessExecution> currentExecutions =
+                executionVertex.getCurrentExecutions();
         final ExecutionHistory executionHistory = executionVertex.getExecutionHistory();
-        if (attemptNumber == currentAttempt.getAttemptNumber()) {
-            return handleRequest(request, currentAttempt);
-        } else if (executionHistory.isValidAttemptNumber(attemptNumber)) {
+
+        for (AccessExecution currentExecution : currentExecutions) {
+            if (attemptNumber == currentExecution.getAttemptNumber()) {
+                return handleRequest(request, currentExecution);
+            }
+        }
+
+        if (executionHistory.isValidAttemptNumber(attemptNumber)) {
             final Optional<? extends AccessExecution> execution =
                     executionHistory.getHistoricalExecution(attemptNumber);
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
index 4f850bc0712..f38a7688ab9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
@@ -202,6 +202,8 @@ public class JobDetailsHandler
         MutableIOMetrics counts = new MutableIOMetrics();
 
         for (AccessExecutionVertex vertex : ejv.getTaskVertices()) {
+            // Here we use the metrics of one of the current attempts to represent the subtask,
+            // rather than the aggregation of all attempts.
             counts.addIOMetrics(
                     vertex.getCurrentExecutionAttempt(),
                     metricFetcher,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
index c500792c6aa..4ae57f6bc28 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest.handler.job;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ErrorInfo;
@@ -127,22 +128,24 @@ public class JobExceptionsHandler
         List<JobExceptionsInfo.ExecutionExceptionInfo> taskExceptionList = new ArrayList<>();
         boolean truncated = false;
         for (AccessExecutionVertex task : executionGraph.getAllExecutionVertices()) {
-            Optional<ErrorInfo> failure = task.getFailureInfo();
-            if (failure.isPresent()) {
-                if (taskExceptionList.size() >= exceptionToReportMaxSize) {
-                    truncated = true;
-                    break;
+            for (AccessExecution execution : task.getCurrentExecutions()) {
+                Optional<ErrorInfo> failure = execution.getFailureInfo();
+                if (failure.isPresent()) {
+                    if (taskExceptionList.size() >= exceptionToReportMaxSize) {
+                        truncated = true;
+                        break;
+                    }
+
+                    TaskManagerLocation location = execution.getAssignedResourceLocation();
+                    String locationString = toString(location);
+                    long timestamp = execution.getStateTimestamp(ExecutionState.FAILED);
+                    taskExceptionList.add(
+                            new JobExceptionsInfo.ExecutionExceptionInfo(
+                                    failure.get().getExceptionAsString(),
+                                    task.getTaskNameWithSubtaskIndex(),
+                                    locationString,
+                                    timestamp == 0 ? -1 : timestamp));
                 }
-
-                TaskManagerLocation location = task.getCurrentAssignedResourceLocation();
-                String locationString = toString(location);
-                long timestamp = task.getStateTimestamp(ExecutionState.FAILED);
-                taskExceptionList.add(
-                        new JobExceptionsInfo.ExecutionExceptionInfo(
-                                failure.get().getExceptionAsString(),
-                                task.getTaskNameWithSubtaskIndex(),
-                                locationString,
-                                timestamp == 0 ? -1 : timestamp));
             }
         }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java
index 895fe7011ba..898b74ba0d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.ComponentMetricStore;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.SubtaskMetricStore;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.TaskMetricStore;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
@@ -39,6 +40,7 @@ import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Comparator;
@@ -80,18 +82,23 @@ public class JobVertexBackPressureHandler
                 metricFetcher
                         .getMetricStore()
                         .getTaskMetricStore(jobId.toString(), jobVertexId.toString());
+        Map<String, Map<Integer, Integer>> jobCurrentExecutions =
+                metricFetcher.getMetricStore().getCurrentExecutionAttempts().get(jobId.toString());
+        Map<Integer, Integer> currentExecutionAttempts =
+                jobCurrentExecutions != null
+                        ? jobCurrentExecutions.get(jobVertexId.toString())
+                        : null;
 
         return CompletableFuture.completedFuture(
                 taskMetricStore != null
-                        ? createJobVertexBackPressureInfo(
-                                taskMetricStore.getAllSubtaskMetricStores())
+                        ? createJobVertexBackPressureInfo(taskMetricStore, currentExecutionAttempts)
                         : JobVertexBackPressureInfo.deprecated());
     }
 
     private JobVertexBackPressureInfo createJobVertexBackPressureInfo(
-            Map<Integer, ComponentMetricStore> allSubtaskMetricStores) {
+            TaskMetricStore taskMetricStore, Map<Integer, Integer> currentExecutionAttempts) {
         List<SubtaskBackPressureInfo> subtaskBackPressureInfos =
-                createSubtaskBackPressureInfo(allSubtaskMetricStores);
+                createSubtaskBackPressureInfo(taskMetricStore, currentExecutionAttempts);
         return new JobVertexBackPressureInfo(
                 JobVertexBackPressureInfo.VertexBackPressureStatus.OK,
                 getBackPressureLevel(getMaxBackPressureRatio(subtaskBackPressureInfos)),
@@ -100,26 +107,72 @@ public class JobVertexBackPressureHandler
     }
 
     private List<SubtaskBackPressureInfo> createSubtaskBackPressureInfo(
-            Map<Integer, ComponentMetricStore> subtaskMetricStores) {
+            TaskMetricStore taskMetricStore, Map<Integer, Integer> currentExecutionAttempts) {
+        Map<Integer, SubtaskMetricStore> subtaskMetricStores =
+                taskMetricStore.getAllSubtaskMetricStores();
         List<SubtaskBackPressureInfo> result = new ArrayList<>(subtaskMetricStores.size());
-        for (Map.Entry<Integer, ComponentMetricStore> entry : subtaskMetricStores.entrySet()) {
+        for (Map.Entry<Integer, SubtaskMetricStore> entry : subtaskMetricStores.entrySet()) {
             int subtaskIndex = entry.getKey();
-            ComponentMetricStore subtaskMetricStore = entry.getValue();
-            double backPressureRatio = getBackPressureRatio(subtaskMetricStore);
-            double idleRatio = getIdleRatio(subtaskMetricStore);
-            double busyRatio = getBusyRatio(subtaskMetricStore);
-            result.add(
-                    new SubtaskBackPressureInfo(
-                            subtaskIndex,
-                            getBackPressureLevel(backPressureRatio),
-                            backPressureRatio,
-                            idleRatio,
-                            busyRatio));
+            SubtaskMetricStore subtaskMetricStore = entry.getValue();
+            Map<Integer, ComponentMetricStore> allAttemptsMetricStores =
+                    subtaskMetricStore.getAllAttemptsMetricStores();
+            if (allAttemptsMetricStores.isEmpty() || allAttemptsMetricStores.size() == 1) {
+                result.add(
+                        createSubtaskAttemptBackpressureInfo(
+                                subtaskIndex, null, subtaskMetricStore, null));
+            } else {
+                int currentAttempt =
+                        currentExecutionAttempts == null
+                                ? -1
+                                : currentExecutionAttempts.getOrDefault(subtaskIndex, -1);
+                if (!allAttemptsMetricStores.containsKey(currentAttempt)) {
+                    // allAttemptsMetricStores has at least two entries here
+                    currentAttempt = allAttemptsMetricStores.keySet().iterator().next();
+                }
+                List<SubtaskBackPressureInfo> otherConcurrentAttempts =
+                        new ArrayList<>(allAttemptsMetricStores.size() - 1);
+                for (Map.Entry<Integer, ComponentMetricStore> attemptStore :
+                        allAttemptsMetricStores.entrySet()) {
+                    if (attemptStore.getKey() == currentAttempt) {
+                        continue;
+                    }
+                    otherConcurrentAttempts.add(
+                            createSubtaskAttemptBackpressureInfo(
+                                    subtaskIndex,
+                                    attemptStore.getKey(),
+                                    attemptStore.getValue(),
+                                    null));
+                }
+                result.add(
+                        createSubtaskAttemptBackpressureInfo(
+                                subtaskIndex,
+                                currentAttempt,
+                                allAttemptsMetricStores.get(currentAttempt),
+                                otherConcurrentAttempts));
+            }
         }
         result.sort(Comparator.comparingInt(SubtaskBackPressureInfo::getSubtask));
         return result;
     }
 
+    private SubtaskBackPressureInfo createSubtaskAttemptBackpressureInfo(
+            int subtaskIndex,
+            @Nullable Integer attemptNumber,
+            ComponentMetricStore metricStore,
+            @Nullable List<SubtaskBackPressureInfo> otherConcurrentAttempts) {
+        double backPressureRatio = getBackPressureRatio(metricStore);
+        double idleRatio = getIdleRatio(metricStore);
+        double busyRatio = getBusyRatio(metricStore);
+        return new SubtaskBackPressureInfo(
+                subtaskIndex,
+                attemptNumber,
+                getBackPressureLevel(backPressureRatio),
+                backPressureRatio,
+                idleRatio,
+                busyRatio,
+                otherConcurrentAttempts);
+    }
+
     private double getMaxBackPressureRatio(List<SubtaskBackPressureInfo> subtaskBackPressureInfos) {
         return subtaskBackPressureInfos.stream()
                 .mapToDouble(backPressureInfo -> backPressureInfo.getBackPressuredRatio())
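
Note on the JobVertexBackPressureHandler hunk above: when a subtask has more than one attempt in the metric store, the handler picks one attempt to represent the subtask and lists the rest as other concurrent attempts. The fallback can be sketched in isolation as below. This is a minimal sketch, not Flink code: the class RepresentativeAttemptSketch and the method pickRepresentative are hypothetical names, and the generic map stands in for the per-attempt ComponentMetricStore map.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Flink: mirrors the fallback used in the hunk
// above when choosing the attempt that represents a subtask.
final class RepresentativeAttemptSketch {

    /**
     * Returns the attempt number that should represent a subtask.
     *
     * @param attemptStores per-attempt stores keyed by attempt number; must not be empty
     * @param currentAttempts subtask index to current attempt number; may be null
     * @param subtaskIndex index of the subtask being reported
     */
    static <T> int pickRepresentative(
            Map<Integer, T> attemptStores,
            Map<Integer, Integer> currentAttempts,
            int subtaskIndex) {
        if (attemptStores.isEmpty()) {
            throw new IllegalArgumentException("attemptStores must not be empty");
        }
        // -1 is never a valid attempt number, so it forces the fallback below.
        int candidate =
                currentAttempts == null ? -1 : currentAttempts.getOrDefault(subtaskIndex, -1);
        if (!attemptStores.containsKey(candidate)) {
            // Fall back to whichever attempt the map iterates first.
            candidate = attemptStores.keySet().iterator().next();
        }
        return candidate;
    }

    public static void main(String[] args) {
        Map<Integer, String> stores = new LinkedHashMap<>();
        stores.put(2, "metrics of attempt 2");
        stores.put(3, "metrics of attempt 3");

        Map<Integer, Integer> current = Collections.singletonMap(0, 3);
        System.out.println(pickRepresentative(stores, current, 0)); // prints 3
        System.out.println(pickRepresentative(stores, null, 0)); // prints 2
    }
}
```

With an unordered map the fallback attempt is arbitrary, so the sketch uses a LinkedHashMap to keep the second result deterministic.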
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java
index e4ac7708489..5877567db77 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java
@@ -120,9 +120,24 @@ public class JobVertexDetailsHandler
         for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
             final AccessExecution execution = vertex.getCurrentExecutionAttempt();
             final JobVertexID jobVertexID = jobVertex.getJobVertexId();
+
+            final Collection<AccessExecution> attempts = vertex.getCurrentExecutions();
+            List<SubtaskExecutionAttemptDetailsInfo> otherConcurrentAttempts = null;
+
+            if (attempts.size() > 1) {
+                otherConcurrentAttempts = new ArrayList<>();
+                for (AccessExecution attempt : attempts) {
+                    if (attempt.getAttemptNumber() != execution.getAttemptNumber()) {
+                        otherConcurrentAttempts.add(
+                                SubtaskExecutionAttemptDetailsInfo.create(
+                                        attempt, metricFetcher, jobID, jobVertexID, null));
+                    }
+                }
+            }
+
             subtasks.add(
                     SubtaskExecutionAttemptDetailsInfo.create(
-                            execution, metricFetcher, jobID, jobVertexID));
+                            execution, metricFetcher, jobID, jobVertexID, otherConcurrentAttempts));
         }
 
         return new JobVertexDetailsInfo(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
index 3feea441d2f..450535f8c7f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest.handler.job;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
@@ -55,8 +56,10 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.stream.Collectors;
 
@@ -127,40 +130,48 @@ public class JobVertexTaskManagersHandler
             AccessExecutionJobVertex jobVertex,
             JobID jobID,
             @Nullable MetricFetcher metricFetcher) {
-        // Build a map that groups tasks by TaskManager
+        // Build a map that groups task executions by TaskManager
         Map<String, String> taskManagerId2Host = new HashMap<>();
-        Map<String, List<AccessExecutionVertex>> taskManagerVertices = new HashMap<>();
+        Map<String, List<AccessExecution>> taskManagerExecutions = new HashMap<>();
+        Set<AccessExecution> representativeExecutions = new HashSet<>();
         for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
-            TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
-            String taskManagerHost =
-                    location == null
-                            ? "(unassigned)"
-                            : location.getHostname() + ':' + location.dataPort();
-            String taskmanagerId =
-                    location == null ? "(unassigned)" : location.getResourceID().toString();
-            taskManagerId2Host.put(taskmanagerId, taskManagerHost);
-            List<AccessExecutionVertex> vertices =
-                    taskManagerVertices.computeIfAbsent(
-                            taskmanagerId, ignored -> new ArrayList<>(4));
-            vertices.add(vertex);
+            AccessExecution representativeAttempt = vertex.getCurrentExecutionAttempt();
+            representativeExecutions.add(representativeAttempt);
+
+            for (AccessExecution execution : vertex.getCurrentExecutions()) {
+                TaskManagerLocation location = execution.getAssignedResourceLocation();
+                String taskManagerHost =
+                        location == null
+                                ? "(unassigned)"
+                                : location.getHostname() + ':' + location.dataPort();
+                String taskmanagerId =
+                        location == null ? "(unassigned)" : location.getResourceID().toString();
+                taskManagerId2Host.put(taskmanagerId, taskManagerHost);
+                List<AccessExecution> executions =
+                        taskManagerExecutions.computeIfAbsent(
+                                taskmanagerId, ignored -> new ArrayList<>());
+                executions.add(execution);
+            }
         }
 
         final long now = System.currentTimeMillis();
 
         List<JobVertexTaskManagersInfo.TaskManagersInfo> taskManagersInfoList = new ArrayList<>(4);
-        for (Map.Entry<String, List<AccessExecutionVertex>> entry :
-                taskManagerVertices.entrySet()) {
+        for (Map.Entry<String, List<AccessExecution>> entry : taskManagerExecutions.entrySet()) {
             String taskmanagerId = entry.getKey();
             String host = taskManagerId2Host.get(taskmanagerId);
-            List<AccessExecutionVertex> taskVertices = entry.getValue();
+            List<AccessExecution> executions = entry.getValue();
 
             List<IOMetricsInfo> ioMetricsInfos = new ArrayList<>();
             List<Map<ExecutionState, Long>> status =
-                    taskVertices.stream()
-                            .map(AccessExecutionVertex::getCurrentExecutionAttempt)
+                    executions.stream()
                             .map(StatusDurationUtils::getExecutionStateDuration)
                             .collect(Collectors.toList());
 
+            // executionsPerState counts attempts of a subtask separately
+            int[] executionsPerState = new int[ExecutionState.values().length];
+            // tasksPerState counts only the representative attempts, and is used to aggregate the
+            // task manager state
             int[] tasksPerState = new int[ExecutionState.values().length];
 
             long startTime = Long.MAX_VALUE;
@@ -169,27 +180,32 @@ public class JobVertexTaskManagersHandler
 
             MutableIOMetrics counts = new MutableIOMetrics();
 
-            for (AccessExecutionVertex vertex : taskVertices) {
-                final ExecutionState state = vertex.getExecutionState();
-                tasksPerState[state.ordinal()]++;
+            int representativeAttemptsCount = 0;
+            for (AccessExecution execution : executions) {
+                final ExecutionState state = execution.getState();
+                executionsPerState[state.ordinal()]++;
+                if (representativeExecutions.contains(execution)) {
+                    tasksPerState[state.ordinal()]++;
+                    representativeAttemptsCount++;
+                }
 
                 // take the earliest start time
-                long started = vertex.getStateTimestamp(ExecutionState.DEPLOYING);
+                long started = execution.getStateTimestamp(ExecutionState.DEPLOYING);
                 if (started > 0) {
                     startTime = Math.min(startTime, started);
                 }
 
                 allFinished &= state.isTerminal();
-                endTime = Math.max(endTime, vertex.getStateTimestamp(state));
+                endTime = Math.max(endTime, execution.getStateTimestamp(state));
 
                 counts.addIOMetrics(
-                        vertex.getCurrentExecutionAttempt(),
+                        execution,
                         metricFetcher,
                         jobID.toString(),
                         jobVertex.getJobVertexId().toString());
                 MutableIOMetrics current = new MutableIOMetrics();
                 current.addIOMetrics(
-                        vertex.getCurrentExecutionAttempt(),
+                        execution,
                         metricFetcher,
                         jobID.toString(),
                         jobVertex.getJobVertexId().toString());
@@ -222,9 +238,10 @@ public class JobVertexTaskManagersHandler
                 duration = -1L;
             }
 
+            // Safe even if tasksPerState is all zeros and representativeAttemptsCount is zero
             ExecutionState jobVertexState =
                     ExecutionJobVertex.getAggregateJobVertexState(
-                            tasksPerState, taskVertices.size());
+                            tasksPerState, representativeAttemptsCount);
 
             final IOMetricsInfo jobVertexMetrics =
                     new IOMetricsInfo(
@@ -243,7 +260,7 @@ public class JobVertexTaskManagersHandler
             Map<ExecutionState, Integer> statusCounts =
                     new HashMap<>(ExecutionState.values().length);
             for (ExecutionState state : ExecutionState.values()) {
-                statusCounts.put(state, tasksPerState[state.ordinal()]);
+                statusCounts.put(state, executionsPerState[state.ordinal()]);
             }
             taskManagersInfoList.add(
                     new JobVertexTaskManagersInfo.TaskManagersInfo(
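
Note on the JobVertexTaskManagersHandler hunks above: the handler now keeps two counters per task manager. executionsPerState counts every current attempt and feeds the reported status counts, while tasksPerState counts only the representative attempt of each subtask and feeds the aggregated vertex state. A minimal sketch of that split follows. State and Attempt are hypothetical stand-ins for ExecutionState and AccessExecution, and only three states are modelled.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustration only: State and Attempt are hypothetical stand-ins for Flink's
// ExecutionState and AccessExecution.
final class PerStateCountSketch {

    enum State { RUNNING, FINISHED, FAILED }

    static final class Attempt {
        final int subtaskIndex;
        final int attemptNumber;
        final State state;

        Attempt(int subtaskIndex, int attemptNumber, State state) {
            this.subtaskIndex = subtaskIndex;
            this.attemptNumber = attemptNumber;
            this.state = state;
        }
    }

    /** Returns {executionsPerState, tasksPerState}, both indexed by State.ordinal(). */
    static int[][] count(List<Attempt> attempts, Set<Attempt> representatives) {
        int[] executionsPerState = new int[State.values().length];
        int[] tasksPerState = new int[State.values().length];
        for (Attempt attempt : attempts) {
            // Every attempt is counted, including speculative ones.
            executionsPerState[attempt.state.ordinal()]++;
            // Only the representative attempt of a subtask counts as a task.
            if (representatives.contains(attempt)) {
                tasksPerState[attempt.state.ordinal()]++;
            }
        }
        return new int[][] {executionsPerState, tasksPerState};
    }

    public static void main(String[] args) {
        Attempt original = new Attempt(0, 0, State.RUNNING);
        Attempt speculative = new Attempt(0, 1, State.RUNNING);
        Attempt other = new Attempt(1, 0, State.FINISHED);

        Set<Attempt> representatives = new HashSet<>(Arrays.asList(original, other));
        int[][] counts = count(Arrays.asList(original, speculative, other), representatives);

        System.out.println(Arrays.toString(counts[0])); // prints [2, 1, 0]
        System.out.println(Arrays.toString(counts[1])); // prints [1, 1, 0]
    }
}
```

In the sketch, subtask 0 has two running attempts, so the per-execution count of RUNNING is 2 while the per-task count stays at 1.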
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java
index ba25ac5c24f..0e52d088158 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java
@@ -37,6 +37,9 @@ import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.Preconditions;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executor;
 
@@ -81,7 +84,21 @@ public class SubtaskCurrentAttemptDetailsHandler
         final JobID jobID = request.getPathParameter(JobIDPathParameter.class);
         final JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class);
 
+        final Collection<AccessExecution> attempts = executionVertex.getCurrentExecutions();
+        List<SubtaskExecutionAttemptDetailsInfo> otherConcurrentAttempts = null;
+
+        if (attempts.size() > 1) {
+            otherConcurrentAttempts = new ArrayList<>();
+            for (AccessExecution attempt : attempts) {
+                if (attempt.getAttemptNumber() != execution.getAttemptNumber()) {
+                    otherConcurrentAttempts.add(
+                            SubtaskExecutionAttemptDetailsInfo.create(
+                                    attempt, metricFetcher, jobID, jobVertexID, null));
+                }
+            }
+        }
+
         return SubtaskExecutionAttemptDetailsInfo.create(
-                execution, metricFetcher, jobID, jobVertexID);
+                execution, metricFetcher, jobID, jobVertexID, otherConcurrentAttempts);
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java
index 7093308bf3b..a188a19cb5f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -99,25 +99,25 @@ public class SubtaskExecutionAttemptAccumulatorsHandler
         List<ArchivedJson> archive = new ArrayList<>(16);
         for (AccessExecutionJobVertex task : graph.getAllVertices().values()) {
             for (AccessExecutionVertex subtask : task.getTaskVertices()) {
-                ResponseBody curAttemptJson =
-                        createAccumulatorInfo(subtask.getCurrentExecutionAttempt());
-                String curAttemptPath =
-                        getMessageHeaders()
-                                .getTargetRestEndpointURL()
-                                .replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString())
-                                .replace(
-                                        ':' + JobVertexIdPathParameter.KEY,
-                                        task.getJobVertexId().toString())
-                                .replace(
-                                        ':' + SubtaskIndexPathParameter.KEY,
-                                        String.valueOf(subtask.getParallelSubtaskIndex()))
-                                .replace(
-                                        ':' + SubtaskAttemptPathParameter.KEY,
-                                        String.valueOf(
-                                                subtask.getCurrentExecutionAttempt()
-                                                        .getAttemptNumber()));
-
-                archive.add(new ArchivedJson(curAttemptPath, curAttemptJson));
+                for (AccessExecution attempt : subtask.getCurrentExecutions()) {
+                    ResponseBody curAttemptJson = createAccumulatorInfo(attempt);
+                    String curAttemptPath =
+                            getMessageHeaders()
+                                    .getTargetRestEndpointURL()
+                                    .replace(
+                                            ':' + JobIDPathParameter.KEY,
+                                            graph.getJobID().toString())
+                                    .replace(
+                                            ':' + JobVertexIdPathParameter.KEY,
+                                            task.getJobVertexId().toString())
+                                    .replace(
+                                            ':' + SubtaskIndexPathParameter.KEY,
+                                            String.valueOf(subtask.getParallelSubtaskIndex()))
+                                    .replace(
+                                            ':' + SubtaskAttemptPathParameter.KEY,
+                                            String.valueOf(attempt.getAttemptNumber()));
+                    archive.add(new ArchivedJson(curAttemptPath, curAttemptJson));
+                }
 
                 for (AccessExecution attempt :
                         subtask.getExecutionHistory().getHistoricalExecutions()) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
index 398a10c57b4..d4eaa00906e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
@@ -102,7 +102,7 @@ public class SubtaskExecutionAttemptDetailsHandler
         final JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class);
 
         return SubtaskExecutionAttemptDetailsInfo.create(
-                execution, metricFetcher, jobID, jobVertexID);
+                execution, metricFetcher, jobID, jobVertexID, null);
     }
 
     @Override
@@ -111,36 +111,38 @@ public class SubtaskExecutionAttemptDetailsHandler
         List<ArchivedJson> archive = new ArrayList<>(16);
         for (AccessExecutionJobVertex task : graph.getAllVertices().values()) {
             for (AccessExecutionVertex subtask : task.getTaskVertices()) {
-                ResponseBody curAttemptJson =
-                        SubtaskExecutionAttemptDetailsInfo.create(
-                                subtask.getCurrentExecutionAttempt(),
-                                null,
-                                graph.getJobID(),
-                                task.getJobVertexId());
-                String curAttemptPath =
-                        getMessageHeaders()
-                                .getTargetRestEndpointURL()
-                                .replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString())
-                                .replace(
-                                        ':' + JobVertexIdPathParameter.KEY,
-                                        task.getJobVertexId().toString())
-                                .replace(
-                                        ':' + SubtaskIndexPathParameter.KEY,
-                                        String.valueOf(subtask.getParallelSubtaskIndex()))
-                                .replace(
-                                        ':' + SubtaskAttemptPathParameter.KEY,
-                                        String.valueOf(
-                                                subtask.getCurrentExecutionAttempt()
-                                                        .getAttemptNumber()));
-
-                archive.add(new ArchivedJson(curAttemptPath, curAttemptJson));
+                for (AccessExecution attempt : subtask.getCurrentExecutions()) {
+                    ResponseBody curAttemptJson =
+                            SubtaskExecutionAttemptDetailsInfo.create(
+                                    attempt, null, graph.getJobID(), task.getJobVertexId(), null);
+                    String curAttemptPath =
+                            getMessageHeaders()
+                                    .getTargetRestEndpointURL()
+                                    .replace(
+                                            ':' + JobIDPathParameter.KEY,
+                                            graph.getJobID().toString())
+                                    .replace(
+                                            ':' + JobVertexIdPathParameter.KEY,
+                                            task.getJobVertexId().toString())
+                                    .replace(
+                                            ':' + SubtaskIndexPathParameter.KEY,
+                                            String.valueOf(subtask.getParallelSubtaskIndex()))
+                                    .replace(
+                                            ':' + SubtaskAttemptPathParameter.KEY,
+                                            String.valueOf(attempt.getAttemptNumber()));
 
+                    archive.add(new ArchivedJson(curAttemptPath, curAttemptJson));
+                }
                 for (AccessExecution attempt :
                         subtask.getExecutionHistory().getHistoricalExecutions()) {
                     if (attempt != null) {
                         ResponseBody json =
                                 SubtaskExecutionAttemptDetailsInfo.create(
-                                        attempt, null, graph.getJobID(), task.getJobVertexId());
+                                        attempt,
+                                        null,
+                                        graph.getJobID(),
+                                        task.getJobVertexId(),
+                                        null);
                         String path =
                                 getMessageHeaders()
                                         .getTargetRestEndpointURL()
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksAllAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksAllAccumulatorsHandler.java
index 88c3cc851ad..a47ad6a46be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksAllAccumulatorsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksAllAccumulatorsHandler.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.rest.handler.job;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -75,23 +76,24 @@ public class SubtasksAllAccumulatorsHandler
                 new ArrayList<>();
 
         for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
-            TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
-            String locationString = location == null ? "(unassigned)" : location.getHostname();
+            for (AccessExecution execution : vertex.getCurrentExecutions()) {
+                TaskManagerLocation location = execution.getAssignedResourceLocation();
+                String locationString = location == null ? "(unassigned)" : location.getHostname();
 
-            StringifiedAccumulatorResult[] accs =
-                    vertex.getCurrentExecutionAttempt().getUserAccumulatorsStringified();
-            List<UserAccumulator> userAccumulators = new ArrayList<>(accs.length);
-            for (StringifiedAccumulatorResult acc : accs) {
-                userAccumulators.add(
-                        new UserAccumulator(acc.getName(), acc.getType(), acc.getValue()));
-            }
+                StringifiedAccumulatorResult[] accs = execution.getUserAccumulatorsStringified();
+                List<UserAccumulator> userAccumulators = new ArrayList<>(accs.length);
+                for (StringifiedAccumulatorResult acc : accs) {
+                    userAccumulators.add(
+                            new UserAccumulator(acc.getName(), acc.getType(), acc.getValue()));
+                }
 
-            subtaskAccumulatorsInfos.add(
-                    new SubtasksAllAccumulatorsInfo.SubtaskAccumulatorsInfo(
-                            vertex.getCurrentExecutionAttempt().getParallelSubtaskIndex(),
-                            vertex.getCurrentExecutionAttempt().getAttemptNumber(),
-                            locationString,
-                            userAccumulators));
+                subtaskAccumulatorsInfos.add(
+                        new SubtasksAllAccumulatorsInfo.SubtaskAccumulatorsInfo(
+                                execution.getParallelSubtaskIndex(),
+                                execution.getAttemptNumber(),
+                                locationString,
+                                userAccumulators));
+            }
         }
 
         return new SubtasksAllAccumulatorsInfo(jobVertexId, parallelism, subtaskAccumulatorsInfos);
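
Note on the hunk above: the handler now emits one accumulator entry per current execution attempt rather than one per subtask, so a subtask with a speculative attempt contributes two entries. A minimal sketch of that iteration pattern follows. `Attempt` and `describeAllAttempts` are hypothetical stand-ins invented for illustration; they are not Flink classes or methods, and only the "(unassigned)" fallback is taken from the diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class AttemptFlatteningSketch {

    /** Hypothetical stand-in for one execution attempt; not a Flink class. */
    static final class Attempt {
        final int subtaskIndex;
        final int attemptNumber;
        final String host; // null while no TaskManager is assigned

        Attempt(int subtaskIndex, int attemptNumber, String host) {
            this.subtaskIndex = subtaskIndex;
            this.attemptNumber = attemptNumber;
            this.host = host;
        }
    }

    /** Emits one row per current attempt instead of one row per subtask. */
    static List<String> describeAllAttempts(List<List<Attempt>> currentExecutionsPerVertex) {
        List<String> rows = new ArrayList<>();
        for (List<Attempt> currentExecutions : currentExecutionsPerVertex) {
            for (Attempt attempt : currentExecutions) {
                // Same fallback string as the handler uses for unassigned attempts.
                String location = attempt.host == null ? "(unassigned)" : attempt.host;
                rows.add(attempt.subtaskIndex + "/" + attempt.attemptNumber + "@" + location);
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        List<List<Attempt>> vertices =
                Arrays.asList(
                        Arrays.asList(new Attempt(0, 0, "host-a"), new Attempt(0, 1, null)),
                        Arrays.asList(new Attempt(1, 0, "host-b")));
        // prints [0/0@host-a, 0/1@(unassigned), 1/0@host-b]
        System.out.println(describeAllAttempts(vertices));
    }
}
```

Consumers that assumed exactly one entry per subtask index would need to key on the (subtask, attempt) pair instead.
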
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
index 32f04899810..ac9c8322c9c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
@@ -101,7 +101,8 @@ public class SubtasksTimesHandler
 
         int num = 0;
         for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
-
+            // Use one of the current execution attempts to represent the subtask, rather than
+            // adding times info of all attempts.
             long[] timestamps = vertex.getCurrentExecutionAttempt().getStateTimestamps();
             ExecutionState status = vertex.getExecutionState();
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java
index 5415c297cbf..c475b6d7cd5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonValue;
 
@@ -121,14 +122,22 @@ public class JobVertexBackPressureInfo implements ResponseBody {
     public static final class SubtaskBackPressureInfo {
 
         public static final String FIELD_NAME_SUBTASK = "subtask";
+        public static final String FIELD_NAME_ATTEMPT_NUMBER = "attempt-number";
         public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level";
         public static final String FIELD_NAME_BACK_PRESSURED_RATIO = "ratio";
         public static final String FIELD_NAME_IDLE_RATIO = "idleRatio";
         public static final String FIELD_NAME_BUSY_RATIO = "busyRatio";
+        public static final String FIELD_NAME_OTHER_CONCURRENT_ATTEMPTS =
+                "other-concurrent-attempts";
 
         @JsonProperty(FIELD_NAME_SUBTASK)
         private final int subtask;
 
+        @JsonProperty(FIELD_NAME_ATTEMPT_NUMBER)
+        @JsonInclude(Include.NON_NULL)
+        @Nullable
+        private final Integer attemptNumber;
+
         @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
         private final VertexBackPressureLevel backpressureLevel;
 
@@ -141,18 +150,30 @@ public class JobVertexBackPressureInfo implements ResponseBody {
         @JsonProperty(FIELD_NAME_BUSY_RATIO)
         private final double busyRatio;
 
+        @JsonProperty(FIELD_NAME_OTHER_CONCURRENT_ATTEMPTS)
+        @JsonInclude(Include.NON_EMPTY)
+        @Nullable
+        private final List<SubtaskBackPressureInfo> otherConcurrentAttempts;
+
+        // otherConcurrentAttempts and attemptNumber are Nullable since Jackson will assign null if
+        // the fields are absent while parsing
         public SubtaskBackPressureInfo(
                 @JsonProperty(FIELD_NAME_SUBTASK) int subtask,
+                @JsonProperty(FIELD_NAME_ATTEMPT_NUMBER) @Nullable Integer attemptNumber,
                 @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
                         VertexBackPressureLevel backpressureLevel,
                 @JsonProperty(FIELD_NAME_BACK_PRESSURED_RATIO) double backPressuredRatio,
                 @JsonProperty(FIELD_NAME_IDLE_RATIO) double idleRatio,
-                @JsonProperty(FIELD_NAME_BUSY_RATIO) double busyRatio) {
+                @JsonProperty(FIELD_NAME_BUSY_RATIO) double busyRatio,
+                @JsonProperty(FIELD_NAME_OTHER_CONCURRENT_ATTEMPTS) @Nullable
+                        List<SubtaskBackPressureInfo> otherConcurrentAttempts) {
             this.subtask = subtask;
+            this.attemptNumber = attemptNumber;
             this.backpressureLevel = checkNotNull(backpressureLevel);
             this.backPressuredRatio = backPressuredRatio;
             this.idleRatio = idleRatio;
             this.busyRatio = busyRatio;
+            this.otherConcurrentAttempts = otherConcurrentAttempts;
         }
 
         @Override
@@ -165,16 +186,24 @@ public class JobVertexBackPressureInfo implements ResponseBody {
             }
             SubtaskBackPressureInfo that = (SubtaskBackPressureInfo) o;
             return subtask == that.subtask
+                    && Objects.equals(attemptNumber, that.attemptNumber)
                     && backPressuredRatio == that.backPressuredRatio
                     && idleRatio == that.idleRatio
                     && busyRatio == that.busyRatio
-                    && Objects.equals(backpressureLevel, that.backpressureLevel);
+                    && Objects.equals(backpressureLevel, that.backpressureLevel)
+                    && Objects.equals(otherConcurrentAttempts, that.otherConcurrentAttempts);
         }
 
         @Override
         public int hashCode() {
             return Objects.hash(
-                    subtask, backpressureLevel, backPressuredRatio, idleRatio, busyRatio);
+                    subtask,
+                    attemptNumber,
+                    backpressureLevel,
+                    backPressuredRatio,
+                    idleRatio,
+                    busyRatio,
+                    otherConcurrentAttempts);
         }
 
         public int getSubtask() {
@@ -196,6 +225,16 @@ public class JobVertexBackPressureInfo implements ResponseBody {
         public double getBusyRatio() {
             return busyRatio;
         }
+
+        @Nullable
+        public Integer getAttemptNumber() {
+            return attemptNumber;
+        }
+
+        @Nullable
+        public List<SubtaskBackPressureInfo> getOtherConcurrentAttempts() {
+            return otherConcurrentAttempts;
+        }
     }
 
     /** Status of vertex back-pressure. */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java
index 56577fb260b..1a8463ab559 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java
@@ -30,12 +30,16 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
 import io.swagger.v3.oas.annotations.Hidden;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
@@ -66,6 +70,8 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
 
     public static final String FIELD_NAME_STATUS_DURATION = "status-duration";
 
+    public static final String FIELD_NAME_OTHER_CONCURRENT_ATTEMPTS = "other-concurrent-attempts";
+
     @JsonProperty(FIELD_NAME_SUBTASK_INDEX)
     private final int subtaskIndex;
 
@@ -100,7 +106,13 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
     @JsonProperty(FIELD_NAME_STATUS_DURATION)
     private final Map<ExecutionState, Long> statusDuration;
 
+    @JsonProperty(FIELD_NAME_OTHER_CONCURRENT_ATTEMPTS)
+    @JsonInclude(Include.NON_EMPTY)
+    @Nullable
+    private final List<SubtaskExecutionAttemptDetailsInfo> otherConcurrentAttempts;
+
     @JsonCreator
+    // otherConcurrentAttempts is Nullable: Jackson assigns null if the field is absent
     public SubtaskExecutionAttemptDetailsInfo(
             @JsonProperty(FIELD_NAME_SUBTASK_INDEX) int subtaskIndex,
             @JsonProperty(FIELD_NAME_STATUS) ExecutionState status,
@@ -111,7 +123,9 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
             @JsonProperty(FIELD_NAME_DURATION) long duration,
             @JsonProperty(FIELD_NAME_METRICS) IOMetricsInfo ioMetricsInfo,
             @JsonProperty(FIELD_NAME_TASKMANAGER_ID) String taskmanagerId,
-            @JsonProperty(FIELD_NAME_STATUS_DURATION) Map<ExecutionState, Long> statusDuration) {
+            @JsonProperty(FIELD_NAME_STATUS_DURATION) Map<ExecutionState, Long> statusDuration,
+            @JsonProperty(FIELD_NAME_OTHER_CONCURRENT_ATTEMPTS) @Nullable
+                    List<SubtaskExecutionAttemptDetailsInfo> otherConcurrentAttempts) {
 
         this.subtaskIndex = subtaskIndex;
         this.status = Preconditions.checkNotNull(status);
@@ -124,6 +138,7 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
         this.ioMetricsInfo = Preconditions.checkNotNull(ioMetricsInfo);
         this.taskmanagerId = Preconditions.checkNotNull(taskmanagerId);
         this.statusDuration = Preconditions.checkNotNull(statusDuration);
+        this.otherConcurrentAttempts = otherConcurrentAttempts;
     }
 
     public int getSubtaskIndex() {
@@ -174,51 +189,16 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
         return taskmanagerId;
     }
 
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-
-        SubtaskExecutionAttemptDetailsInfo that = (SubtaskExecutionAttemptDetailsInfo) o;
-
-        return subtaskIndex == that.subtaskIndex
-                && status == that.status
-                && attempt == that.attempt
-                && Objects.equals(host, that.host)
-                && startTime == that.startTime
-                && startTimeCompatible == that.startTimeCompatible
-                && endTime == that.endTime
-                && duration == that.duration
-                && Objects.equals(ioMetricsInfo, that.ioMetricsInfo)
-                && Objects.equals(taskmanagerId, that.taskmanagerId)
-                && Objects.equals(statusDuration, that.statusDuration);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(
-                subtaskIndex,
-                status,
-                attempt,
-                host,
-                startTime,
-                startTimeCompatible,
-                endTime,
-                duration,
-                ioMetricsInfo,
-                taskmanagerId,
-                statusDuration);
+    public List<SubtaskExecutionAttemptDetailsInfo> getOtherConcurrentAttempts() {
+        return otherConcurrentAttempts == null ? new ArrayList<>() : otherConcurrentAttempts;
     }
 
     public static SubtaskExecutionAttemptDetailsInfo create(
             AccessExecution execution,
             @Nullable MetricFetcher metricFetcher,
             JobID jobID,
-            JobVertexID jobVertexID) {
+            JobVertexID jobVertexID,
+            @Nullable List<SubtaskExecutionAttemptDetailsInfo> otherConcurrentAttempts) {
         final ExecutionState status = execution.getState();
         final long now = System.currentTimeMillis();
 
@@ -261,6 +241,49 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
                 duration,
                 ioMetricsInfo,
                 taskmanagerId,
-                getExecutionStateDuration(execution));
+                getExecutionStateDuration(execution),
+                otherConcurrentAttempts);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        SubtaskExecutionAttemptDetailsInfo that = (SubtaskExecutionAttemptDetailsInfo) o;
+
+        return subtaskIndex == that.subtaskIndex
+                && status == that.status
+                && attempt == that.attempt
+                && Objects.equals(host, that.host)
+                && startTime == that.startTime
+                && startTimeCompatible == that.startTimeCompatible
+                && endTime == that.endTime
+                && duration == that.duration
+                && Objects.equals(ioMetricsInfo, that.ioMetricsInfo)
+                && Objects.equals(taskmanagerId, that.taskmanagerId)
+                && Objects.equals(statusDuration, that.statusDuration)
+                && Objects.equals(otherConcurrentAttempts, that.otherConcurrentAttempts);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                subtaskIndex,
+                status,
+                attempt,
+                host,
+                startTime,
+                startTimeCompatible,
+                endTime,
+                duration,
+                ioMetricsInfo,
+                taskmanagerId,
+                statusDuration,
+                otherConcurrentAttempts);
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java
index 76ec84509ec..45b469632b1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -257,18 +258,19 @@ public class JobVertexThreadInfoTracker<T extends Statistics> implements JobVert
                         executionVertex.getExecutionState());
                 continue;
             }
-            TaskManagerLocation tmLocation = executionVertex.getCurrentAssignedResourceLocation();
-            if (tmLocation == null) {
-                LOG.trace("ExecutionVertex {} is currently not assigned", executionVertex);
-                continue;
-            }
-            Set<ExecutionAttemptID> groupedAttemptIds =
-                    executionAttemptsByLocation.getOrDefault(tmLocation, new HashSet<>());
+            for (AccessExecution execution : executionVertex.getCurrentExecutions()) {
+                TaskManagerLocation tmLocation = execution.getAssignedResourceLocation();
+                if (tmLocation == null) {
+                    LOG.trace("ExecutionVertex {} is currently not assigned", executionVertex);
+                    continue;
+                }
+                Set<ExecutionAttemptID> groupedAttemptIds =
+                        executionAttemptsByLocation.getOrDefault(tmLocation, new HashSet<>());
 
-            ExecutionAttemptID attemptId =
-                    executionVertex.getCurrentExecutionAttempt().getAttemptId();
-            groupedAttemptIds.add(attemptId);
-            executionAttemptsByLocation.put(tmLocation, ImmutableSet.copyOf(groupedAttemptIds));
+                ExecutionAttemptID attemptId = execution.getAttemptId();
+                groupedAttemptIds.add(attemptId);
+                executionAttemptsByLocation.put(tmLocation, ImmutableSet.copyOf(groupedAttemptIds));
+            }
         }
 
         return executionAttemptsByLocation.entrySet().stream()
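
Note on the hunk above: the tracker now groups the attempt IDs of all current executions by TaskManager location and skips attempts that have no location yet. The sketch below shows the same grouping with plain strings. It uses `Map.computeIfAbsent` rather than the `getOrDefault` plus `ImmutableSet.copyOf` combination in the diff, which is a swapped-in simplification and not the committed code. The class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class GroupAttemptsByLocationSketch {

    /**
     * Groups attempt IDs by location. Each row is {location, attemptId};
     * a null location stands for an attempt that is not assigned yet.
     */
    static Map<String, Set<String>> group(String[][] attempts) {
        Map<String, Set<String>> byLocation = new HashMap<>();
        for (String[] attempt : attempts) {
            String location = attempt[0];
            if (location == null) {
                // Unassigned attempts cannot be sampled, so they are skipped.
                continue;
            }
            byLocation.computeIfAbsent(location, key -> new HashSet<>()).add(attempt[1]);
        }
        return byLocation;
    }

    public static void main(String[] args) {
        String[][] attempts = {
            {"tm-1", "subtask0-attempt0"},
            {"tm-2", "subtask0-attempt1"},
            {"tm-1", "subtask1-attempt0"},
            {null, "subtask2-attempt0"},
        };
        System.out.println(group(attempts));
    }
}
```
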
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java
index 85bf3a44cd5..93714132681 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java
@@ -46,6 +46,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 
@@ -141,6 +142,63 @@ class JobVertexBackPressureHandlerTest {
                         });
     }
 
+    private static Collection<MetricDump> getMultipleAttemptsMetricDumps() {
+        Collection<MetricDump> dumps = new ArrayList<>();
+        TaskQueryScopeInfo task0 =
+                new TaskQueryScopeInfo(
+                        TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(),
+                        TEST_JOB_VERTEX_ID.toString(),
+                        0,
+                        0);
+        dumps.add(new GaugeDump(task0, MetricNames.TASK_BACK_PRESSURED_TIME, "1000"));
+        dumps.add(new GaugeDump(task0, MetricNames.TASK_IDLE_TIME, "0"));
+        dumps.add(new GaugeDump(task0, MetricNames.TASK_BUSY_TIME, "0"));
+
+        TaskQueryScopeInfo speculativeTask0 =
+                new TaskQueryScopeInfo(
+                        TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(),
+                        TEST_JOB_VERTEX_ID.toString(),
+                        0,
+                        1);
+        dumps.add(new GaugeDump(speculativeTask0, MetricNames.TASK_BACK_PRESSURED_TIME, "200"));
+        dumps.add(new GaugeDump(speculativeTask0, MetricNames.TASK_IDLE_TIME, "100"));
+        dumps.add(new GaugeDump(speculativeTask0, MetricNames.TASK_BUSY_TIME, "800"));
+
+        TaskQueryScopeInfo task1 =
+                new TaskQueryScopeInfo(
+                        TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(),
+                        TEST_JOB_VERTEX_ID.toString(),
+                        1,
+                        0);
+        dumps.add(new GaugeDump(task1, MetricNames.TASK_BACK_PRESSURED_TIME, "500"));
+        dumps.add(new GaugeDump(task1, MetricNames.TASK_IDLE_TIME, "100"));
+        dumps.add(new GaugeDump(task1, MetricNames.TASK_BUSY_TIME, "900"));
+
+        TaskQueryScopeInfo speculativeTask1 =
+                new TaskQueryScopeInfo(
+                        TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(),
+                        TEST_JOB_VERTEX_ID.toString(),
+                        1,
+                        1);
+        dumps.add(new GaugeDump(speculativeTask1, MetricNames.TASK_BACK_PRESSURED_TIME, "900"));
+        dumps.add(new GaugeDump(speculativeTask1, MetricNames.TASK_IDLE_TIME, "0"));
+        dumps.add(new GaugeDump(speculativeTask1, MetricNames.TASK_BUSY_TIME, "100"));
+
+        // missing task2
+
+        TaskQueryScopeInfo task3 =
+                new TaskQueryScopeInfo(
+                        TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(),
+                        TEST_JOB_VERTEX_ID.toString(),
+                        3,
+                        0);
+        dumps.add(new GaugeDump(task3, MetricNames.TASK_BACK_PRESSURED_TIME, "100"));
+        dumps.add(new GaugeDump(task3, MetricNames.TASK_IDLE_TIME, "200"));
+        dumps.add(new GaugeDump(task3, MetricNames.TASK_BUSY_TIME, "700"));
+
+        return dumps;
+    }
+
     @Test
     void testGetBackPressure() throws Exception {
         final Map<String, String> pathParameters = new HashMap<>();
@@ -220,4 +278,153 @@ class JobVertexBackPressureHandlerTest {
         assertThat(jobVertexBackPressureInfo.getStatus())
                 .isEqualTo(VertexBackPressureStatus.DEPRECATED);
     }
+
+    @Test
+    void testGetBackPressureFromMultipleCurrentAttempts() throws Exception {
+        MetricStore multipleAttemptsMetricStore = new MetricStore();
+        for (MetricDump metricDump : getMultipleAttemptsMetricDumps()) {
+            multipleAttemptsMetricStore.add(metricDump);
+        }
+        // Update currentExecutionAttempts directly without JobDetails.
+        Map<Integer, Integer> currentExecutionAttempts = new HashMap<>();
+        currentExecutionAttempts.put(0, 1);
+        currentExecutionAttempts.put(1, 0);
+        multipleAttemptsMetricStore
+                .getCurrentExecutionAttempts()
+                .put(
+                        TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(),
+                        Collections.singletonMap(
+                                TEST_JOB_VERTEX_ID.toString(), currentExecutionAttempts));
+
+        JobVertexBackPressureHandler jobVertexBackPressureHandler =
+                new JobVertexBackPressureHandler(
+                        () -> CompletableFuture.completedFuture(restfulGateway),
+                        Time.seconds(10),
+                        Collections.emptyMap(),
+                        JobVertexBackPressureHeaders.getInstance(),
+                        new MetricFetcher() {
+                            private long updateCount = 0;
+
+                            @Override
+                            public MetricStore getMetricStore() {
+                                return multipleAttemptsMetricStore;
+                            }
+
+                            @Override
+                            public void update() {
+                                updateCount++;
+                            }
+
+                            @Override
+                            public long getLastUpdateTime() {
+                                return updateCount;
+                            }
+                        });
+
+        final Map<String, String> pathParameters = new HashMap<>();
+        pathParameters.put(
+                JobIDPathParameter.KEY, TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString());
+        pathParameters.put(JobVertexIdPathParameter.KEY, TEST_JOB_VERTEX_ID.toString());
+
+        final HandlerRequest<EmptyRequestBody> request =
+                HandlerRequest.resolveParametersAndCreate(
+                        EmptyRequestBody.getInstance(),
+                        new JobVertexMessageParameters(),
+                        pathParameters,
+                        Collections.emptyMap(),
+                        Collections.emptyList());
+
+        final CompletableFuture<JobVertexBackPressureInfo>
+                jobVertexBackPressureInfoCompletableFuture =
+                        jobVertexBackPressureHandler.handleRequest(request, restfulGateway);
+        final JobVertexBackPressureInfo jobVertexBackPressureInfo =
+                jobVertexBackPressureInfoCompletableFuture.get();
+
+        assertThat(jobVertexBackPressureInfo.getStatus()).isEqualTo(VertexBackPressureStatus.OK);
+        assertThat(jobVertexBackPressureInfo.getBackpressureLevel()).isEqualTo(LOW);
+
+        assertThat(
+                        jobVertexBackPressureInfo.getSubtasks().stream()
+                                .map(SubtaskBackPressureInfo::getAttemptNumber)
+                                .collect(Collectors.toList()))
+                .containsExactly(1, 0, null);
+        assertThat(
+                        jobVertexBackPressureInfo.getSubtasks().stream()
+                                .map(SubtaskBackPressureInfo::getOtherConcurrentAttempts)
+                                .filter(Objects::nonNull)
+                                .flatMap(Collection::stream)
+                                .map(SubtaskBackPressureInfo::getAttemptNumber)
+                                .collect(Collectors.toList()))
+                .containsExactly(0, 1);
+
+        assertThat(
+                        jobVertexBackPressureInfo.getSubtasks().stream()
+                                .map(SubtaskBackPressureInfo::getBackPressuredRatio)
+                                .collect(Collectors.toList()))
+                .containsExactly(0.2, 0.5, 0.1);
+        assertThat(
+                        jobVertexBackPressureInfo.getSubtasks().stream()
+                                .map(SubtaskBackPressureInfo::getOtherConcurrentAttempts)
+                                .filter(Objects::nonNull)
+                                .flatMap(Collection::stream)
+                                .map(SubtaskBackPressureInfo::getBackPressuredRatio)
+                                .collect(Collectors.toList()))
+                .containsExactly(1.0, 0.9);
+
+        assertThat(
+                        jobVertexBackPressureInfo.getSubtasks().stream()
+                                .map(SubtaskBackPressureInfo::getIdleRatio)
+                                .collect(Collectors.toList()))
+                .containsExactly(0.1, 0.1, 0.2);
+        assertThat(
+                        jobVertexBackPressureInfo.getSubtasks().stream()
+                                .map(SubtaskBackPressureInfo::getOtherConcurrentAttempts)
+                                .filter(Objects::nonNull)
+                                .flatMap(Collection::stream)
+                                .map(SubtaskBackPressureInfo::getIdleRatio)
+                                .collect(Collectors.toList()))
+                .containsExactly(0.0, 0.0);
+
+        assertThat(
+                        jobVertexBackPressureInfo.getSubtasks().stream()
+                                .map(SubtaskBackPressureInfo::getBusyRatio)
+                                .collect(Collectors.toList()))
+                .containsExactly(0.8, 0.9, 0.7);
+        assertThat(
+                        jobVertexBackPressureInfo.getSubtasks().stream()
+                                .map(SubtaskBackPressureInfo::getOtherConcurrentAttempts)
+                                .filter(Objects::nonNull)
+                                .flatMap(Collection::stream)
+                                .map(SubtaskBackPressureInfo::getBusyRatio)
+                                .collect(Collectors.toList()))
+                .containsExactly(0.0, 0.1);
+
+        assertThat(
+                        jobVertexBackPressureInfo.getSubtasks().stream()
+                                .map(SubtaskBackPressureInfo::getBackpressureLevel)
+                                .collect(Collectors.toList()))
+                .containsExactly(LOW, LOW, OK);
+        assertThat(
+                        jobVertexBackPressureInfo.getSubtasks().stream()
+                                .map(SubtaskBackPressureInfo::getOtherConcurrentAttempts)
+                                .filter(Objects::nonNull)
+                                .flatMap(Collection::stream)
+                                .map(SubtaskBackPressureInfo::getBackpressureLevel)
+                                .collect(Collectors.toList()))
+                .containsExactly(HIGH, HIGH);
+
+        assertThat(
+                        jobVertexBackPressureInfo.getSubtasks().stream()
+                                .map(SubtaskBackPressureInfo::getSubtask)
+                                .collect(Collectors.toList()))
+                .containsExactly(0, 1, 3);
+        assertThat(
+                        jobVertexBackPressureInfo.getSubtasks().stream()
+                                .map(SubtaskBackPressureInfo::getOtherConcurrentAttempts)
+                                .filter(Objects::nonNull)
+                                .flatMap(Collection::stream)
+                                .map(SubtaskBackPressureInfo::getSubtask)
+                                .collect(Collectors.toList()))
+                .containsExactly(0, 1);
+    }
 }
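
Note on the test above: the expected values can be checked by hand. Each gauge holds milliseconds per second, so a back-pressured time of "200" corresponds to a ratio of 0.2. The sketch below reproduces the level mapping that the assertions imply (0.1 maps to OK, 0.2 and 0.5 map to LOW, 0.9 and 1.0 map to HIGH). The thresholds are inferred from those assertions and are an assumption about the handler, not a copy of its code. `BackPressureLevelSketch`, `toRatio`, and `level` are hypothetical names.

```java
final class BackPressureLevelSketch {

    enum Level {
        OK,
        LOW,
        HIGH
    }

    /** Converts a "milliseconds per second" gauge value into a ratio between 0 and 1. */
    static double toRatio(long millisPerSecond) {
        return millisPerSecond / 1000.0;
    }

    /** Thresholds assumed from the test expectations: up to 0.1 is OK, up to 0.5 is LOW. */
    static Level level(double backPressuredRatio) {
        if (backPressuredRatio <= 0.10) {
            return Level.OK;
        }
        if (backPressuredRatio <= 0.5) {
            return Level.LOW;
        }
        return Level.HIGH;
    }

    public static void main(String[] args) {
        // Back-pressured times of the representative attempts in the test: 200, 500, 100.
        for (long millis : new long[] {200, 500, 100}) {
            System.out.println(millis + " ms/s -> " + level(toRatio(millis)));
        }
        // Back-pressured times of the other concurrent attempts: 1000, 900.
        for (long millis : new long[] {1000, 900}) {
            System.out.println(millis + " ms/s -> " + level(toRatio(millis)));
        }
    }
}
```

Subtask 3 has no entry in the current-attempts map that the test sets up, which matches the test expecting a null attempt number for it.
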
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
index 4c267329a74..70f46c1d302 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
@@ -192,7 +192,8 @@ public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger {
                         finishedTs - deployingTs,
                         ioMetricsInfo,
                         assignedResourceLocation.getResourceID().getResourceIdString(),
-                        statusDuration);
+                        statusDuration,
+                        null);
 
         assertEquals(expectedDetailsInfo, detailsInfo);
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
index a044f3f87ff..40f2286d722 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
@@ -195,7 +195,8 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger {
                         -1L,
                         ioMetricsInfo,
                         "(unassigned)",
-                        statusDuration);
+                        statusDuration,
+                        null);
 
         assertEquals(expectedDetailsInfo, detailsInfo);
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/AggregatedTaskDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/AggregatedTaskDetailsInfoTest.java
index 3490a05d029..df3e3fef220 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/AggregatedTaskDetailsInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/AggregatedTaskDetailsInfoTest.java
@@ -77,7 +77,8 @@ public class AggregatedTaskDetailsInfoTest
                                 Math.abs(random.nextLong()),
                                 ioMetricsInfo,
                                 "taskmanagerId",
-                                statusDuration)));
+                                statusDuration,
+                                null)));
     }
 
     @Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfoTest.java
index 0dc78e48a54..566f668837a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfoTest.java
@@ -34,13 +34,31 @@ public class JobVertexBackPressureInfoTest
         List<JobVertexBackPressureInfo.SubtaskBackPressureInfo> subtaskList = new ArrayList<>();
         subtaskList.add(
                 new JobVertexBackPressureInfo.SubtaskBackPressureInfo(
-                        0, JobVertexBackPressureInfo.VertexBackPressureLevel.LOW, 0.1, 0.5, 0.4));
+                        0,
+                        0,
+                        JobVertexBackPressureInfo.VertexBackPressureLevel.LOW,
+                        0.1,
+                        0.5,
+                        0.4,
+                        null));
         subtaskList.add(
                 new JobVertexBackPressureInfo.SubtaskBackPressureInfo(
-                        1, JobVertexBackPressureInfo.VertexBackPressureLevel.OK, 0.4, 0.3, 0.3));
+                        1,
+                        0,
+                        JobVertexBackPressureInfo.VertexBackPressureLevel.OK,
+                        0.4,
+                        0.3,
+                        0.3,
+                        null));
         subtaskList.add(
                 new JobVertexBackPressureInfo.SubtaskBackPressureInfo(
-                        2, JobVertexBackPressureInfo.VertexBackPressureLevel.HIGH, 0.9, 0.0, 0.1));
+                        2,
+                        0,
+                        JobVertexBackPressureInfo.VertexBackPressureLevel.HIGH,
+                        0.9,
+                        0.0,
+                        0.1,
+                        null));
         return new JobVertexBackPressureInfo(
                 JobVertexBackPressureInfo.VertexBackPressureStatus.OK,
                 JobVertexBackPressureInfo.VertexBackPressureLevel.LOW,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java
index 11682f37ceb..00b12f7c68d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetails
 import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -71,11 +72,12 @@ public class JobVertexDetailsInfoTest
                         1L,
                         jobVertexMetrics,
                         "taskmanagerId1",
-                        statusDuration));
+                        statusDuration,
+                        null));
         vertexTaskDetailList.add(
                 new SubtaskExecutionAttemptDetailsInfo(
                         1,
-                        ExecutionState.FAILED,
+                        ExecutionState.RUNNING,
                         random.nextInt(),
                         "local2",
                         System.currentTimeMillis(),
@@ -83,7 +85,20 @@ public class JobVertexDetailsInfoTest
                         1L,
                         jobVertexMetrics,
                         "taskmanagerId2",
-                        statusDuration));
+                        statusDuration,
+                        Collections.singletonList(
+                                new SubtaskExecutionAttemptDetailsInfo(
+                                        1,
+                                        ExecutionState.FAILED,
+                                        random.nextInt(),
+                                        "local2",
+                                        System.currentTimeMillis(),
+                                        System.currentTimeMillis(),
+                                        1L,
+                                        jobVertexMetrics,
+                                        "taskmanagerId2",
+                                        statusDuration,
+                                        null))));
         vertexTaskDetailList.add(
                 new SubtaskExecutionAttemptDetailsInfo(
                         2,
@@ -95,7 +110,8 @@ public class JobVertexDetailsInfoTest
                         1L,
                         jobVertexMetrics,
                         "taskmanagerId3",
-                        statusDuration));
+                        statusDuration,
+                        null));
 
         int parallelism = 1 + (random.nextInt() / 3);
         return new JobVertexDetailsInfo(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java
index 48158c05b56..c4110811e0c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java
@@ -70,6 +70,7 @@ public class SubtaskExecutionAttemptDetailsInfoTest
                 Math.abs(random.nextLong()),
                 ioMetricsInfo,
                 "taskmanagerId",
-                statusDuration);
+                statusDuration,
+                null);
     }
 }


[flink] 05/06: [FLINK-28588][rest] MetricStore supports to store and query metrics of multiple execution attempts of a subtask.

Posted by ga...@apache.org.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1a48fd53bd317ac2102adb00c1209350b57a687e
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Tue Jul 26 16:54:10 2022 +0800

    [FLINK-28588][rest] MetricStore supports to store and query metrics of multiple execution attempts of a subtask.
---
 .../history/HistoryServerArchiveFetcher.java       |   3 +-
 .../runtime/messages/webmonitor/JobDetails.java    |  98 ++++++++++-
 .../metrics/dump/MetricDumpSerialization.java      |  10 +-
 .../flink/runtime/metrics/dump/QueryScopeInfo.java |  28 +++-
 .../groups/InternalOperatorMetricGroup.java        |   1 +
 .../runtime/metrics/groups/TaskMetricGroup.java    |   5 +-
 .../handler/legacy/metrics/MetricFetcherImpl.java  |   1 +
 .../rest/handler/legacy/metrics/MetricStore.java   | 186 +++++++++++++++++----
 .../rest/handler/util/MutableIOMetrics.java        |   7 +-
 .../messages/webmonitor/JobDetailsTest.java        |  31 ++++
 .../metrics/dump/MetricDumpSerializerTest.java     |   5 +-
 .../runtime/metrics/dump/QueryScopeInfoTest.java   |  10 +-
 .../job/JobVertexBackPressureHandlerTest.java      |   7 +-
 .../AggregatingSubtasksMetricsHandlerTest.java     |   6 +-
 .../job/metrics/JobVertexMetricsHandlerTest.java   |   4 +-
 .../job/metrics/SubtaskMetricsHandlerTest.java     |   4 +-
 .../handler/legacy/metrics/MetricFetcherTest.java  |   5 +-
 .../handler/legacy/metrics/MetricStoreTest.java    |  61 ++++++-
 18 files changed, 408 insertions(+), 64 deletions(-)
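
The comments added to MetricStore in this commit describe a "representative" attempt per subtask, looked up in a map keyed as JobID -> JobVertexID -> SubtaskIndex -> CurrentExecutionAttemptNumber. The following is a minimal sketch of that lookup, not the Flink implementation. The class RepresentativeAttemptSketch and its record method are hypothetical names. The rule that a recorded attempt number marks exactly that attempt as representative is an assumption; the diff comment only states the case where no number is recorded.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the lookup described in the diff; not the Flink class. */
final class RepresentativeAttemptSketch {

    // JobID -> JobVertexID -> SubtaskIndex -> CurrentExecutionAttemptNumber
    private final Map<String, Map<String, Map<Integer, Integer>>> currentExecutionAttempts =
            new HashMap<>();

    /** Records the current attempt number of a subtask that has several executions. */
    void record(String jobId, String vertexId, int subtaskIndex, int attemptNumber) {
        currentExecutionAttempts
                .computeIfAbsent(jobId, k -> new HashMap<>())
                .computeIfAbsent(vertexId, k -> new HashMap<>())
                .put(subtaskIndex, attemptNumber);
    }

    boolean isRepresentative(String jobId, String vertexId, int subtaskIndex, int attemptNumber) {
        Map<String, Map<Integer, Integer>> vertices = currentExecutionAttempts.get(jobId);
        if (vertices == null) {
            return true; // nothing recorded for the job: only one execution per subtask
        }
        Map<Integer, Integer> subtasks = vertices.get(vertexId);
        if (subtasks == null) {
            return true;
        }
        Integer current = subtasks.get(subtaskIndex);
        // Assumption: when a number is recorded, only that attempt is representative.
        return current == null || current == attemptNumber;
    }

    public static void main(String[] args) {
        RepresentativeAttemptSketch sketch = new RepresentativeAttemptSketch();
        sketch.record("job", "vertex", 0, 1);
        System.out.println(sketch.isRepresentative("job", "vertex", 0, 1));
        System.out.println(sketch.isRepresentative("job", "vertex", 0, 0));
    }
}
```

Under these assumptions, a subtask with a single execution never needs an entry in the map, which matches createDetailsForJob recording only subtasks that have more than one current execution.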

diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index eb5a34b2c2d..8e5aee9c633 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -425,7 +425,8 @@ class HistoryServerArchiveFetcher {
                         state,
                         lastMod,
                         tasksPerState,
-                        numTasks);
+                        numTasks,
+                        new HashMap<>());
         MultipleJobsDetails multipleJobsDetails =
                 new MultipleJobsDetails(Collections.singleton(jobDetails));
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
index f9b609ac1cf..74b2964228c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.messages.webmonitor;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -39,6 +40,8 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.S
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -57,6 +60,8 @@ public class JobDetails implements Serializable {
     private static final String FIELD_NAME_STATUS = "state";
     private static final String FIELD_NAME_LAST_MODIFICATION = "last-modification";
     private static final String FIELD_NAME_TOTAL_NUMBER_TASKS = "total";
+    private static final String FIELD_NAME_CURRENT_EXECUTION_ATTEMPTS =
+            "current-execution-attempts";
 
     private final JobID jobId;
 
@@ -76,6 +81,15 @@ public class JobDetails implements Serializable {
 
     private final int numTasks;
 
+    /**
+     * Holds the attempt number of the current execution attempt of each subtask, which is treated
+     * as the representative execution of that subtask. The map is keyed as JobVertexID ->
+     * SubtaskIndex -> CurrentExecutionAttemptNumber. It is used to accumulate the metrics of a
+     * subtask in MetricFetcher.
+     */
+    private final Map<String, Map<Integer, Integer>> currentExecutionAttempts;
+
+    @VisibleForTesting
     public JobDetails(
             JobID jobId,
             String jobName,
@@ -86,7 +100,30 @@ public class JobDetails implements Serializable {
             long lastUpdateTime,
             int[] tasksPerState,
             int numTasks) {
+        this(
+                jobId,
+                jobName,
+                startTime,
+                endTime,
+                duration,
+                status,
+                lastUpdateTime,
+                tasksPerState,
+                numTasks,
+                new HashMap<>());
+    }
 
+    public JobDetails(
+            JobID jobId,
+            String jobName,
+            long startTime,
+            long endTime,
+            long duration,
+            JobStatus status,
+            long lastUpdateTime,
+            int[] tasksPerState,
+            int numTasks,
+            Map<String, Map<Integer, Integer>> currentExecutionAttempts) {
         this.jobId = checkNotNull(jobId);
         this.jobName = checkNotNull(jobName);
         this.startTime = startTime;
@@ -100,6 +137,7 @@ public class JobDetails implements Serializable {
                 ExecutionState.values().length);
         this.tasksPerState = checkNotNull(tasksPerState);
         this.numTasks = numTasks;
+        this.currentExecutionAttempts = checkNotNull(currentExecutionAttempts);
     }
 
     public static JobDetails createDetailsForJob(AccessExecutionGraph job) {
@@ -112,16 +150,27 @@ public class JobDetails implements Serializable {
         int[] countsPerStatus = new int[ExecutionState.values().length];
         long lastChanged = 0;
         int numTotalTasks = 0;
+        Map<String, Map<Integer, Integer>> currentExecutionAttempts = new HashMap<>();
 
         for (AccessExecutionJobVertex ejv : job.getVerticesTopologically()) {
             AccessExecutionVertex[] taskVertices = ejv.getTaskVertices();
             numTotalTasks += taskVertices.length;
+            Map<Integer, Integer> vertexAttempts = new HashMap<>();
 
             for (AccessExecutionVertex taskVertex : taskVertices) {
+                if (taskVertex.getCurrentExecutions().size() > 1) {
+                    vertexAttempts.put(
+                            taskVertex.getParallelSubtaskIndex(),
+                            taskVertex.getCurrentExecutionAttempt().getAttemptNumber());
+                }
                 ExecutionState state = taskVertex.getExecutionState();
                 countsPerStatus[state.ordinal()]++;
                 lastChanged = Math.max(lastChanged, taskVertex.getStateTimestamp(state));
             }
+
+            if (!vertexAttempts.isEmpty()) {
+                currentExecutionAttempts.put(String.valueOf(ejv.getJobVertexId()), vertexAttempts);
+            }
         }
 
         lastChanged = Math.max(lastChanged, finished);
@@ -135,7 +184,8 @@ public class JobDetails implements Serializable {
                 status,
                 lastChanged,
                 countsPerStatus,
-                numTotalTasks);
+                numTotalTasks,
+                currentExecutionAttempts);
     }
 
     // ------------------------------------------------------------------------
@@ -176,6 +226,9 @@ public class JobDetails implements Serializable {
         return tasksPerState;
     }
 
+    public Map<String, Map<Integer, Integer>> getCurrentExecutionAttempts() {
+        return currentExecutionAttempts;
+    }
     // ------------------------------------------------------------------------
 
     @Override
@@ -192,7 +245,8 @@ public class JobDetails implements Serializable {
                     && this.status == that.status
                     && this.jobId.equals(that.jobId)
                     && this.jobName.equals(that.jobName)
-                    && Arrays.equals(this.tasksPerState, that.tasksPerState);
+                    && Arrays.equals(this.tasksPerState, that.tasksPerState)
+                    && this.currentExecutionAttempts.equals(that.currentExecutionAttempts);
         } else {
             return false;
         }
@@ -208,6 +262,7 @@ public class JobDetails implements Serializable {
         result = 31 * result + (int) (lastUpdateTime ^ (lastUpdateTime >>> 32));
         result = 31 * result + Arrays.hashCode(tasksPerState);
         result = 31 * result + numTasks;
+        result = 31 * result + currentExecutionAttempts.hashCode();
         return result;
     }
 
@@ -271,6 +326,20 @@ public class JobDetails implements Serializable {
 
             jsonGenerator.writeEndObject();
 
+            if (!jobDetails.currentExecutionAttempts.isEmpty()) {
+                jsonGenerator.writeObjectFieldStart(FIELD_NAME_CURRENT_EXECUTION_ATTEMPTS);
+                for (Map.Entry<String, Map<Integer, Integer>> vertex :
+                        jobDetails.currentExecutionAttempts.entrySet()) {
+                    jsonGenerator.writeObjectFieldStart(vertex.getKey());
+                    for (Map.Entry<Integer, Integer> attempt : vertex.getValue().entrySet()) {
+                        jsonGenerator.writeNumberField(
+                                String.valueOf(attempt.getKey()), attempt.getValue());
+                    }
+                    jsonGenerator.writeEndObject();
+                }
+                jsonGenerator.writeEndObject();
+            }
+
             jsonGenerator.writeEndObject();
         }
     }
@@ -310,6 +379,28 @@ public class JobDetails implements Serializable {
                         jsonNode == null ? 0 : jsonNode.intValue();
             }
 
+            Map<String, Map<Integer, Integer>> attempts = new HashMap<>();
+            JsonNode attemptsNode = rootNode.get(FIELD_NAME_CURRENT_EXECUTION_ATTEMPTS);
+            if (attemptsNode != null) {
+                attemptsNode
+                        .fields()
+                        .forEachRemaining(
+                                vertex -> {
+                                    String vertexId = vertex.getKey();
+                                    Map<Integer, Integer> vertexAttempts =
+                                            attempts.computeIfAbsent(
+                                                    vertexId, k -> new HashMap<>());
+                                    vertex.getValue()
+                                            .fields()
+                                            .forEachRemaining(
+                                                    attempt ->
+                                                            vertexAttempts.put(
+                                                                    Integer.parseInt(
+                                                                            attempt.getKey()),
+                                                                    attempt.getValue().intValue()));
+                                });
+            }
+
             return new JobDetails(
                     jobId,
                     jobName,
@@ -319,7 +410,8 @@ public class JobDetails implements Serializable {
                     jobStatus,
                     lastUpdateTime,
                     numVerticesPerExecutionState,
-                    numTasks);
+                    numTasks,
+                    attempts);
         }
     }
 }
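
To make the JSON shape produced by the serializer change above easier to picture, here is a rough sketch that renders the nested map by hand. It is not the Jackson-based code from the diff. The class CurrentAttemptsJsonSketch and its render method are hypothetical, keys are sorted only to make the output deterministic, and vertex IDs are assumed not to need JSON escaping.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical renderer that mirrors the field layout written in the diff. */
final class CurrentAttemptsJsonSketch {

    static String render(Map<String, Map<Integer, Integer>> attempts) {
        if (attempts.isEmpty()) {
            return ""; // the serializer in the diff omits the field when the map is empty
        }
        StringBuilder sb = new StringBuilder("\"current-execution-attempts\":{");
        Map<String, Map<Integer, Integer>> sortedVertices = new TreeMap<>(attempts);
        boolean firstVertex = true;
        for (Map.Entry<String, Map<Integer, Integer>> vertex : sortedVertices.entrySet()) {
            if (!firstVertex) {
                sb.append(',');
            }
            firstVertex = false;
            sb.append('"').append(vertex.getKey()).append("\":{");
            Map<Integer, Integer> sortedAttempts = new TreeMap<>(vertex.getValue());
            boolean firstAttempt = true;
            for (Map.Entry<Integer, Integer> attempt : sortedAttempts.entrySet()) {
                if (!firstAttempt) {
                    sb.append(',');
                }
                firstAttempt = false;
                // The subtask index becomes a JSON field name, so it is written as a string.
                sb.append('"').append(attempt.getKey()).append("\":").append(attempt.getValue());
            }
            sb.append('}');
        }
        return sb.append('}').toString();
    }

    public static void main(String[] args) {
        Map<Integer, Integer> subtasks = new HashMap<>();
        subtasks.put(0, 2);
        subtasks.put(3, 1);
        Map<String, Map<Integer, Integer>> attempts = new HashMap<>();
        attempts.put("v1", subtasks);
        System.out.println(render(attempts));
    }
}
```

Because subtask indices are written as field names, the deserializer in the diff has to parse them back with Integer.parseInt, while the attempt numbers remain plain JSON numbers.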
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
index de423dadf9b..90187fed27f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
@@ -240,6 +240,7 @@ public class MetricDumpSerialization {
                 out.writeUTF(taskInfo.jobID);
                 out.writeUTF(taskInfo.vertexID);
                 out.writeInt(taskInfo.subtaskIndex);
+                out.writeInt(taskInfo.attemptNumber);
                 break;
             case INFO_CATEGORY_OPERATOR:
                 QueryScopeInfo.OperatorQueryScopeInfo operatorInfo =
@@ -247,6 +248,7 @@ public class MetricDumpSerialization {
                 out.writeUTF(operatorInfo.jobID);
                 out.writeUTF(operatorInfo.vertexID);
                 out.writeInt(operatorInfo.subtaskIndex);
+                out.writeInt(operatorInfo.attemptNumber);
                 out.writeUTF(operatorInfo.operatorName);
                 break;
             default:
@@ -436,6 +438,7 @@ public class MetricDumpSerialization {
         String jobID;
         String vertexID;
         int subtaskIndex;
+        int attemptNumber;
 
         String scope = dis.readUTF();
         byte cat = dis.readByte();
@@ -452,14 +455,17 @@ public class MetricDumpSerialization {
                 jobID = dis.readUTF();
                 vertexID = dis.readUTF();
                 subtaskIndex = dis.readInt();
-                return new QueryScopeInfo.TaskQueryScopeInfo(jobID, vertexID, subtaskIndex, scope);
+                attemptNumber = dis.readInt();
+                return new QueryScopeInfo.TaskQueryScopeInfo(
+                        jobID, vertexID, subtaskIndex, attemptNumber, scope);
             case INFO_CATEGORY_OPERATOR:
                 jobID = dis.readUTF();
                 vertexID = dis.readUTF();
                 subtaskIndex = dis.readInt();
+                attemptNumber = dis.readInt();
                 String operatorName = dis.readUTF();
                 return new QueryScopeInfo.OperatorQueryScopeInfo(
-                        jobID, vertexID, subtaskIndex, operatorName, scope);
+                        jobID, vertexID, subtaskIndex, attemptNumber, operatorName, scope);
             default:
                 throw new IOException("Unknown scope category: " + cat);
         }
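
The task-scope fields in the hunks above are written and read positionally, so the new attemptNumber has to appear at the same position on both sides. Below is a simplified round-trip sketch of that field order using the JDK data streams. It leaves out the scope string and category byte that the real format also carries, and the class TaskScopeWireSketch is a hypothetical name, not part of Flink.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical round trip of the task-scope fields in the order used by the diff. */
final class TaskScopeWireSketch {

    static byte[] write(String jobId, String vertexId, int subtaskIndex, int attemptNumber) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(jobId);
            out.writeUTF(vertexId);
            out.writeInt(subtaskIndex);
            out.writeInt(attemptNumber); // field added by this commit, after subtaskIndex
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static String read(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            // Fields must be read back in exactly the order they were written.
            String jobId = in.readUTF();
            String vertexId = in.readUTF();
            int subtaskIndex = in.readInt();
            int attemptNumber = in.readInt();
            return jobId + "/" + vertexId + "/" + subtaskIndex + "/" + attemptNumber;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(read(write("job", "vertex", 2, 1)));
    }
}
```

Since the format carries no field tags, a reader that skips the extra readInt would interpret the attempt number bytes as the start of the next field, which is presumably why the serializer and deserializer are changed together in this commit.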
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
index c61a9d76412..d0d9652f627 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
@@ -142,22 +142,30 @@ public abstract class QueryScopeInfo {
         public final String jobID;
         public final String vertexID;
         public final int subtaskIndex;
+        public final int attemptNumber;
 
-        public TaskQueryScopeInfo(String jobID, String vertexid, int subtaskIndex) {
-            this(jobID, vertexid, subtaskIndex, "");
+        public TaskQueryScopeInfo(
+                String jobID, String vertexid, int subtaskIndex, int attemptNumber) {
+            this(jobID, vertexid, subtaskIndex, attemptNumber, "");
         }
 
-        public TaskQueryScopeInfo(String jobID, String vertexid, int subtaskIndex, String scope) {
+        public TaskQueryScopeInfo(
+                String jobID, String vertexid, int subtaskIndex, int attemptNumber, String scope) {
             super(scope);
             this.jobID = jobID;
             this.vertexID = vertexid;
             this.subtaskIndex = subtaskIndex;
+            this.attemptNumber = attemptNumber;
         }
 
         @Override
         public TaskQueryScopeInfo copy(String additionalScope) {
             return new TaskQueryScopeInfo(
-                    this.jobID, this.vertexID, this.subtaskIndex, concatScopes(additionalScope));
+                    this.jobID,
+                    this.vertexID,
+                    this.subtaskIndex,
+                    this.attemptNumber,
+                    concatScopes(additionalScope));
         }
 
         @Override
@@ -174,23 +182,30 @@ public abstract class QueryScopeInfo {
         public final String jobID;
         public final String vertexID;
         public final int subtaskIndex;
+        public final int attemptNumber;
         public final String operatorName;
 
         public OperatorQueryScopeInfo(
-                String jobID, String vertexid, int subtaskIndex, String operatorName) {
-            this(jobID, vertexid, subtaskIndex, operatorName, "");
+                String jobID,
+                String vertexid,
+                int subtaskIndex,
+                int attemptNumber,
+                String operatorName) {
+            this(jobID, vertexid, subtaskIndex, attemptNumber, operatorName, "");
         }
 
         public OperatorQueryScopeInfo(
                 String jobID,
                 String vertexid,
                 int subtaskIndex,
+                int attemptNumber,
                 String operatorName,
                 String scope) {
             super(scope);
             this.jobID = jobID;
             this.vertexID = vertexid;
             this.subtaskIndex = subtaskIndex;
+            this.attemptNumber = attemptNumber;
             this.operatorName = operatorName;
         }
 
@@ -200,6 +215,7 @@ public abstract class QueryScopeInfo {
                     this.jobID,
                     this.vertexID,
                     this.subtaskIndex,
+                    this.attemptNumber,
                     this.operatorName,
                     concatScopes(additionalScope));
         }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorMetricGroup.java
index 68a85ba5f2f..d075675b80c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorMetricGroup.java
@@ -75,6 +75,7 @@ public class InternalOperatorMetricGroup extends ComponentMetricGroup<TaskMetric
                 this.parent.parent.jobId.toString(),
                 this.parent.vertexId.toString(),
                 this.parent.subtaskIndex,
+                this.parent.attemptNumber(),
                 filter.filterCharacters(this.operatorName));
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
index 0dc0ff9b6c5..afcbbaa44bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
@@ -134,7 +134,10 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr
     protected QueryScopeInfo.TaskQueryScopeInfo createQueryServiceMetricInfo(
             CharacterFilter filter) {
         return new QueryScopeInfo.TaskQueryScopeInfo(
-                this.parent.jobId.toString(), String.valueOf(this.vertexId), this.subtaskIndex);
+                this.parent.jobId.toString(),
+                String.valueOf(this.vertexId),
+                this.subtaskIndex,
+                this.attemptNumber);
     }
 
     // ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java
index beb652f97c7..65d2e162532 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java
@@ -143,6 +143,7 @@ public class MetricFetcherImpl<T extends RestfulGateway> implements MetricFetche
                                     toRetain.add(job.getJobId().toString());
                                 }
                                 metrics.retainJobs(toRetain);
+                                metrics.updateCurrentExecutionAttempts(jobDetails.getJobs());
                             }
                         },
                         executor);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
index 93f289fa036..c86ac763f9c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.rest.handler.legacy.metrics;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.metrics.dump.MetricDump;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 
@@ -27,8 +28,10 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.concurrent.ThreadSafe;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -54,6 +57,15 @@ public class MetricStore {
     private final Map<String, TaskManagerMetricStore> taskManagers = new ConcurrentHashMap<>();
     private final Map<String, JobMetricStore> jobs = new ConcurrentHashMap<>();
 
+    /**
+     * Holds the attempt number of the representative execution for each subtask of each vertex.
+     * The map is keyed as JobID -> JobVertexID -> SubtaskIndex -> CurrentExecutionAttemptNumber.
+     * When a metric of an execution attempt is added, it is also added to the SubtaskMetricStore
+     * if that attempt is the representative execution.
+     */
+    private final Map<String, Map<String, Map<Integer, Integer>>> currentExecutionAttempts =
+            new ConcurrentHashMap<>();
+
     /**
      * Remove inactive task managers.
      *
@@ -70,6 +82,18 @@ public class MetricStore {
      */
     synchronized void retainJobs(List<String> activeJobs) {
         jobs.keySet().retainAll(activeJobs);
+        currentExecutionAttempts.keySet().retainAll(activeJobs);
+    }
+
+    public synchronized void updateCurrentExecutionAttempts(Collection<JobDetails> jobs) {
+        jobs.forEach(
+                job ->
+                        currentExecutionAttempts.put(
+                                job.getJobId().toString(), job.getCurrentExecutionAttempts()));
+    }
+
+    public Map<String, Map<String, Map<Integer, Integer>>> getCurrentExecutionAttempts() {
+        return currentExecutionAttempts;
     }
 
     /**
@@ -153,7 +177,24 @@ public class MetricStore {
         if (task == null) {
             return null;
         }
-        return ComponentMetricStore.unmodifiable(task.getSubtaskMetricStore(subtaskIndex));
+        return SubtaskMetricStore.unmodifiable(task.getSubtaskMetricStore(subtaskIndex));
+    }
+
+    public synchronized ComponentMetricStore getSubtaskAttemptMetricStore(
+            String jobID, String taskID, int subtaskIndex, int attemptNumber) {
+        JobMetricStore job = jobID == null ? null : jobs.get(jobID);
+        if (job == null) {
+            return null;
+        }
+        TaskMetricStore task = job.getTaskMetricStore(taskID);
+        if (task == null) {
+            return null;
+        }
+        SubtaskMetricStore subtask = task.getSubtaskMetricStore(subtaskIndex);
+        if (subtask == null) {
+            return null;
+        }
+        return ComponentMetricStore.unmodifiable(subtask.getAttemptsMetricStore(attemptNumber));
     }
 
     public synchronized Map<String, JobMetricStore> getJobs() {
@@ -177,7 +218,9 @@ public class MetricStore {
             TaskManagerMetricStore tm;
             JobMetricStore job;
             TaskMetricStore task;
-            ComponentMetricStore subtask;
+            SubtaskMetricStore subtask;
+            ComponentMetricStore attempt;
+            boolean isRepresentativeAttempt;
 
             String name = info.scope.isEmpty() ? metric.name : info.scope + "." + metric.name;
 
@@ -214,15 +257,34 @@ public class MetricStore {
                     task = job.tasks.computeIfAbsent(taskInfo.vertexID, k -> new TaskMetricStore());
                     subtask =
                             task.subtasks.computeIfAbsent(
-                                    taskInfo.subtaskIndex, k -> new ComponentMetricStore());
-                    /**
-                     * The duplication is intended. Metrics scoped by subtask are useful for several
-                     * job/task handlers, while the WebInterface task metric queries currently do
-                     * not account for subtasks, so we don't divide by subtask and instead use the
-                     * concatenation of subtask index and metric name as the name for those.
-                     */
-                    addMetric(subtask.metrics, name, metric);
-                    addMetric(task.metrics, taskInfo.subtaskIndex + "." + name, metric);
+                                    taskInfo.subtaskIndex, k -> new SubtaskMetricStore());
+
+                    // The attempt is representative if it matches the current execution attempt
+                    // recorded for the subtask in currentExecutionAttempts, or if no attempt is
+                    // recorded there, in which case there should be only one execution.
+                    isRepresentativeAttempt =
+                            isRepresentativeAttempt(
+                                    taskInfo.jobID,
+                                    taskInfo.vertexID,
+                                    taskInfo.subtaskIndex,
+                                    taskInfo.attemptNumber);
+                    attempt =
+                            subtask.attempts.computeIfAbsent(
+                                    taskInfo.attemptNumber, k -> new ComponentMetricStore());
+                    addMetric(attempt.metrics, name, metric);
+                    // If the attempt is the representative one, its metrics are also added to
+                    // the subtask and task metric stores.
+                    if (isRepresentativeAttempt) {
+                        /**
+                         * The duplication is intended. Metrics scoped by subtask are useful for
+                         * several job/task handlers, while the WebInterface task metric queries
+                         * currently do not account for subtasks, so we don't divide by subtask and
+                         * instead use the concatenation of subtask index and metric name as the
+                         * name for those.
+                         */
+                        addMetric(subtask.metrics, name, metric);
+                        addMetric(task.metrics, taskInfo.subtaskIndex + "." + name, metric);
+                    }
                     break;
                 case INFO_CATEGORY_OPERATOR:
                     QueryScopeInfo.OperatorQueryScopeInfo operatorInfo =
@@ -233,21 +295,38 @@ public class MetricStore {
                                     operatorInfo.vertexID, k -> new TaskMetricStore());
                     subtask =
                             task.subtasks.computeIfAbsent(
-                                    operatorInfo.subtaskIndex, k -> new ComponentMetricStore());
-                    /**
-                     * As the WebInterface does not account for operators (because it can't) we
-                     * don't divide by operator and instead use the concatenation of subtask index,
-                     * operator name and metric name as the name.
-                     */
-                    addMetric(subtask.metrics, operatorInfo.operatorName + "." + name, metric);
-                    addMetric(
-                            task.metrics,
-                            operatorInfo.subtaskIndex
-                                    + "."
-                                    + operatorInfo.operatorName
-                                    + "."
-                                    + name,
-                            metric);
+                                    operatorInfo.subtaskIndex, k -> new SubtaskMetricStore());
+
+                    isRepresentativeAttempt =
+                            isRepresentativeAttempt(
+                                    operatorInfo.jobID,
+                                    operatorInfo.vertexID,
+                                    operatorInfo.subtaskIndex,
+                                    operatorInfo.attemptNumber);
+
+                    attempt =
+                            subtask.attempts.computeIfAbsent(
+                                    operatorInfo.attemptNumber, k -> new ComponentMetricStore());
+                    addMetric(attempt.metrics, operatorInfo.operatorName + "." + name, metric);
+
+                    // If the attempt is the representative one, its metrics are also added to
+                    // the subtask and task metric stores.
+                    if (isRepresentativeAttempt) {
+                        /**
+                         * As the WebInterface does not account for operators (because it can't) we
+                         * don't divide by operator and instead use the concatenation of subtask
+                         * index, operator name and metric name as the name.
+                         */
+                        addMetric(subtask.metrics, operatorInfo.operatorName + "." + name, metric);
+                        addMetric(
+                                task.metrics,
+                                operatorInfo.subtaskIndex
+                                        + "."
+                                        + operatorInfo.operatorName
+                                        + "."
+                                        + name,
+                                metric);
+                    }
                     break;
                 default:
                     LOG.debug("Invalid metric dump category: " + info.getCategory());
@@ -257,6 +336,19 @@ public class MetricStore {
         }
     }
 
+    // Returns whether the attempt is the representative one. This is also true if no current
+    // execution attempt number is recorded for the subtask in currentExecutionAttempts, which
+    // means there should be only one execution.
+    private boolean isRepresentativeAttempt(
+            String jobID, String vertexID, int subtaskIndex, int attemptNumber) {
+        return Optional.of(currentExecutionAttempts)
+                        .map(m -> m.get(jobID))
+                        .map(m -> m.get(vertexID))
+                        .map(m -> m.get(subtaskIndex))
+                        .orElse(attemptNumber)
+                == attemptNumber;
+    }
+
     private void addMetric(Map<String, String> target, String name, MetricDump metric) {
         switch (metric.getCategory()) {
             case METRIC_CATEGORY_COUNTER:
@@ -363,24 +455,24 @@ public class MetricStore {
     /** Sub-structure containing metrics of a single Task. */
     @ThreadSafe
     public static class TaskMetricStore extends ComponentMetricStore {
-        private final Map<Integer, ComponentMetricStore> subtasks;
+        private final Map<Integer, SubtaskMetricStore> subtasks;
 
         private TaskMetricStore() {
             this(new ConcurrentHashMap<>(), new ConcurrentHashMap<>());
         }
 
         private TaskMetricStore(
-                Map<String, String> metrics, Map<Integer, ComponentMetricStore> subtasks) {
+                Map<String, String> metrics, Map<Integer, SubtaskMetricStore> subtasks) {
             super(metrics);
             this.subtasks = checkNotNull(subtasks);
         }
 
-        public ComponentMetricStore getSubtaskMetricStore(int subtaskIndex) {
+        public SubtaskMetricStore getSubtaskMetricStore(int subtaskIndex) {
             return subtasks.get(subtaskIndex);
         }
 
-        public Map<Integer, ComponentMetricStore> getAllSubtaskMetricStores() {
-            return subtasks;
+        public Map<Integer, SubtaskMetricStore> getAllSubtaskMetricStores() {
+            return unmodifiableMap(subtasks);
         }
 
         private static TaskMetricStore unmodifiable(TaskMetricStore source) {
@@ -391,4 +483,36 @@ public class MetricStore {
                     unmodifiableMap(source.metrics), unmodifiableMap(source.subtasks));
         }
     }
+
+    /** Sub-structure containing metrics of a single subtask. */
+    @ThreadSafe
+    public static class SubtaskMetricStore extends ComponentMetricStore {
+        private final Map<Integer, ComponentMetricStore> attempts;
+
+        private SubtaskMetricStore() {
+            this(new ConcurrentHashMap<>(), new ConcurrentHashMap<>());
+        }
+
+        private SubtaskMetricStore(
+                Map<String, String> metrics, Map<Integer, ComponentMetricStore> attempts) {
+            super(metrics);
+            this.attempts = checkNotNull(attempts);
+        }
+
+        public ComponentMetricStore getAttemptsMetricStore(int attemptNumber) {
+            return attempts.get(attemptNumber);
+        }
+
+        public Map<Integer, ComponentMetricStore> getAllAttemptsMetricStores() {
+            return unmodifiableMap(attempts);
+        }
+
+        private static SubtaskMetricStore unmodifiable(SubtaskMetricStore source) {
+            if (source == null) {
+                return null;
+            }
+            return new SubtaskMetricStore(
+                    unmodifiableMap(source.metrics), unmodifiableMap(source.attempts));
+        }
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
index 7da9061a090..8ba34a10d85 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
@@ -99,8 +99,11 @@ public class MutableIOMetrics extends IOMetrics {
                 fetcher.update();
                 MetricStore.ComponentMetricStore metrics =
                         fetcher.getMetricStore()
-                                .getSubtaskMetricStore(
-                                        jobID, taskID, attempt.getParallelSubtaskIndex());
+                                .getSubtaskAttemptMetricStore(
+                                        jobID,
+                                        taskID,
+                                        attempt.getParallelSubtaskIndex(),
+                                        attempt.getAttemptNumber());
                 if (metrics != null) {
                     /**
                      * We want to keep track of missing metrics to be able to make a difference
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java
index 790ca43ce7d..609ef1f8e0d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java
@@ -29,6 +29,8 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -102,4 +104,33 @@ class JobDetailsTest {
 
         assertThat(unmarshalled).isEqualTo(expected);
     }
+
+    @Test
+    void testJobDetailsWithExecutionAttemptsMarshalling() throws JsonProcessingException {
+        Map<String, Map<Integer, Integer>> currentExecutionAttempts = new HashMap<>();
+        currentExecutionAttempts.computeIfAbsent("a", k -> new HashMap<>()).put(1, 2);
+        currentExecutionAttempts.computeIfAbsent("a", k -> new HashMap<>()).put(2, 4);
+        currentExecutionAttempts.computeIfAbsent("b", k -> new HashMap<>()).put(3, 1);
+
+        final JobDetails expected =
+                new JobDetails(
+                        new JobID(),
+                        "foobar",
+                        1L,
+                        10L,
+                        9L,
+                        JobStatus.RUNNING,
+                        8L,
+                        new int[] {1, 3, 3, 4, 7, 4, 2, 7, 3, 3},
+                        42,
+                        currentExecutionAttempts);
+
+        final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
+
+        final JsonNode marshalled = objectMapper.valueToTree(expected);
+
+        final JobDetails unmarshalled = objectMapper.treeToValue(marshalled, JobDetails.class);
+
+        assertThat(unmarshalled).isEqualTo(expected);
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
index 52eec21ab5d..2876cf1635a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
@@ -159,11 +159,12 @@ class MetricDumpSerializerTest {
         gauges.put(
                 g1,
                 new Tuple2<QueryScopeInfo, String>(
-                        new QueryScopeInfo.TaskQueryScopeInfo("jid", "vid", 2, "D"), "g1"));
+                        new QueryScopeInfo.TaskQueryScopeInfo("jid", "vid", 2, 0, "D"), "g1"));
         histograms.put(
                 h1,
                 new Tuple2<QueryScopeInfo, String>(
-                        new QueryScopeInfo.OperatorQueryScopeInfo("jid", "vid", 2, "opname", "E"),
+                        new QueryScopeInfo.OperatorQueryScopeInfo(
+                                "jid", "vid", 2, 0, "opname", "E"),
                         "h1"));
 
         MetricDumpSerialization.MetricSerializationResult serialized =
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java
index 0355d01fcf0..7380cd004b8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java
@@ -94,7 +94,7 @@ class QueryScopeInfoTest {
     @Test
     void testTaskQueryScopeInfo() {
         QueryScopeInfo.TaskQueryScopeInfo info =
-                new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 2);
+                new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 2, 0);
         assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_TASK);
         assertThat(info.scope).isEmpty();
         assertThat(info.jobID).isEqualTo("jobid");
@@ -108,7 +108,7 @@ class QueryScopeInfoTest {
         assertThat(info.vertexID).isEqualTo("taskid");
         assertThat(info.subtaskIndex).isEqualTo(2);
 
-        info = new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 2, "hello");
+        info = new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 2, 0, "hello");
         assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_TASK);
         assertThat(info.scope).isEqualTo("hello");
         assertThat(info.jobID).isEqualTo("jobid");
@@ -126,7 +126,7 @@ class QueryScopeInfoTest {
     @Test
     void testOperatorQueryScopeInfo() {
         QueryScopeInfo.OperatorQueryScopeInfo info =
-                new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 2, "opname");
+                new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 2, 0, "opname");
         assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_OPERATOR);
         assertThat(info.scope).isEmpty();
         assertThat(info.jobID).isEqualTo("jobid");
@@ -142,7 +142,9 @@ class QueryScopeInfoTest {
         assertThat(info.operatorName).isEqualTo("opname");
         assertThat(info.subtaskIndex).isEqualTo(2);
 
-        info = new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 2, "opname", "hello");
+        info =
+                new QueryScopeInfo.OperatorQueryScopeInfo(
+                        "jobid", "taskid", 2, 0, "opname", "hello");
         assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_OPERATOR);
         assertThat(info.scope).isEqualTo("hello");
         assertThat(info.jobID).isEqualTo("jobid");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java
index 77cd5047d3e..85bf3a44cd5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java
@@ -77,6 +77,7 @@ class JobVertexBackPressureHandlerTest {
                 new TaskQueryScopeInfo(
                         TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(),
                         TEST_JOB_VERTEX_ID.toString(),
+                        0,
                         0);
         dumps.add(new GaugeDump(task0, MetricNames.TASK_BACK_PRESSURED_TIME, "1000"));
         dumps.add(new GaugeDump(task0, MetricNames.TASK_IDLE_TIME, "0"));
@@ -86,7 +87,8 @@ class JobVertexBackPressureHandlerTest {
                 new TaskQueryScopeInfo(
                         TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(),
                         TEST_JOB_VERTEX_ID.toString(),
-                        1);
+                        1,
+                        0);
         dumps.add(new GaugeDump(task1, MetricNames.TASK_BACK_PRESSURED_TIME, "500"));
         dumps.add(new GaugeDump(task1, MetricNames.TASK_IDLE_TIME, "100"));
         dumps.add(new GaugeDump(task1, MetricNames.TASK_BUSY_TIME, "900"));
@@ -97,7 +99,8 @@ class JobVertexBackPressureHandlerTest {
                 new TaskQueryScopeInfo(
                         TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(),
                         TEST_JOB_VERTEX_ID.toString(),
-                        3);
+                        3,
+                        0);
         dumps.add(new GaugeDump(task3, MetricNames.TASK_BACK_PRESSURED_TIME, "100"));
         dumps.add(new GaugeDump(task3, MetricNames.TASK_IDLE_TIME, "200"));
         dumps.add(new GaugeDump(task3, MetricNames.TASK_BUSY_TIME, "700"));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java
index b04020ab411..ed13c7da040 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java
@@ -65,19 +65,19 @@ public class AggregatingSubtasksMetricsHandlerTest
         Collection<MetricDump> dumps = new ArrayList<>(3);
         QueryScopeInfo.TaskQueryScopeInfo task1 =
                 new QueryScopeInfo.TaskQueryScopeInfo(
-                        JOB_ID.toString(), TASK_ID.toString(), 1, "abc");
+                        JOB_ID.toString(), TASK_ID.toString(), 1, 0, "abc");
         MetricDump.CounterDump cd1 = new MetricDump.CounterDump(task1, "metric1", 1);
         dumps.add(cd1);
 
         QueryScopeInfo.TaskQueryScopeInfo task2 =
                 new QueryScopeInfo.TaskQueryScopeInfo(
-                        JOB_ID.toString(), TASK_ID.toString(), 2, "abc");
+                        JOB_ID.toString(), TASK_ID.toString(), 2, 0, "abc");
         MetricDump.CounterDump cd2 = new MetricDump.CounterDump(task2, "metric1", 3);
         dumps.add(cd2);
 
         QueryScopeInfo.TaskQueryScopeInfo task3 =
                 new QueryScopeInfo.TaskQueryScopeInfo(
-                        JOB_ID.toString(), TASK_ID.toString(), 3, "abc");
+                        JOB_ID.toString(), TASK_ID.toString(), 3, 0, "abc");
         MetricDump.CounterDump cd3 = new MetricDump.CounterDump(task3, "metric2", 5);
         dumps.add(cd3);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandlerTest.java
index 66ea8c22c00..b48dbbac38f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandlerTest.java
@@ -34,6 +34,8 @@ public class JobVertexMetricsHandlerTest extends MetricsHandlerTestBase<JobVerte
 
     private static final int TEST_SUBTASK_INDEX = 1;
 
+    private static final int TEST_ATTEMPT_NUMBER = 0;
+
     @Override
     JobVertexMetricsHandler getMetricsHandler() {
         return new JobVertexMetricsHandler(
@@ -43,7 +45,7 @@ public class JobVertexMetricsHandlerTest extends MetricsHandlerTestBase<JobVerte
     @Override
     QueryScopeInfo getQueryScopeInfo() {
         return new QueryScopeInfo.TaskQueryScopeInfo(
-                TEST_JOB_ID, TEST_VERTEX_ID, TEST_SUBTASK_INDEX);
+                TEST_JOB_ID, TEST_VERTEX_ID, TEST_SUBTASK_INDEX, TEST_ATTEMPT_NUMBER);
     }
 
     @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandlerTest.java
index f59569c1d12..30559e56aee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandlerTest.java
@@ -37,6 +37,8 @@ public class SubtaskMetricsHandlerTest extends MetricsHandlerTestBase<SubtaskMet
 
     private static final int TEST_SUBTASK_INDEX = 0;
 
+    private static final int TEST_ATTEMPT_NUMBER = 0;
+
     @Override
     SubtaskMetricsHandler getMetricsHandler() {
         return new SubtaskMetricsHandler(leaderRetriever, TIMEOUT, TEST_HEADERS, mockMetricFetcher);
@@ -45,7 +47,7 @@ public class SubtaskMetricsHandlerTest extends MetricsHandlerTestBase<SubtaskMet
     @Override
     QueryScopeInfo getQueryScopeInfo() {
         return new QueryScopeInfo.TaskQueryScopeInfo(
-                TEST_JOB_ID, TEST_VERTEX_ID, TEST_SUBTASK_INDEX);
+                TEST_JOB_ID, TEST_VERTEX_ID, TEST_SUBTASK_INDEX, TEST_ATTEMPT_NUMBER);
     }
 
     @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
index 9dc014d8b50..a7259988672 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
@@ -177,12 +177,13 @@ public class MetricFetcherTest extends TestLogger {
                 c1,
                 new Tuple2<>(
                         new QueryScopeInfo.OperatorQueryScopeInfo(
-                                jobID.toString(), "taskid", 2, "opname", "abc"),
+                                jobID.toString(), "taskid", 2, 0, "opname", "abc"),
                         "oc"));
         counters.put(
                 c2,
                 new Tuple2<>(
-                        new QueryScopeInfo.TaskQueryScopeInfo(jobID.toString(), "taskid", 2, "abc"),
+                        new QueryScopeInfo.TaskQueryScopeInfo(
+                                jobID.toString(), "taskid", 2, 0, "abc"),
                         "tc"));
         meters.put(
                 new Meter() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
index 97c739df224..c54f50291fc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java
@@ -24,6 +24,9 @@ import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -41,7 +44,18 @@ class MetricStoreTest {
         assertThat(store.getJobMetricStore("jobid").getMetric("abc.metric4", "-1")).isEqualTo("3");
 
         assertThat(store.getTaskMetricStore("jobid", "taskid").getMetric("8.abc.metric5", "-1"))
+                .isEqualTo("14");
+        assertThat(store.getSubtaskMetricStore("jobid", "taskid", 8).getMetric("abc.metric5", "-1"))
+                .isEqualTo("14");
+        assertThat(
+                        store.getSubtaskAttemptMetricStore("jobid", "taskid", 8, 1)
+                                .getMetric("abc.metric5", "-1"))
                 .isEqualTo("4");
+        assertThat(
+                        store.getSubtaskAttemptMetricStore("jobid", "taskid", 8, 2)
+                                .getMetric("abc.metric5", "-1"))
+                .isEqualTo("14");
+
         assertThat(
                         store.getTaskMetricStore("jobid", "taskid")
                                 .getMetric("8.opname.abc.metric6", "-1"))
@@ -50,6 +64,27 @@ class MetricStoreTest {
                         store.getTaskMetricStore("jobid", "taskid")
                                 .getMetric("8.opname.abc.metric7", "-1"))
                 .isEqualTo("6");
+        assertThat(
+                        store.getTaskMetricStore("jobid", "taskid")
+                                .getMetric("1.opname.abc.metric7", "-1"))
+                .isEqualTo("6");
+        assertThat(
+                        store.getSubtaskMetricStore("jobid", "taskid", 1)
+                                .getMetric("opname.abc.metric7", "-1"))
+                .isEqualTo("6");
+        assertThat(store.getSubtaskAttemptMetricStore("jobid", "taskid", 1, 2)).isNull();
+        assertThat(
+                        store.getSubtaskAttemptMetricStore("jobid", "taskid", 1, 3)
+                                .getMetric("opname.abc.metric7", "-1"))
+                .isEqualTo("6");
+        assertThat(
+                        store.getSubtaskAttemptMetricStore("jobid", "taskid", 8, 2)
+                                .getMetric("opname.abc.metric7", "-1"))
+                .isEqualTo("6");
+        assertThat(
+                        store.getSubtaskAttemptMetricStore("jobid", "taskid", 8, 4)
+                                .getMetric("opname.abc.metric7", "-1"))
+                .isEqualTo("16");
     }
 
     @Test
@@ -72,6 +107,11 @@ class MetricStoreTest {
     }
 
     static MetricStore setupStore(MetricStore store) {
+        Map<Integer, Integer> currentExecutionAttempts = new HashMap<>();
+        currentExecutionAttempts.put(8, 2);
+        store.getCurrentExecutionAttempts()
+                .put("jobid", Collections.singletonMap("taskid", currentExecutionAttempts));
+
         QueryScopeInfo.JobManagerQueryScopeInfo jm =
                 new QueryScopeInfo.JobManagerQueryScopeInfo("abc");
         MetricDump.CounterDump cd1 = new MetricDump.CounterDump(jm, "metric1", 0);
@@ -96,19 +136,30 @@ class MetricStoreTest {
         MetricDump.CounterDump cd42 = new MetricDump.CounterDump(job2, "metric4", 3);
 
         QueryScopeInfo.TaskQueryScopeInfo task =
-                new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 8, "abc");
+                new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 8, 1, "abc");
         MetricDump.CounterDump cd5 = new MetricDump.CounterDump(task, "metric5", 4);
 
+        QueryScopeInfo.TaskQueryScopeInfo speculativeTask =
+                new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 8, 2, "abc");
+        MetricDump.CounterDump cd52 = new MetricDump.CounterDump(speculativeTask, "metric5", 14);
+
         QueryScopeInfo.OperatorQueryScopeInfo operator =
-                new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 8, "opname", "abc");
+                new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 8, 2, "opname", "abc");
         MetricDump.CounterDump cd6 = new MetricDump.CounterDump(operator, "metric6", 5);
         MetricDump.CounterDump cd7 = new MetricDump.CounterDump(operator, "metric7", 6);
 
         QueryScopeInfo.OperatorQueryScopeInfo operator2 =
-                new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 1, "opname", "abc");
+                new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 1, 3, "opname", "abc");
         MetricDump.CounterDump cd62 = new MetricDump.CounterDump(operator2, "metric6", 5);
         MetricDump.CounterDump cd72 = new MetricDump.CounterDump(operator2, "metric7", 6);
 
+        QueryScopeInfo.OperatorQueryScopeInfo speculativeOperator2 =
+                new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 8, 4, "opname", "abc");
+        MetricDump.CounterDump cd63 =
+                new MetricDump.CounterDump(speculativeOperator2, "metric6", 15);
+        MetricDump.CounterDump cd73 =
+                new MetricDump.CounterDump(speculativeOperator2, "metric7", 16);
+
         store.add(cd1);
         store.add(cd2);
         store.add(cd2a);
@@ -125,6 +176,10 @@ class MetricStoreTest {
         store.add(cd32);
         store.add(cd42);
 
+        store.add(cd52);
+        store.add(cd63);
+        store.add(cd73);
+
         return store;
     }
 }
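
Editorial note on the MetricStore change above: the representative-attempt lookup can be tried in isolation with a minimal sketch like the one below. The class name RepresentativeAttemptSketch and the helper setCurrentAttempt are hypothetical and are not part of Flink; only the body of isRepresentativeAttempt follows the diff. The values mirror the MetricStoreTest setup, where subtask 8 has current attempt 2 and subtask 1 has no recorded attempt.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Standalone sketch; not the Flink MetricStore itself. */
public class RepresentativeAttemptSketch {

    // jobID -> vertexID -> subtaskIndex -> current (representative) attempt number
    private final Map<String, Map<String, Map<Integer, Integer>>> currentExecutionAttempts =
            new HashMap<>();

    // Hypothetical helper for this sketch: records the current attempt of one subtask.
    public void setCurrentAttempt(
            String jobID, String vertexID, int subtaskIndex, int attemptNumber) {
        currentExecutionAttempts
                .computeIfAbsent(jobID, k -> new HashMap<>())
                .computeIfAbsent(vertexID, k -> new HashMap<>())
                .put(subtaskIndex, attemptNumber);
    }

    // Same chain as in the diff: any missing level of the nested map makes the
    // Optional empty, so orElse(attemptNumber) falls back to the queried attempt
    // and the comparison is true.
    public boolean isRepresentativeAttempt(
            String jobID, String vertexID, int subtaskIndex, int attemptNumber) {
        return Optional.of(currentExecutionAttempts)
                        .map(m -> m.get(jobID))
                        .map(m -> m.get(vertexID))
                        .map(m -> m.get(subtaskIndex))
                        .orElse(attemptNumber)
                == attemptNumber;
    }

    public static void main(String[] args) {
        RepresentativeAttemptSketch sketch = new RepresentativeAttemptSketch();
        sketch.setCurrentAttempt("jobid", "taskid", 8, 2);

        // Attempt 1 of subtask 8 is not the current attempt.
        System.out.println(sketch.isRepresentativeAttempt("jobid", "taskid", 8, 1)); // false
        // Attempt 2 of subtask 8 matches the recorded current attempt.
        System.out.println(sketch.isRepresentativeAttempt("jobid", "taskid", 8, 2)); // true
        // Subtask 1 has no recorded attempt, so any attempt counts as representative.
        System.out.println(sketch.isRepresentativeAttempt("jobid", "taskid", 1, 3)); // true
    }
}
```

This appears to be why, in MetricStoreTest, the subtask-level store for subtask 8 reports 14 (attempt 2) rather than 4 (attempt 1), while both values stay available through getSubtaskAttemptMetricStore.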


[flink] 03/06: [FLINK-28588][rest] Add blocked task manager count and blocked slot count in ResourceOverview and ClusterOverview

Posted by ga...@apache.org.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0263b55288be7b569f56dd42a94c5e48bcc1607b
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Tue Jul 26 16:16:45 2022 +0800

    [FLINK-28588][rest] Add blocked task manager count and blocked slot count in ResourceOverview and ClusterOverview
---
 .../src/test/resources/rest_api_v1.snapshot        |  6 ++
 .../messages/webmonitor/ClusterOverview.java       | 52 +++++++++---
 .../runtime/resourcemanager/ResourceManager.java   | 26 +++++-
 .../runtime/resourcemanager/ResourceOverview.java  | 18 +++-
 .../messages/ClusterOverviewWithVersion.java       | 26 +++---
 .../resourcemanager/ResourceManagerTest.java       | 96 ++++++++++++++++++++++
 .../utils/TestingResourceManagerGateway.java       |  2 +-
 .../messages/ClusterOverviewWithVersionTest.java   |  2 +-
 .../runtime/webmonitor/TestingRestfulGateway.java  |  2 +-
 9 files changed, 196 insertions(+), 34 deletions(-)

diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index 427f59c2723..873e5062d7d 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -3225,6 +3225,12 @@
         "slots-available" : {
           "type" : "integer"
         },
+        "taskmanagers-blocked" : {
+          "type" : "integer"
+        },
+        "slots-free-and-blocked" : {
+          "type" : "integer"
+        },
         "jobs-running" : {
           "type" : "integer"
         },
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java
index 34bf05d0711..7ea327c8029 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java
@@ -21,8 +21,12 @@ package org.apache.flink.runtime.messages.webmonitor;
 import org.apache.flink.runtime.resourcemanager.ResourceOverview;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
+import javax.annotation.Nullable;
+
 /**
  * Response to the {@link RequestStatusOverview} message, carrying a description of the Flink
  * cluster status.
@@ -34,6 +38,8 @@ public class ClusterOverview extends JobsOverview {
     public static final String FIELD_NAME_TASKMANAGERS = "taskmanagers";
     public static final String FIELD_NAME_SLOTS_TOTAL = "slots-total";
     public static final String FIELD_NAME_SLOTS_AVAILABLE = "slots-available";
+    public static final String FIELD_NAME_TASKMANAGERS_BLOCKED = "taskmanagers-blocked";
+    public static final String FIELD_NAME_SLOTS_FREE_AND_BLOCKED = "slots-free-and-blocked";
 
     @JsonProperty(FIELD_NAME_TASKMANAGERS)
     private final int numTaskManagersConnected;
@@ -44,11 +50,24 @@ public class ClusterOverview extends JobsOverview {
     @JsonProperty(FIELD_NAME_SLOTS_AVAILABLE)
     private final int numSlotsAvailable;
 
+    @JsonProperty(FIELD_NAME_TASKMANAGERS_BLOCKED)
+    @JsonInclude(Include.NON_DEFAULT)
+    private final int numTaskManagersBlocked;
+
+    @JsonProperty(FIELD_NAME_SLOTS_FREE_AND_BLOCKED)
+    @JsonInclude(Include.NON_DEFAULT)
+    private final int numSlotsFreeAndBlocked;
+
     @JsonCreator
+    // numTaskManagersBlocked and numSlotsFreeAndBlocked are @Nullable because Jackson passes null
+    // when the field is absent from the parsed JSON
     public ClusterOverview(
             @JsonProperty(FIELD_NAME_TASKMANAGERS) int numTaskManagersConnected,
             @JsonProperty(FIELD_NAME_SLOTS_TOTAL) int numSlotsTotal,
             @JsonProperty(FIELD_NAME_SLOTS_AVAILABLE) int numSlotsAvailable,
+            @JsonProperty(FIELD_NAME_TASKMANAGERS_BLOCKED) @Nullable Integer numTaskManagersBlocked,
+            @JsonProperty(FIELD_NAME_SLOTS_FREE_AND_BLOCKED) @Nullable
+                    Integer numSlotsFreeAndBlocked,
             @JsonProperty(FIELD_NAME_JOBS_RUNNING) int numJobsRunningOrPending,
             @JsonProperty(FIELD_NAME_JOBS_FINISHED) int numJobsFinished,
             @JsonProperty(FIELD_NAME_JOBS_CANCELLED) int numJobsCancelled,
@@ -59,18 +78,8 @@ public class ClusterOverview extends JobsOverview {
         this.numTaskManagersConnected = numTaskManagersConnected;
         this.numSlotsTotal = numSlotsTotal;
         this.numSlotsAvailable = numSlotsAvailable;
-    }
-
-    public ClusterOverview(
-            int numTaskManagersConnected,
-            int numSlotsTotal,
-            int numSlotsAvailable,
-            JobsOverview jobs1,
-            JobsOverview jobs2) {
-        super(jobs1, jobs2);
-        this.numTaskManagersConnected = numTaskManagersConnected;
-        this.numSlotsTotal = numSlotsTotal;
-        this.numSlotsAvailable = numSlotsAvailable;
+        this.numTaskManagersBlocked = numTaskManagersBlocked == null ? 0 : numTaskManagersBlocked;
+        this.numSlotsFreeAndBlocked = numSlotsFreeAndBlocked == null ? 0 : numSlotsFreeAndBlocked;
     }
 
     public ClusterOverview(ResourceOverview resourceOverview, JobsOverview jobsOverview) {
@@ -78,6 +87,8 @@ public class ClusterOverview extends JobsOverview {
                 resourceOverview.getNumberTaskManagers(),
                 resourceOverview.getNumberRegisteredSlots(),
                 resourceOverview.getNumberFreeSlots(),
+                resourceOverview.getNumberBlockedTaskManagers(),
+                resourceOverview.getNumberBlockedFreeSlots(),
                 jobsOverview.getNumJobsRunningOrPending(),
                 jobsOverview.getNumJobsFinished(),
                 jobsOverview.getNumJobsCancelled(),
@@ -96,6 +107,13 @@ public class ClusterOverview extends JobsOverview {
         return numSlotsAvailable;
     }
 
+    public int getNumTaskManagersBlocked() {
+        return numTaskManagersBlocked;
+    }
+
+    public int getNumSlotsFreeAndBlocked() {
+        return numSlotsFreeAndBlocked;
+    }
     // ------------------------------------------------------------------------
 
     @Override
@@ -107,6 +125,8 @@ public class ClusterOverview extends JobsOverview {
             return this.numTaskManagersConnected == that.numTaskManagersConnected
                     && this.numSlotsTotal == that.numSlotsTotal
                     && this.numSlotsAvailable == that.numSlotsAvailable
+                    && this.numTaskManagersBlocked == that.numTaskManagersBlocked
+                    && this.numSlotsFreeAndBlocked == that.numSlotsFreeAndBlocked
                     && this.getNumJobsRunningOrPending() == that.getNumJobsRunningOrPending()
                     && this.getNumJobsFinished() == that.getNumJobsFinished()
                     && this.getNumJobsCancelled() == that.getNumJobsCancelled()
@@ -122,6 +142,8 @@ public class ClusterOverview extends JobsOverview {
         result = 31 * result + numTaskManagersConnected;
         result = 31 * result + numSlotsTotal;
         result = 31 * result + numSlotsAvailable;
+        result = 31 * result + numTaskManagersBlocked;
+        result = 31 * result + numSlotsFreeAndBlocked;
         return result;
     }
 
@@ -130,10 +152,16 @@ public class ClusterOverview extends JobsOverview {
         return "StatusOverview {"
                 + "numTaskManagersConnected="
                 + numTaskManagersConnected
+                + (numTaskManagersBlocked == 0
+                        ? ""
+                        : (", numTaskManagersBlocked=" + numTaskManagersBlocked))
                 + ", numSlotsTotal="
                 + numSlotsTotal
                 + ", numSlotsAvailable="
                 + numSlotsAvailable
+                + (numSlotsFreeAndBlocked == 0
+                        ? ""
+                        : (", numSlotsFreeAndBlocked=" + numSlotsFreeAndBlocked))
                 + ", numJobsRunningOrPending="
                 + getNumJobsRunningOrPending()
                 + ", numJobsFinished="
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index fb85cf88231..8b4f26ea68d 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -688,15 +688,34 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
     @Override
     public CompletableFuture<ResourceOverview> requestResourceOverview(Time timeout) {
         final int numberSlots = slotManager.getNumberRegisteredSlots();
-        final int numberFreeSlots = slotManager.getNumberFreeSlots();
         final ResourceProfile totalResource = slotManager.getRegisteredResource();
-        final ResourceProfile freeResource = slotManager.getFreeResource();
 
+        int numberFreeSlots = slotManager.getNumberFreeSlots();
+        ResourceProfile freeResource = slotManager.getFreeResource();
+
+        int blockedTaskManagers = 0;
+        int totalBlockedFreeSlots = 0;
+        if (!blocklistHandler.getAllBlockedNodeIds().isEmpty()) {
+            for (WorkerRegistration<WorkerType> registration : taskExecutors.values()) {
+                if (blocklistHandler.isBlockedTaskManager(registration.getResourceID())) {
+                    blockedTaskManagers++;
+                    int blockedFreeSlots =
+                            slotManager.getNumberFreeSlotsOf(registration.getInstanceID());
+                    totalBlockedFreeSlots += blockedFreeSlots;
+                    numberFreeSlots -= blockedFreeSlots;
+                    freeResource =
+                            freeResource.subtract(
+                                    slotManager.getFreeResourceOf(registration.getInstanceID()));
+                }
+            }
+        }
         return CompletableFuture.completedFuture(
                 new ResourceOverview(
                         taskExecutors.size(),
                         numberSlots,
                         numberFreeSlots,
+                        blockedTaskManagers,
+                        totalBlockedFreeSlots,
                         totalResource,
                         freeResource));
     }
@@ -866,7 +885,8 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
     //  Internal methods
     // ------------------------------------------------------------------------
 
-    private String getNodeIdOfTaskManager(ResourceID taskManagerId) {
+    @VisibleForTesting
+    String getNodeIdOfTaskManager(ResourceID taskManagerId) {
         checkState(taskExecutors.containsKey(taskManagerId));
         return taskExecutors.get(taskManagerId).getNodeId();
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceOverview.java
index 8e9ee2dad1f..d68b17346fb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceOverview.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceOverview.java
@@ -28,7 +28,7 @@ public class ResourceOverview implements Serializable {
     private static final long serialVersionUID = 7618746920569224557L;
 
     private static final ResourceOverview EMPTY_RESOURCE_OVERVIEW =
-            new ResourceOverview(0, 0, 0, ResourceProfile.ZERO, ResourceProfile.ZERO);
+            new ResourceOverview(0, 0, 0, 0, 0, ResourceProfile.ZERO, ResourceProfile.ZERO);
 
     private final int numberTaskManagers;
 
@@ -36,6 +36,10 @@ public class ResourceOverview implements Serializable {
 
     private final int numberFreeSlots;
 
+    private final int numberBlockedTaskManagers;
+
+    private final int numberBlockedFreeSlots;
+
     private final ResourceProfile totalResource;
 
     private final ResourceProfile freeResource;
@@ -44,11 +48,15 @@ public class ResourceOverview implements Serializable {
             int numberTaskManagers,
             int numberRegisteredSlots,
             int numberFreeSlots,
+            int numberBlockedTaskManagers,
+            int numberBlockedFreeSlots,
             ResourceProfile totalResource,
             ResourceProfile freeResource) {
         this.numberTaskManagers = numberTaskManagers;
         this.numberRegisteredSlots = numberRegisteredSlots;
         this.numberFreeSlots = numberFreeSlots;
+        this.numberBlockedTaskManagers = numberBlockedTaskManagers;
+        this.numberBlockedFreeSlots = numberBlockedFreeSlots;
         this.totalResource = totalResource;
         this.freeResource = freeResource;
     }
@@ -65,6 +73,14 @@ public class ResourceOverview implements Serializable {
         return numberFreeSlots;
     }
 
+    public int getNumberBlockedTaskManagers() {
+        return numberBlockedTaskManagers;
+    }
+
+    public int getNumberBlockedFreeSlots() {
+        return numberBlockedFreeSlots;
+    }
+
     public ResourceProfile getTotalResource() {
         return totalResource;
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersion.java
index fa40db26cfd..7beaa32ab65 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersion.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersion.java
@@ -19,13 +19,14 @@
 package org.apache.flink.runtime.rest.handler.legacy.messages;
 
 import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
-import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
+import javax.annotation.Nullable;
+
 import java.util.Objects;
 
 /** Cluster overview message including the current Flink version and commit id. */
@@ -43,10 +44,15 @@ public class ClusterOverviewWithVersion extends ClusterOverview implements Respo
     private final String commitId;
 
     @JsonCreator
+    // numTaskManagersBlocked and numSlotsFreeAndBlocked are @Nullable because Jackson passes null
+    // when the field is absent from the parsed JSON
     public ClusterOverviewWithVersion(
             @JsonProperty(FIELD_NAME_TASKMANAGERS) int numTaskManagersConnected,
             @JsonProperty(FIELD_NAME_SLOTS_TOTAL) int numSlotsTotal,
             @JsonProperty(FIELD_NAME_SLOTS_AVAILABLE) int numSlotsAvailable,
+            @JsonProperty(FIELD_NAME_TASKMANAGERS_BLOCKED) @Nullable Integer numTaskManagersBlocked,
+            @JsonProperty(FIELD_NAME_SLOTS_FREE_AND_BLOCKED) @Nullable
+                    Integer numSlotsFreeAndBlocked,
             @JsonProperty(FIELD_NAME_JOBS_RUNNING) int numJobsRunningOrPending,
             @JsonProperty(FIELD_NAME_JOBS_FINISHED) int numJobsFinished,
             @JsonProperty(FIELD_NAME_JOBS_CANCELLED) int numJobsCancelled,
@@ -57,6 +63,8 @@ public class ClusterOverviewWithVersion extends ClusterOverview implements Respo
                 numTaskManagersConnected,
                 numSlotsTotal,
                 numSlotsAvailable,
+                numTaskManagersBlocked,
+                numSlotsFreeAndBlocked,
                 numJobsRunningOrPending,
                 numJobsFinished,
                 numJobsCancelled,
@@ -66,26 +74,14 @@ public class ClusterOverviewWithVersion extends ClusterOverview implements Respo
         this.commitId = Preconditions.checkNotNull(commitId);
     }
 
-    public ClusterOverviewWithVersion(
-            int numTaskManagersConnected,
-            int numSlotsTotal,
-            int numSlotsAvailable,
-            JobsOverview jobs1,
-            JobsOverview jobs2,
-            String version,
-            String commitId) {
-        super(numTaskManagersConnected, numSlotsTotal, numSlotsAvailable, jobs1, jobs2);
-
-        this.version = Preconditions.checkNotNull(version);
-        this.commitId = Preconditions.checkNotNull(commitId);
-    }
-
     public static ClusterOverviewWithVersion fromStatusOverview(
             ClusterOverview statusOverview, String version, String commitId) {
         return new ClusterOverviewWithVersion(
                 statusOverview.getNumTaskManagersConnected(),
                 statusOverview.getNumSlotsTotal(),
                 statusOverview.getNumSlotsAvailable(),
+                statusOverview.getNumTaskManagersBlocked(),
+                statusOverview.getNumSlotsFreeAndBlocked(),
                 statusOverview.getNumJobsRunningOrPending(),
                 statusOverview.getNumJobsFinished(),
                 statusOverview.getNumJobsCancelled(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
index eba7a62bd5b..d318fd2449f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
@@ -28,9 +28,11 @@ import org.apache.flink.runtime.blocklist.DefaultBlocklistHandler;
 import org.apache.flink.runtime.blocklist.NoOpBlocklistHandler;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.io.network.partition.NoOpResourceManagerPartitionTracker;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
@@ -52,14 +54,18 @@ import org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException;
 import org.apache.flink.runtime.security.token.NoOpDelegationTokenManager;
 import org.apache.flink.runtime.slots.ResourceRequirement;
 import org.apache.flink.runtime.slots.ResourceRequirements;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorMemoryConfiguration;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorThreadInfoGateway;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.util.function.ThrowingConsumer;
 
 import org.junit.jupiter.api.AfterAll;
@@ -72,6 +78,7 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -670,6 +677,95 @@ class ResourceManagerTest {
         assertThat(receivedBlockedNodes2).containsExactlyInAnyOrder(blockedNode1, blockedNode2);
     }
 
+    @Test
+    void testResourceOverviewWithBlockedSlots() throws Exception {
+        ManuallyTriggeredScheduledExecutor executor = new ManuallyTriggeredScheduledExecutor();
+        final SlotManager slotManager = DeclarativeSlotManagerBuilder.newBuilder(executor).build();
+        resourceManager =
+                new ResourceManagerBuilder()
+                        .withSlotManager(slotManager)
+                        .withBlocklistHandlerFactory(
+                                new DefaultBlocklistHandler.Factory(Duration.ofMillis(100L)))
+                        .buildAndStart();
+
+        final ResourceManagerGateway resourceManagerGateway =
+                resourceManager.getSelfGateway(ResourceManagerGateway.class);
+
+        ResourceID taskExecutor = ResourceID.generate();
+        ResourceID taskExecutorToBlock = ResourceID.generate();
+        registerTaskExecutorAndSlot(resourceManagerGateway, taskExecutor, 3);
+        registerTaskExecutorAndSlot(resourceManagerGateway, taskExecutorToBlock, 5);
+        executor.triggerAll();
+
+        ResourceOverview overview =
+                resourceManagerGateway.requestResourceOverview(Time.seconds(5)).get();
+        assertThat(overview.getNumberTaskManagers()).isEqualTo(2);
+        assertThat(overview.getNumberRegisteredSlots()).isEqualTo(8);
+        assertThat(overview.getNumberFreeSlots()).isEqualTo(8);
+        assertThat(overview.getNumberBlockedTaskManagers()).isEqualTo(0);
+        assertThat(overview.getNumberBlockedFreeSlots()).isEqualTo(0);
+        assertThat(overview.getTotalResource())
+                .isEqualTo(ResourceProfile.fromResources(1, 1024).multiply(8));
+        assertThat(overview.getFreeResource())
+                .isEqualTo(ResourceProfile.fromResources(1, 1024).multiply(8));
+
+        resourceManagerGateway.notifyNewBlockedNodes(
+                Collections.singleton(
+                        new BlockedNode(
+                                resourceManager.getNodeIdOfTaskManager(taskExecutorToBlock),
+                                "Test cause",
+                                Long.MAX_VALUE)));
+
+        ResourceOverview overviewBlocked =
+                resourceManagerGateway.requestResourceOverview(Time.seconds(5)).get();
+        assertThat(overviewBlocked.getNumberTaskManagers()).isEqualTo(2);
+        assertThat(overviewBlocked.getNumberRegisteredSlots()).isEqualTo(8);
+        assertThat(overviewBlocked.getNumberFreeSlots()).isEqualTo(3);
+        assertThat(overviewBlocked.getNumberBlockedTaskManagers()).isEqualTo(1);
+        assertThat(overviewBlocked.getNumberBlockedFreeSlots()).isEqualTo(5);
+        assertThat(overviewBlocked.getTotalResource())
+                .isEqualTo(ResourceProfile.fromResources(1, 1024).multiply(8));
+        assertThat(overviewBlocked.getFreeResource())
+                .isEqualTo(ResourceProfile.fromResources(1, 1024).multiply(3));
+    }
+
+    private void registerTaskExecutorAndSlot(
+            ResourceManagerGateway resourceManagerGateway, ResourceID taskManagerId, int slotCount)
+            throws Exception {
+        final TaskExecutorGateway taskExecutorGateway =
+                new TestingTaskExecutorGatewayBuilder()
+                        .setAddress(UUID.randomUUID().toString())
+                        .createTestingTaskExecutorGateway();
+        rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);
+        TaskExecutorRegistration taskExecutorRegistration =
+                new TaskExecutorRegistration(
+                        taskExecutorGateway.getAddress(),
+                        taskManagerId,
+                        dataPort,
+                        jmxPort,
+                        hardwareDescription,
+                        new TaskExecutorMemoryConfiguration(
+                                1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
+                        ResourceProfile.fromResources(1, 1024),
+                        ResourceProfile.fromResources(1, 1024).multiply(slotCount),
+                        taskExecutorGateway.getAddress());
+        RegistrationResponse registrationResult =
+                resourceManagerGateway
+                        .registerTaskExecutor(taskExecutorRegistration, TestingUtils.TIMEOUT)
+                        .get();
+        assertThat(registrationResult).isInstanceOf(TaskExecutorRegistrationSuccess.class);
+        InstanceID instanceID =
+                ((TaskExecutorRegistrationSuccess) registrationResult).getRegistrationId();
+        List<SlotStatus> slots = new ArrayList<>();
+        for (int i = 0; i < slotCount; i++) {
+            slots.add(
+                    new SlotStatus(
+                            new SlotID(taskManagerId, i), ResourceProfile.fromResources(1, 1024)));
+        }
+        resourceManagerGateway.sendSlotReport(
+                taskManagerId, instanceID, new SlotReport(slots), Time.seconds(5));
+    }
+
     private JobMasterGateway createJobMasterGateway(Collection<BlockedNode> receivedBlockedNodes) {
         final TestingJobMasterGateway jobMasterGateway =
                 new TestingJobMasterGatewayBuilder()
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
index 82b0236190a..7a5c7493a31 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
@@ -405,7 +405,7 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
     @Override
     public CompletableFuture<ResourceOverview> requestResourceOverview(Time timeout) {
         return CompletableFuture.completedFuture(
-                new ResourceOverview(1, 1, 1, ResourceProfile.ZERO, ResourceProfile.ZERO));
+                new ResourceOverview(1, 1, 1, 0, 0, ResourceProfile.ZERO, ResourceProfile.ZERO));
     }
 
     @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersionTest.java
index 4dff5fb8ae8..71a2977ecbb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersionTest.java
@@ -31,6 +31,6 @@ public class ClusterOverviewWithVersionTest
 
     @Override
     protected ClusterOverviewWithVersion getTestResponseInstance() {
-        return new ClusterOverviewWithVersion(1, 3, 3, 7, 4, 2, 0, "version", "commit");
+        return new ClusterOverviewWithVersion(2, 6, 3, 1, 3, 7, 4, 2, 0, "version", "commit");
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
index bb313cacd8d..ffd53db2b9b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
@@ -74,7 +74,7 @@ public class TestingRestfulGateway implements RestfulGateway {
             DEFAULT_REQUEST_CLUSTER_OVERVIEW_SUPPLIER =
                     () ->
                             CompletableFuture.completedFuture(
-                                    new ClusterOverview(0, 0, 0, 0, 0, 0, 0));
+                                    new ClusterOverview(0, 0, 0, 0, 0, 0, 0, 0, 0));
     static final Supplier<CompletableFuture<Collection<String>>>
             DEFAULT_REQUEST_METRIC_QUERY_SERVICE_PATHS_SUPPLIER =
                     () -> CompletableFuture.completedFuture(Collections.emptyList());