You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2022/08/01 10:30:56 UTC

[flink] 03/06: [FLINK-28588][rest] Add blocked task manager count and blocked slot count in ResourceOverview and ClusterOverview

This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0263b55288be7b569f56dd42a94c5e48bcc1607b
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Tue Jul 26 16:16:45 2022 +0800

    [FLINK-28588][rest] Add blocked task manager count and blocked slot count in ResourceOverview and ClusterOverview
---
 .../src/test/resources/rest_api_v1.snapshot        |  6 ++
 .../messages/webmonitor/ClusterOverview.java       | 52 +++++++++---
 .../runtime/resourcemanager/ResourceManager.java   | 26 +++++-
 .../runtime/resourcemanager/ResourceOverview.java  | 18 +++-
 .../messages/ClusterOverviewWithVersion.java       | 26 +++---
 .../resourcemanager/ResourceManagerTest.java       | 96 ++++++++++++++++++++++
 .../utils/TestingResourceManagerGateway.java       |  2 +-
 .../messages/ClusterOverviewWithVersionTest.java   |  2 +-
 .../runtime/webmonitor/TestingRestfulGateway.java  |  2 +-
 9 files changed, 196 insertions(+), 34 deletions(-)

diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index 427f59c2723..873e5062d7d 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -3225,6 +3225,12 @@
         "slots-available" : {
           "type" : "integer"
         },
+        "taskmanagers-blocked" : {
+          "type" : "integer"
+        },
+        "slots-free-and-blocked" : {
+          "type" : "integer"
+        },
         "jobs-running" : {
           "type" : "integer"
         },
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java
index 34bf05d0711..7ea327c8029 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java
@@ -21,8 +21,12 @@ package org.apache.flink.runtime.messages.webmonitor;
 import org.apache.flink.runtime.resourcemanager.ResourceOverview;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
