You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2022/08/01 10:30:56 UTC
[flink] 03/06: [FLINK-28588][rest] Add blocked task manager count and blocked slot count in ResourceOverview and ClusterOverview
This is an automated email from the ASF dual-hosted git repository.
gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0263b55288be7b569f56dd42a94c5e48bcc1607b
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Tue Jul 26 16:16:45 2022 +0800
[FLINK-28588][rest] Add blocked task manager count and blocked slot count in ResourceOverview and ClusterOverview
---
.../src/test/resources/rest_api_v1.snapshot | 6 ++
.../messages/webmonitor/ClusterOverview.java | 52 +++++++++---
.../runtime/resourcemanager/ResourceManager.java | 26 +++++-
.../runtime/resourcemanager/ResourceOverview.java | 18 +++-
.../messages/ClusterOverviewWithVersion.java | 26 +++---
.../resourcemanager/ResourceManagerTest.java | 96 ++++++++++++++++++++++
.../utils/TestingResourceManagerGateway.java | 2 +-
.../messages/ClusterOverviewWithVersionTest.java | 2 +-
.../runtime/webmonitor/TestingRestfulGateway.java | 2 +-
9 files changed, 196 insertions(+), 34 deletions(-)
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index 427f59c2723..873e5062d7d 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -3225,6 +3225,12 @@
"slots-available" : {
"type" : "integer"
},
+ "taskmanagers-blocked" : {
+ "type" : "integer"
+ },
+ "slots-free-and-blocked" : {
+ "type" : "integer"
+ },
"jobs-running" : {
"type" : "integer"
},
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java
index 34bf05d0711..7ea327c8029 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java
@@ -21,8 +21,12 @@ package org.apache.flink.runtime.messages.webmonitor;
import org.apache.flink.runtime.resourcemanager.ResourceOverview;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import javax.annotation.Nullable;
+
/**
* Response to the {@link RequestStatusOverview} message, carrying a description of the Flink
* cluster status.
@@ -34,6 +38,8 @@ public class ClusterOverview extends JobsOverview {
public static final String FIELD_NAME_TASKMANAGERS = "taskmanagers";
public static final String FIELD_NAME_SLOTS_TOTAL = "slots-total";
public static final String FIELD_NAME_SLOTS_AVAILABLE = "slots-available";
+ public static final String FIELD_NAME_TASKMANAGERS_BLOCKED = "taskmanagers-blocked";
+ public static final String FIELD_NAME_SLOTS_FREE_AND_BLOCKED = "slots-free-and-blocked";
@JsonProperty(FIELD_NAME_TASKMANAGERS)
private final int numTaskManagersConnected;
@@ -44,11 +50,24 @@ public class ClusterOverview extends JobsOverview {
@JsonProperty(FIELD_NAME_SLOTS_AVAILABLE)
private final int numSlotsAvailable;
+ @JsonProperty(FIELD_NAME_TASKMANAGERS_BLOCKED)
+ @JsonInclude(Include.NON_DEFAULT)
+ private final int numTaskManagersBlocked;
+
+ @JsonProperty(FIELD_NAME_SLOTS_FREE_AND_BLOCKED)
+ @JsonInclude(Include.NON_DEFAULT)
+ private final int numSlotsFreeAndBlocked;
+
@JsonCreator
+ // numTaskManagersBlocked and numSlotsFreeAndBlocked are nullable because Jackson passes null
+ // for a constructor parameter whose field is absent from the parsed JSON
public ClusterOverview(
@JsonProperty(FIELD_NAME_TASKMANAGERS) int numTaskManagersConnected,
@JsonProperty(FIELD_NAME_SLOTS_TOTAL) int numSlotsTotal,
@JsonProperty(FIELD_NAME_SLOTS_AVAILABLE) int numSlotsAvailable,
+ @JsonProperty(FIELD_NAME_TASKMANAGERS_BLOCKED) @Nullable Integer numTaskManagersBlocked,
+ @JsonProperty(FIELD_NAME_SLOTS_FREE_AND_BLOCKED) @Nullable
+ Integer numSlotsFreeAndBlocked,
@JsonProperty(FIELD_NAME_JOBS_RUNNING) int numJobsRunningOrPending,
@JsonProperty(FIELD_NAME_JOBS_FINISHED) int numJobsFinished,
@JsonProperty(FIELD_NAME_JOBS_CANCELLED) int numJobsCancelled,
@@ -59,18 +78,8 @@ public class ClusterOverview extends JobsOverview {
this.numTaskManagersConnected = numTaskManagersConnected;
this.numSlotsTotal = numSlotsTotal;
this.numSlotsAvailable = numSlotsAvailable;
- }
-
- public ClusterOverview(
- int numTaskManagersConnected,
- int numSlotsTotal,
- int numSlotsAvailable,
- JobsOverview jobs1,
- JobsOverview jobs2) {
- super(jobs1, jobs2);
- this.numTaskManagersConnected = numTaskManagersConnected;
- this.numSlotsTotal = numSlotsTotal;
- this.numSlotsAvailable = numSlotsAvailable;
+ this.numTaskManagersBlocked = numTaskManagersBlocked == null ? 0 : numTaskManagersBlocked;
+ this.numSlotsFreeAndBlocked = numSlotsFreeAndBlocked == null ? 0 : numSlotsFreeAndBlocked;
}
public ClusterOverview(ResourceOverview resourceOverview, JobsOverview jobsOverview) {
@@ -78,6 +87,8 @@ public class ClusterOverview extends JobsOverview {
resourceOverview.getNumberTaskManagers(),
resourceOverview.getNumberRegisteredSlots(),
resourceOverview.getNumberFreeSlots(),
+ resourceOverview.getNumberBlockedTaskManagers(),
+ resourceOverview.getNumberBlockedFreeSlots(),
jobsOverview.getNumJobsRunningOrPending(),
jobsOverview.getNumJobsFinished(),
jobsOverview.getNumJobsCancelled(),
@@ -96,6 +107,13 @@ public class ClusterOverview extends JobsOverview {
return numSlotsAvailable;
}
+ public int getNumTaskManagersBlocked() {
+ return numTaskManagersBlocked;
+ }
+
+ public int getNumSlotsFreeAndBlocked() {
+ return numSlotsFreeAndBlocked;
+ }
// ------------------------------------------------------------------------
@Override
@@ -107,6 +125,8 @@ public class ClusterOverview extends JobsOverview {
return this.numTaskManagersConnected == that.numTaskManagersConnected
&& this.numSlotsTotal == that.numSlotsTotal
&& this.numSlotsAvailable == that.numSlotsAvailable
+ && this.numTaskManagersBlocked == that.numTaskManagersBlocked
+ && this.numSlotsFreeAndBlocked == that.numSlotsFreeAndBlocked
&& this.getNumJobsRunningOrPending() == that.getNumJobsRunningOrPending()
&& this.getNumJobsFinished() == that.getNumJobsFinished()
&& this.getNumJobsCancelled() == that.getNumJobsCancelled()
@@ -122,6 +142,8 @@ public class ClusterOverview extends JobsOverview {
result = 31 * result + numTaskManagersConnected;
result = 31 * result + numSlotsTotal;
result = 31 * result + numSlotsAvailable;
+ result = 31 * result + numTaskManagersBlocked;
+ result = 31 * result + numSlotsFreeAndBlocked;
return result;
}
@@ -130,10 +152,16 @@ public class ClusterOverview extends JobsOverview {
return "StatusOverview {"
+ "numTaskManagersConnected="
+ numTaskManagersConnected
+ + (numTaskManagersBlocked == 0
+ ? ""
+ : (", numTaskManagersBlocked=" + numTaskManagersBlocked))
+ ", numSlotsTotal="
+ numSlotsTotal
+ ", numSlotsAvailable="
+ numSlotsAvailable
+ + (numSlotsFreeAndBlocked == 0
+ ? ""
+ : (", numSlotsFreeAndBlocked=" + numSlotsFreeAndBlocked))
+ ", numJobsRunningOrPending="
+ getNumJobsRunningOrPending()
+ ", numJobsFinished="
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index fb85cf88231..8b4f26ea68d 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -688,15 +688,34 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
@Override
public CompletableFuture<ResourceOverview> requestResourceOverview(Time timeout) {
final int numberSlots = slotManager.getNumberRegisteredSlots();
- final int numberFreeSlots = slotManager.getNumberFreeSlots();
final ResourceProfile totalResource = slotManager.getRegisteredResource();
- final ResourceProfile freeResource = slotManager.getFreeResource();
+ int numberFreeSlots = slotManager.getNumberFreeSlots();
+ ResourceProfile freeResource = slotManager.getFreeResource();
+
+ int blockedTaskManagers = 0;
+ int totalBlockedFreeSlots = 0;
+ if (!blocklistHandler.getAllBlockedNodeIds().isEmpty()) {
+ for (WorkerRegistration<WorkerType> registration : taskExecutors.values()) {
+ if (blocklistHandler.isBlockedTaskManager(registration.getResourceID())) {
+ blockedTaskManagers++;
+ int blockedFreeSlots =
+ slotManager.getNumberFreeSlotsOf(registration.getInstanceID());
+ totalBlockedFreeSlots += blockedFreeSlots;
+ numberFreeSlots -= blockedFreeSlots;
+ freeResource =
+ freeResource.subtract(
+ slotManager.getFreeResourceOf(registration.getInstanceID()));
+ }
+ }
+ }
return CompletableFuture.completedFuture(
new ResourceOverview(
taskExecutors.size(),
numberSlots,
numberFreeSlots,
+ blockedTaskManagers,
+ totalBlockedFreeSlots,
totalResource,
freeResource));
}
@@ -866,7 +885,8 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
// Internal methods
// ------------------------------------------------------------------------
- private String getNodeIdOfTaskManager(ResourceID taskManagerId) {
+ @VisibleForTesting
+ String getNodeIdOfTaskManager(ResourceID taskManagerId) {
checkState(taskExecutors.containsKey(taskManagerId));
return taskExecutors.get(taskManagerId).getNodeId();
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceOverview.java
index 8e9ee2dad1f..d68b17346fb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceOverview.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceOverview.java
@@ -28,7 +28,7 @@ public class ResourceOverview implements Serializable {
private static final long serialVersionUID = 7618746920569224557L;
private static final ResourceOverview EMPTY_RESOURCE_OVERVIEW =
- new ResourceOverview(0, 0, 0, ResourceProfile.ZERO, ResourceProfile.ZERO);
+ new ResourceOverview(0, 0, 0, 0, 0, ResourceProfile.ZERO, ResourceProfile.ZERO);
private final int numberTaskManagers;
@@ -36,6 +36,10 @@ public class ResourceOverview implements Serializable {
private final int numberFreeSlots;
+ private final int numberBlockedTaskManagers;
+
+ private final int numberBlockedFreeSlots;
+
private final ResourceProfile totalResource;
private final ResourceProfile freeResource;
@@ -44,11 +48,15 @@ public class ResourceOverview implements Serializable {
int numberTaskManagers,
int numberRegisteredSlots,
int numberFreeSlots,
+ int numberBlockedTaskManagers,
+ int numberBlockedFreeSlots,
ResourceProfile totalResource,
ResourceProfile freeResource) {
this.numberTaskManagers = numberTaskManagers;
this.numberRegisteredSlots = numberRegisteredSlots;
this.numberFreeSlots = numberFreeSlots;
+ this.numberBlockedTaskManagers = numberBlockedTaskManagers;
+ this.numberBlockedFreeSlots = numberBlockedFreeSlots;
this.totalResource = totalResource;
this.freeResource = freeResource;
}
@@ -65,6 +73,14 @@ public class ResourceOverview implements Serializable {
return numberFreeSlots;
}
+ public int getNumberBlockedTaskManagers() {
+ return numberBlockedTaskManagers;
+ }
+
+ public int getNumberBlockedFreeSlots() {
+ return numberBlockedFreeSlots;
+ }
+
public ResourceProfile getTotalResource() {
return totalResource;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersion.java
index fa40db26cfd..7beaa32ab65 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersion.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersion.java
@@ -19,13 +19,14 @@
package org.apache.flink.runtime.rest.handler.legacy.messages;
import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
-import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import javax.annotation.Nullable;
+
import java.util.Objects;
/** Cluster overview message including the current Flink version and commit id. */
@@ -43,10 +44,15 @@ public class ClusterOverviewWithVersion extends ClusterOverview implements Respo
private final String commitId;
@JsonCreator
+ // numTaskManagersBlocked and numSlotsFreeAndBlocked are nullable because Jackson passes null
+ // for a constructor parameter whose field is absent from the parsed JSON
public ClusterOverviewWithVersion(
@JsonProperty(FIELD_NAME_TASKMANAGERS) int numTaskManagersConnected,
@JsonProperty(FIELD_NAME_SLOTS_TOTAL) int numSlotsTotal,
@JsonProperty(FIELD_NAME_SLOTS_AVAILABLE) int numSlotsAvailable,
+ @JsonProperty(FIELD_NAME_TASKMANAGERS_BLOCKED) @Nullable Integer numTaskManagersBlocked,
+ @JsonProperty(FIELD_NAME_SLOTS_FREE_AND_BLOCKED) @Nullable
+ Integer numSlotsFreeAndBlocked,
@JsonProperty(FIELD_NAME_JOBS_RUNNING) int numJobsRunningOrPending,
@JsonProperty(FIELD_NAME_JOBS_FINISHED) int numJobsFinished,
@JsonProperty(FIELD_NAME_JOBS_CANCELLED) int numJobsCancelled,
@@ -57,6 +63,8 @@ public class ClusterOverviewWithVersion extends ClusterOverview implements Respo
numTaskManagersConnected,
numSlotsTotal,
numSlotsAvailable,
+ numTaskManagersBlocked,
+ numSlotsFreeAndBlocked,
numJobsRunningOrPending,
numJobsFinished,
numJobsCancelled,
@@ -66,26 +74,14 @@ public class ClusterOverviewWithVersion extends ClusterOverview implements Respo
this.commitId = Preconditions.checkNotNull(commitId);
}
- public ClusterOverviewWithVersion(
- int numTaskManagersConnected,
- int numSlotsTotal,
- int numSlotsAvailable,
- JobsOverview jobs1,
- JobsOverview jobs2,
- String version,
- String commitId) {
- super(numTaskManagersConnected, numSlotsTotal, numSlotsAvailable, jobs1, jobs2);
-
- this.version = Preconditions.checkNotNull(version);
- this.commitId = Preconditions.checkNotNull(commitId);
- }
-
public static ClusterOverviewWithVersion fromStatusOverview(
ClusterOverview statusOverview, String version, String commitId) {
return new ClusterOverviewWithVersion(
statusOverview.getNumTaskManagersConnected(),
statusOverview.getNumSlotsTotal(),
statusOverview.getNumSlotsAvailable(),
+ statusOverview.getNumTaskManagersBlocked(),
+ statusOverview.getNumSlotsFreeAndBlocked(),
statusOverview.getNumJobsRunningOrPending(),
statusOverview.getNumJobsFinished(),
statusOverview.getNumJobsCancelled(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
index eba7a62bd5b..d318fd2449f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
@@ -28,9 +28,11 @@ import org.apache.flink.runtime.blocklist.DefaultBlocklistHandler;
import org.apache.flink.runtime.blocklist.NoOpBlocklistHandler;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.network.partition.NoOpResourceManagerPartitionTracker;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
@@ -52,14 +54,18 @@ import org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException;
import org.apache.flink.runtime.security.token.NoOpDelegationTokenManager;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.slots.ResourceRequirements;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorMemoryConfiguration;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.runtime.taskexecutor.TaskExecutorThreadInfoGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.util.function.ThrowingConsumer;
import org.junit.jupiter.api.AfterAll;
@@ -72,6 +78,7 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -670,6 +677,95 @@ class ResourceManagerTest {
assertThat(receivedBlockedNodes2).containsExactlyInAnyOrder(blockedNode1, blockedNode2);
}
+ @Test
+ void testResourceOverviewWithBlockedSlots() throws Exception {
+ ManuallyTriggeredScheduledExecutor executor = new ManuallyTriggeredScheduledExecutor();
+ final SlotManager slotManager = DeclarativeSlotManagerBuilder.newBuilder(executor).build();
+ resourceManager =
+ new ResourceManagerBuilder()
+ .withSlotManager(slotManager)
+ .withBlocklistHandlerFactory(
+ new DefaultBlocklistHandler.Factory(Duration.ofMillis(100L)))
+ .buildAndStart();
+
+ final ResourceManagerGateway resourceManagerGateway =
+ resourceManager.getSelfGateway(ResourceManagerGateway.class);
+
+ ResourceID taskExecutor = ResourceID.generate();
+ ResourceID taskExecutorToBlock = ResourceID.generate();
+ registerTaskExecutorAndSlot(resourceManagerGateway, taskExecutor, 3);
+ registerTaskExecutorAndSlot(resourceManagerGateway, taskExecutorToBlock, 5);
+ executor.triggerAll();
+
+ ResourceOverview overview =
+ resourceManagerGateway.requestResourceOverview(Time.seconds(5)).get();
+ assertThat(overview.getNumberTaskManagers()).isEqualTo(2);
+ assertThat(overview.getNumberRegisteredSlots()).isEqualTo(8);
+ assertThat(overview.getNumberFreeSlots()).isEqualTo(8);
+ assertThat(overview.getNumberBlockedTaskManagers()).isEqualTo(0);
+ assertThat(overview.getNumberBlockedFreeSlots()).isEqualTo(0);
+ assertThat(overview.getTotalResource())
+ .isEqualTo(ResourceProfile.fromResources(1, 1024).multiply(8));
+ assertThat(overview.getFreeResource())
+ .isEqualTo(ResourceProfile.fromResources(1, 1024).multiply(8));
+
+ resourceManagerGateway.notifyNewBlockedNodes(
+ Collections.singleton(
+ new BlockedNode(
+ resourceManager.getNodeIdOfTaskManager(taskExecutorToBlock),
+ "Test cause",
+ Long.MAX_VALUE)));
+
+ ResourceOverview overviewBlocked =
+ resourceManagerGateway.requestResourceOverview(Time.seconds(5)).get();
+ assertThat(overviewBlocked.getNumberTaskManagers()).isEqualTo(2);
+ assertThat(overviewBlocked.getNumberRegisteredSlots()).isEqualTo(8);
+ assertThat(overviewBlocked.getNumberFreeSlots()).isEqualTo(3);
+ assertThat(overviewBlocked.getNumberBlockedTaskManagers()).isEqualTo(1);
+ assertThat(overviewBlocked.getNumberBlockedFreeSlots()).isEqualTo(5);
+ assertThat(overviewBlocked.getTotalResource())
+ .isEqualTo(ResourceProfile.fromResources(1, 1024).multiply(8));
+ assertThat(overviewBlocked.getFreeResource())
+ .isEqualTo(ResourceProfile.fromResources(1, 1024).multiply(3));
+ }
+
+ private void registerTaskExecutorAndSlot(
+ ResourceManagerGateway resourceManagerGateway, ResourceID taskManagerId, int slotCount)
+ throws Exception {
+ final TaskExecutorGateway taskExecutorGateway =
+ new TestingTaskExecutorGatewayBuilder()
+ .setAddress(UUID.randomUUID().toString())
+ .createTestingTaskExecutorGateway();
+ rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);
+ TaskExecutorRegistration taskExecutorRegistration =
+ new TaskExecutorRegistration(
+ taskExecutorGateway.getAddress(),
+ taskManagerId,
+ dataPort,
+ jmxPort,
+ hardwareDescription,
+ new TaskExecutorMemoryConfiguration(
+ 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
+ ResourceProfile.fromResources(1, 1024),
+ ResourceProfile.fromResources(1, 1024).multiply(slotCount),
+ taskExecutorGateway.getAddress());
+ RegistrationResponse registrationResult =
+ resourceManagerGateway
+ .registerTaskExecutor(taskExecutorRegistration, TestingUtils.TIMEOUT)
+ .get();
+ assertThat(registrationResult).isInstanceOf(TaskExecutorRegistrationSuccess.class);
+ InstanceID instanceID =
+ ((TaskExecutorRegistrationSuccess) registrationResult).getRegistrationId();
+ List<SlotStatus> slots = new ArrayList<>();
+ for (int i = 0; i < slotCount; i++) {
+ slots.add(
+ new SlotStatus(
+ new SlotID(taskManagerId, i), ResourceProfile.fromResources(1, 1024)));
+ }
+ resourceManagerGateway.sendSlotReport(
+ taskManagerId, instanceID, new SlotReport(slots), Time.seconds(5));
+ }
+
private JobMasterGateway createJobMasterGateway(Collection<BlockedNode> receivedBlockedNodes) {
final TestingJobMasterGateway jobMasterGateway =
new TestingJobMasterGatewayBuilder()
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
index 82b0236190a..7a5c7493a31 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
@@ -405,7 +405,7 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
@Override
public CompletableFuture<ResourceOverview> requestResourceOverview(Time timeout) {
return CompletableFuture.completedFuture(
- new ResourceOverview(1, 1, 1, ResourceProfile.ZERO, ResourceProfile.ZERO));
+ new ResourceOverview(1, 1, 1, 0, 0, ResourceProfile.ZERO, ResourceProfile.ZERO));
}
@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersionTest.java
index 4dff5fb8ae8..71a2977ecbb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersionTest.java
@@ -31,6 +31,6 @@ public class ClusterOverviewWithVersionTest
@Override
protected ClusterOverviewWithVersion getTestResponseInstance() {
- return new ClusterOverviewWithVersion(1, 3, 3, 7, 4, 2, 0, "version", "commit");
+ return new ClusterOverviewWithVersion(2, 6, 3, 1, 3, 7, 4, 2, 0, "version", "commit");
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
index bb313cacd8d..ffd53db2b9b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
@@ -74,7 +74,7 @@ public class TestingRestfulGateway implements RestfulGateway {
DEFAULT_REQUEST_CLUSTER_OVERVIEW_SUPPLIER =
() ->
CompletableFuture.completedFuture(
- new ClusterOverview(0, 0, 0, 0, 0, 0, 0));
+ new ClusterOverview(0, 0, 0, 0, 0, 0, 0, 0, 0));
static final Supplier<CompletableFuture<Collection<String>>>
DEFAULT_REQUEST_METRIC_QUERY_SERVICE_PATHS_SUPPLIER =
() -> CompletableFuture.completedFuture(Collections.emptyList());