+import javax.annotation.Nullable;
+
 /**
  * Response to the {@link RequestStatusOverview} message, carrying a description of the Flink
  * cluster status.
@@ -34,6 +38,8 @@ public class ClusterOverview extends JobsOverview {
     public static final String FIELD_NAME_TASKMANAGERS = "taskmanagers";
     public static final String FIELD_NAME_SLOTS_TOTAL = "slots-total";
     public static final String FIELD_NAME_SLOTS_AVAILABLE = "slots-available";
+    public static final String FIELD_NAME_TASKMANAGERS_BLOCKED = "taskmanagers-blocked";
+    public static final String FIELD_NAME_SLOTS_FREE_AND_BLOCKED = "slots-free-and-blocked";
 
     @JsonProperty(FIELD_NAME_TASKMANAGERS)
     private final int numTaskManagersConnected;
@@ -44,11 +50,24 @@ public class ClusterOverview extends JobsOverview {
     @JsonProperty(FIELD_NAME_SLOTS_AVAILABLE)
     private final int numSlotsAvailable;
 
+    @JsonProperty(FIELD_NAME_TASKMANAGERS_BLOCKED)
+    @JsonInclude(Include.NON_DEFAULT)
+    private final int numTaskManagersBlocked;
+
+    @JsonProperty(FIELD_NAME_SLOTS_FREE_AND_BLOCKED)
+    @JsonInclude(Include.NON_DEFAULT)
+    private final int numSlotsFreeAndBlocked;
+
     @JsonCreator
+    // numTaskManagersBlocked and numSlotsFreeAndBlocked is Nullable since Jackson will assign null
+    // if the field is absent while parsing
     public ClusterOverview(
             @JsonProperty(FIELD_NAME_TASKMANAGERS) int numTaskManagersConnected,
             @JsonProperty(FIELD_NAME_SLOTS_TOTAL) int numSlotsTotal,
             @JsonProperty(FIELD_NAME_SLOTS_AVAILABLE) int numSlotsAvailable,
+            @JsonProperty(FIELD_NAME_TASKMANAGERS_BLOCKED) @Nullable Integer numTaskManagersBlocked,
+            @JsonProperty(FIELD_NAME_SLOTS_FREE_AND_BLOCKED) @Nullable
+                    Integer numSlotsFreeAndBlocked,
             @JsonProperty(FIELD_NAME_JOBS_RUNNING) int numJobsRunningOrPending,
             @JsonProperty(FIELD_NAME_JOBS_FINISHED) int numJobsFinished,
             @JsonProperty(FIELD_NAME_JOBS_CANCELLED) int numJobsCancelled,
@@ -59,18 +78,8 @@ public class ClusterOverview extends JobsOverview {
         this.numTaskManagersConnected = numTaskManagersConnected;
         this.numSlotsTotal = numSlotsTotal;
         this.numSlotsAvailable = numSlotsAvailable;
-    }
-
-    public ClusterOverview(
-            int numTaskManagersConnected,
-            int numSlotsTotal,
-            int numSlotsAvailable,
-            JobsOverview jobs1,
-            JobsOverview jobs2) {
-        super(jobs1, jobs2);
-        this.numTaskManagersConnected = numTaskManagersConnected;
-        this.numSlotsTotal = numSlotsTotal;
-        this.numSlotsAvailable = numSlotsAvailable;
+        this.numTaskManagersBlocked = numTaskManagersBlocked == null ? 0 : numTaskManagersBlocked;
+        this.numSlotsFreeAndBlocked = numSlotsFreeAndBlocked == null ? 0 : numSlotsFreeAndBlocked;
     }
 
     public ClusterOverview(ResourceOverview resourceOverview, JobsOverview jobsOverview) {
@@ -78,6 +87,8 @@ public class ClusterOverview extends JobsOverview {
                 resourceOverview.getNumberTaskManagers(),
                 resourceOverview.getNumberRegisteredSlots(),
                 resourceOverview.getNumberFreeSlots(),
+                resourceOverview.getNumberBlockedTaskManagers(),
+                resourceOverview.getNumberBlockedFreeSlots(),
                 jobsOverview.getNumJobsRunningOrPending(),
                 jobsOverview.getNumJobsFinished(),
                 jobsOverview.getNumJobsCancelled(),
@@ -96,6 +107,13 @@ public class ClusterOverview extends JobsOverview {
         return numSlotsAvailable;
     }
 
+    public int getNumTaskManagersBlocked() {
+        return numTaskManagersBlocked;
+    }
+
+    public int getNumSlotsFreeAndBlocked() {
+        return numSlotsFreeAndBlocked;
+    }
     // ------------------------------------------------------------------------
 
     @Override
@@ -107,6 +125,8 @@ public class ClusterOverview extends JobsOverview {
             return this.numTaskManagersConnected == that.numTaskManagersConnected
                     && this.numSlotsTotal == that.numSlotsTotal
                     && this.numSlotsAvailable == that.numSlotsAvailable
+                    && this.numTaskManagersBlocked == that.numTaskManagersBlocked
+                    && this.numSlotsFreeAndBlocked == that.numSlotsFreeAndBlocked
                     && this.getNumJobsRunningOrPending() == that.getNumJobsRunningOrPending()
                     && this.getNumJobsFinished() == that.getNumJobsFinished()
                     && this.getNumJobsCancelled() == that.getNumJobsCancelled()
@@ -122,6 +142,8 @@ public class ClusterOverview extends JobsOverview {
         result = 31 * result + numTaskManagersConnected;
         result = 31 * result + numSlotsTotal;
         result = 31 * result + numSlotsAvailable;
+        result = 31 * result + numTaskManagersBlocked;
+        result = 31 * result + numSlotsFreeAndBlocked;
         return result;
     }
 
@@ -130,10 +152,16 @@ public class ClusterOverview extends JobsOverview {
         return "StatusOverview {"
                 + "numTaskManagersConnected="
                 + numTaskManagersConnected
+                + (numTaskManagersBlocked == 0
+                        ? ""
+                        : (", numTaskManagersBlocked=" + numTaskManagersBlocked))
                 + ", numSlotsTotal="
                 + numSlotsTotal
                 + ", numSlotsAvailable="
                 + numSlotsAvailable
+                + (numSlotsFreeAndBlocked == 0
+                        ? ""
+                        : (", numSlotsFreeAndBlocked=" + numSlotsFreeAndBlocked))
                 + ", numJobsRunningOrPending="
                 + getNumJobsRunningOrPending()
                 + ", numJobsFinished="
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index fb85cf88231..8b4f26ea68d 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -688,15 +688,34 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
     @Override
     public CompletableFuture<ResourceOverview> requestResourceOverview(Time timeout) {
         final int numberSlots = slotManager.getNumberRegisteredSlots();
-        final int numberFreeSlots = slotManager.getNumberFreeSlots();
         final ResourceProfile totalResource = slotManager.getRegisteredResource();
-        final ResourceProfile freeResource = slotManager.getFreeResource();
 
+        int numberFreeSlots = slotManager.getNumberFreeSlots();
+        ResourceProfile freeResource = slotManager.getFreeResource();
+
+        int blockedTaskManagers = 0;
+        int totalBlockedFreeSlots = 0;
+        if (!blocklistHandler.getAllBlockedNodeIds().isEmpty()) {
+            for (WorkerRegistration<WorkerType> registration : taskExecutors.values()) {
+                if (blocklistHandler.isBlockedTaskManager(registration.getResourceID())) {
+                    blockedTaskManagers++;
+                    int blockedFreeSlots =
+                            slotManager.getNumberFreeSlotsOf(registration.getInstanceID());
+                    totalBlockedFreeSlots += blockedFreeSlots;
+                    numberFreeSlots -= blockedFreeSlots;
+                    freeResource =
+                            freeResource.subtract(
+                                    slotManager.getFreeResourceOf(registration.getInstanceID()));
+                }
+            }
+        }
         return CompletableFuture.completedFuture(
                 new ResourceOverview(
                         taskExecutors.size(),
                         numberSlots,
                         numberFreeSlots,
+                        blockedTaskManagers,
+                        totalBlockedFreeSlots,
                         totalResource,
                         freeResource));
     }
@@ -866,7 +885,8 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
     //  Internal methods
     // ------------------------------------------------------------------------
 
-    private String getNodeIdOfTaskManager(ResourceID taskManagerId) {
+    @VisibleForTesting
+    String getNodeIdOfTaskManager(ResourceID taskManagerId) {
         checkState(taskExecutors.containsKey(taskManagerId));
         return taskExecutors.get(taskManagerId).getNodeId();
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceOverview.java
index 8e9ee2dad1f..d68b17346fb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceOverview.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceOverview.java
@@ -28,7 +28,7 @@ public class ResourceOverview implements Serializable {
     private static final long serialVersionUID = 7618746920569224557L;
 
     private static final ResourceOverview EMPTY_RESOURCE_OVERVIEW =
-            new ResourceOverview(0, 0, 0, ResourceProfile.ZERO, ResourceProfile.ZERO);
+            new ResourceOverview(0, 0, 0, 0, 0, ResourceProfile.ZERO, ResourceProfile.ZERO);
 
     private final int numberTaskManagers;
 
@@ -36,6 +36,10 @@ public class ResourceOverview implements Serializable {
 
     private final int numberFreeSlots;
 
+    private final int numberBlockedTaskManagers;
+
+    private final int numberBlockedFreeSlots;
+
     private final ResourceProfile totalResource;
 
     private final ResourceProfile freeResource;
@@ -44,11 +48,15 @@ public class ResourceOverview implements Serializable {
             int numberTaskManagers,
             int numberRegisteredSlots,
             int numberFreeSlots,
+            int numberBlockedTaskManagers,
+            int numberBlockedFreeSlots,
             ResourceProfile totalResource,
             ResourceProfile freeResource) {
         this.numberTaskManagers = numberTaskManagers;
         this.numberRegisteredSlots = numberRegisteredSlots;
         this.numberFreeSlots = numberFreeSlots;
+        this.numberBlockedTaskManagers = numberBlockedTaskManagers;
+        this.numberBlockedFreeSlots = numberBlockedFreeSlots;
         this.totalResource = totalResource;
         this.freeResource = freeResource;
     }
@@ -65,6 +73,14 @@ public class ResourceOverview implements Serializable {
         return numberFreeSlots;
     }
 
+    public int getNumberBlockedTaskManagers() {
+        return numberBlockedTaskManagers;
+    }
+
+    public int getNumberBlockedFreeSlots() {
+        return numberBlockedFreeSlots;
+    }
+
     public ResourceProfile getTotalResource() {
         return totalResource;
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersion.java
index fa40db26cfd..7beaa32ab65 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersion.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersion.java
@@ -19,13 +19,14 @@
 package org.apache.flink.runtime.rest.handler.legacy.messages;
 
 import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
-import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
+import javax.annotation.Nullable;
+
 import java.util.Objects;
 
 /** Cluster overview message including the current Flink version and commit id. */
@@ -43,10 +44,15 @@ public class ClusterOverviewWithVersion extends ClusterOverview implements Respo
     private final String commitId;
 
     @JsonCreator
+    // numTaskManagersBlocked and numSlotsFreeAndBlocked is Nullable since Jackson will assign null
+    // if the field is absent while parsing
     public ClusterOverviewWithVersion(
             @JsonProperty(FIELD_NAME_TASKMANAGERS) int numTaskManagersConnected,
             @JsonProperty(FIELD_NAME_SLOTS_TOTAL) int numSlotsTotal,
             @JsonProperty(FIELD_NAME_SLOTS_AVAILABLE) int numSlotsAvailable,
+            @JsonProperty(FIELD_NAME_TASKMANAGERS_BLOCKED) @Nullable Integer numTaskManagersBlocked,
+            @JsonProperty(FIELD_NAME_SLOTS_FREE_AND_BLOCKED) @Nullable
+                    Integer numSlotsFreeAndBlocked,
             @JsonProperty(FIELD_NAME_JOBS_RUNNING) int numJobsRunningOrPending,
             @JsonProperty(FIELD_NAME_JOBS_FINISHED) int numJobsFinished,
             @JsonProperty(FIELD_NAME_JOBS_CANCELLED) int numJobsCancelled,
@@ -57,6 +63,8 @@ public class ClusterOverviewWithVersion extends ClusterOverview implements Respo
                 numTaskManagersConnected,
                 numSlotsTotal,
                 numSlotsAvailable,
+                numTaskManagersBlocked,
+                numSlotsFreeAndBlocked,
                 numJobsRunningOrPending,
                 numJobsFinished,
                 numJobsCancelled,
@@ -66,26 +74,14 @@ public class ClusterOverviewWithVersion extends ClusterOverview implements Respo
         this.commitId = Preconditions.checkNotNull(commitId);
     }
 
-    public ClusterOverviewWithVersion(
-            int numTaskManagersConnected,
-            int numSlotsTotal,
-            int numSlotsAvailable,
-            JobsOverview jobs1,
-            JobsOverview jobs2,
-            String version,
-            String commitId) {
-        super(numTaskManagersConnected, numSlotsTotal, numSlotsAvailable, jobs1, jobs2);
-
-        this.version = Preconditions.checkNotNull(version);
-        this.commitId = Preconditions.checkNotNull(commitId);
-    }
-
     public static ClusterOverviewWithVersion fromStatusOverview(
             ClusterOverview statusOverview, String version, String commitId) {
         return new ClusterOverviewWithVersion(
                 statusOverview.getNumTaskManagersConnected(),
                 statusOverview.getNumSlotsTotal(),
                 statusOverview.getNumSlotsAvailable(),
+                statusOverview.getNumTaskManagersBlocked(),
+                statusOverview.getNumSlotsFreeAndBlocked(),
                 statusOverview.getNumJobsRunningOrPending(),
                 statusOverview.getNumJobsFinished(),
                 statusOverview.getNumJobsCancelled(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
index eba7a62bd5b..d318fd2449f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
@@ -28,9 +28,11 @@ import org.apache.flink.runtime.blocklist.DefaultBlocklistHandler;
 import org.apache.flink.runtime.blocklist.NoOpBlocklistHandler;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.io.network.partition.NoOpResourceManagerPartitionTracker;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
@@ -52,14 +54,18 @@ import org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException;
 import org.apache.flink.runtime.security.token.NoOpDelegationTokenManager;
 import org.apache.flink.runtime.slots.ResourceRequirement;
 import org.apache.flink.runtime.slots.ResourceRequirements;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorMemoryConfiguration;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorThreadInfoGateway;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.util.function.ThrowingConsumer;
 
 import org.junit.jupiter.api.AfterAll;
@@ -72,6 +78,7 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -670,6 +677,95 @@ class ResourceManagerTest {
         assertThat(receivedBlockedNodes2).containsExactlyInAnyOrder(blockedNode1, blockedNode2);
     }
 
+    @Test
+    void testResourceOverviewWithBlockedSlots() throws Exception {
+        ManuallyTriggeredScheduledExecutor executor = new ManuallyTriggeredScheduledExecutor();
+        final SlotManager slotManager = DeclarativeSlotManagerBuilder.newBuilder(executor).build();
+        resourceManager =
+                new ResourceManagerBuilder()
+                        .withSlotManager(slotManager)
+                        .withBlocklistHandlerFactory(
+                                new DefaultBlocklistHandler.Factory(Duration.ofMillis(100L)))
+                        .buildAndStart();
+
+        final ResourceManagerGateway resourceManagerGateway =
+                resourceManager.getSelfGateway(ResourceManagerGateway.class);
+
+        ResourceID taskExecutor = ResourceID.generate();
+        ResourceID taskExecutorToBlock = ResourceID.generate();
+        registerTaskExecutorAndSlot(resourceManagerGateway, taskExecutor, 3);
+        registerTaskExecutorAndSlot(resourceManagerGateway, taskExecutorToBlock, 5);
+        executor.triggerAll();
+
+        ResourceOverview overview =
+                resourceManagerGateway.requestResourceOverview(Time.seconds(5)).get();
+        assertThat(overview.getNumberTaskManagers()).isEqualTo(2);
+        assertThat(overview.getNumberRegisteredSlots()).isEqualTo(8);
+        assertThat(overview.getNumberFreeSlots()).isEqualTo(8);
+        assertThat(overview.getNumberBlockedTaskManagers()).isEqualTo(0);
+        assertThat(overview.getNumberBlockedFreeSlots()).isEqualTo(0);
+        assertThat(overview.getTotalResource())
+                .isEqualTo(ResourceProfile.fromResources(1, 1024).multiply(8));
+        assertThat(overview.getFreeResource())
+                .isEqualTo(ResourceProfile.fromResources(1, 1024).multiply(8));
+
+        resourceManagerGateway.notifyNewBlockedNodes(
+                Collections.singleton(
+                        new BlockedNode(
+                                resourceManager.getNodeIdOfTaskManager(taskExecutorToBlock),
+                                "Test cause",
+                                Long.MAX_VALUE)));
+
+        ResourceOverview overviewBlocked =
+                resourceManagerGateway.requestResourceOverview(Time.seconds(5)).get();
+        assertThat(overviewBlocked.getNumberTaskManagers()).isEqualTo(2);
+        assertThat(overviewBlocked.getNumberRegisteredSlots()).isEqualTo(8);
+        assertThat(overviewBlocked.getNumberFreeSlots()).isEqualTo(3);
+        assertThat(overviewBlocked.getNumberBlockedTaskManagers()).isEqualTo(1);
+        assertThat(overviewBlocked.getNumberBlockedFreeSlots()).isEqualTo(5);
+        assertThat(overviewBlocked.getTotalResource())
+                .isEqualTo(ResourceProfile.fromResources(1, 1024).multiply(8));
+        assertThat(overviewBlocked.getFreeResource())
+                .isEqualTo(ResourceProfile.fromResources(1, 1024).multiply(3));
+    }
+
+    private void registerTaskExecutorAndSlot(
+            ResourceManagerGateway resourceManagerGateway, ResourceID taskManagerId, int slotCount)
+            throws Exception {
+        final TaskExecutorGateway taskExecutorGateway =
+                new TestingTaskExecutorGatewayBuilder()
+                        .setAddress(UUID.randomUUID().toString())
+                        .createTestingTaskExecutorGateway();
+        rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);
+        TaskExecutorRegistration taskExecutorRegistration =
+                new TaskExecutorRegistration(
+                        taskExecutorGateway.getAddress(),
+                        taskManagerId,
+                        dataPort,
+                        jmxPort,
+                        hardwareDescription,
+                        new TaskExecutorMemoryConfiguration(
+                                1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
+                        ResourceProfile.fromResources(1, 1024),
+                        ResourceProfile.fromResources(1, 1024).multiply(slotCount),
+                        taskExecutorGateway.getAddress());
+        RegistrationResponse registrationResult =
+                resourceManagerGateway
+                        .registerTaskExecutor(taskExecutorRegistration, TestingUtils.TIMEOUT)
+                        .get();
+        assertThat(registrationResult).isInstanceOf(TaskExecutorRegistrationSuccess.class);
+        InstanceID instanceID =
+                ((TaskExecutorRegistrationSuccess) registrationResult).getRegistrationId();
+        List<SlotStatus> slots = new ArrayList<>();
+        for (int i = 0; i < slotCount; i++) {
+            slots.add(
+                    new SlotStatus(
+                            new SlotID(taskManagerId, i), ResourceProfile.fromResources(1, 1024)));
+        }
+        resourceManagerGateway.sendSlotReport(
+                taskManagerId, instanceID, new SlotReport(slots), Time.seconds(5));
+    }
+
     private JobMasterGateway createJobMasterGateway(Collection<BlockedNode> receivedBlockedNodes) {
         final TestingJobMasterGateway jobMasterGateway =
                 new TestingJobMasterGatewayBuilder()
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
index 82b0236190a..7a5c7493a31 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
@@ -405,7 +405,7 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
     @Override
     public CompletableFuture<ResourceOverview> requestResourceOverview(Time timeout) {
         return CompletableFuture.completedFuture(
-                new ResourceOverview(1, 1, 1, ResourceProfile.ZERO, ResourceProfile.ZERO));
+                new ResourceOverview(1, 1, 1, 0, 0, ResourceProfile.ZERO, ResourceProfile.ZERO));
     }
 
     @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersionTest.java
index 4dff5fb8ae8..71a2977ecbb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersionTest.java
@@ -31,6 +31,6 @@ public class ClusterOverviewWithVersionTest
 
     @Override
     protected ClusterOverviewWithVersion getTestResponseInstance() {
-        return new ClusterOverviewWithVersion(1, 3, 3, 7, 4, 2, 0, "version", "commit");
+        return new ClusterOverviewWithVersion(2, 6, 3, 1, 3, 7, 4, 2, 0, "version", "commit");
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
index bb313cacd8d..ffd53db2b9b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
@@ -74,7 +74,7 @@ public class TestingRestfulGateway implements RestfulGateway {
             DEFAULT_REQUEST_CLUSTER_OVERVIEW_SUPPLIER =
                     () ->
                             CompletableFuture.completedFuture(
-                                    new ClusterOverview(0, 0, 0, 0, 0, 0, 0));
+                                    new ClusterOverview(0, 0, 0, 0, 0, 0, 0, 0, 0));
     static final Supplier<CompletableFuture<Collection<String>>>
             DEFAULT_REQUEST_METRIC_QUERY_SERVICE_PATHS_SUPPLIER =
                     () -> CompletableFuture.completedFuture(Collections.emptyList());