You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2021/02/07 11:17:18 UTC
[flink] branch master updated: [FLINK-21047][coordination] Fix the incorrect registered/free resources information exposed by SlotManager
This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0ad9a76 [FLINK-21047][coordination] Fix the incorrect registered/free resources information exposed by SlotManager
0ad9a76 is described below
commit 0ad9a76d8c2fcd0af576cb483daf511f24b517ce
Author: Yangze Guo <ka...@gmail.com>
AuthorDate: Thu Feb 4 17:42:38 2021 +0800
[FLINK-21047][coordination] Fix the incorrect registered/free resources information exposed by SlotManager
This closes #14869
---
.../slotmanager/DeclarativeSlotManager.java | 5 +-
.../slotmanager/SlotManagerImpl.java | 38 ++++++++-----
.../slotmanager/TaskExecutorManager.java | 41 +++++++++-----
.../slotmanager/TaskManagerRegistration.java | 21 +++++++-
.../slotmanager/SlotManagerImplTest.java | 63 ++++++++++++++++++++++
.../slotmanager/TaskExecutorManagerTest.java | 45 +++++++++++++++-
6 files changed, 182 insertions(+), 31 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
index 8aed8f7..be62cdd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
@@ -296,7 +296,10 @@ public class DeclarativeSlotManager implements SlotManager {
return false;
} else {
if (!taskExecutorManager.registerTaskManager(
- taskExecutorConnection, initialSlotReport)) {
+ taskExecutorConnection,
+ initialSlotReport,
+ totalResourceProfile,
+ defaultSlotResourceProfile)) {
LOG.debug(
"Task executor {} could not be registered.",
taskExecutorConnection.getResourceID());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
index abd7e0a..87a7650 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
@@ -230,30 +230,38 @@ public class SlotManagerImpl implements SlotManager {
@Override
public ResourceProfile getRegisteredResource() {
- return getResourceFromNumSlots(getNumberRegisteredSlots());
+ return taskManagerRegistrations.values().stream()
+ .map(TaskManagerRegistration::getTotalResource)
+ .reduce(ResourceProfile.ZERO, ResourceProfile::merge);
}
@Override
public ResourceProfile getRegisteredResourceOf(InstanceID instanceID) {
- return getResourceFromNumSlots(getNumberRegisteredSlotsOf(instanceID));
+ return Optional.ofNullable(taskManagerRegistrations.get(instanceID))
+ .map(TaskManagerRegistration::getTotalResource)
+ .orElse(ResourceProfile.ZERO);
}
@Override
public ResourceProfile getFreeResource() {
- return getResourceFromNumSlots(getNumberFreeSlots());
+ return taskManagerRegistrations.values().stream()
+ .map(
+ taskManagerRegistration ->
+ taskManagerRegistration
+ .getDefaultSlotResourceProfile()
+ .multiply(taskManagerRegistration.getNumberFreeSlots()))
+ .reduce(ResourceProfile.ZERO, ResourceProfile::merge);
}
@Override
public ResourceProfile getFreeResourceOf(InstanceID instanceID) {
- return getResourceFromNumSlots(getNumberFreeSlotsOf(instanceID));
- }
-
- private ResourceProfile getResourceFromNumSlots(int numSlots) {
- if (numSlots < 0 || defaultSlotResourceProfile == null) {
- return ResourceProfile.UNKNOWN;
- } else {
- return defaultSlotResourceProfile.multiply(numSlots);
- }
+ return Optional.ofNullable(taskManagerRegistrations.get(instanceID))
+ .map(
+ taskManagerRegistration ->
+ taskManagerRegistration
+ .getDefaultSlotResourceProfile()
+ .multiply(taskManagerRegistration.getNumberFreeSlots()))
+ .orElse(ResourceProfile.ZERO);
}
@VisibleForTesting
@@ -497,7 +505,11 @@ public class SlotManagerImpl implements SlotManager {
}
TaskManagerRegistration taskManagerRegistration =
- new TaskManagerRegistration(taskExecutorConnection, reportedSlots);
+ new TaskManagerRegistration(
+ taskExecutorConnection,
+ reportedSlots,
+ totalResourceProfile,
+ defaultSlotResourceProfile);
taskManagerRegistrations.put(
taskExecutorConnection.getInstanceID(), taskManagerRegistration);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
index d26e8a9..e0519a7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
@@ -149,7 +149,10 @@ class TaskExecutorManager implements AutoCloseable {
}
public boolean registerTaskManager(
- final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) {
+ final TaskExecutorConnection taskExecutorConnection,
+ SlotReport initialSlotReport,
+ ResourceProfile totalResourceProfile,
+ ResourceProfile defaultSlotResourceProfile) {
if (isMaxSlotNumExceededAfterRegistration(initialSlotReport)) {
LOG.info(
"The total number of slots exceeds the max limitation {}, releasing the excess task executor.",
@@ -165,7 +168,9 @@ class TaskExecutorManager implements AutoCloseable {
taskExecutorConnection,
StreamSupport.stream(initialSlotReport.spliterator(), false)
.map(SlotStatus::getSlotID)
- .collect(Collectors.toList()));
+ .collect(Collectors.toList()),
+ totalResourceProfile,
+ defaultSlotResourceProfile);
taskManagerRegistrations.put(
taskExecutorConnection.getInstanceID(), taskManagerRegistration);
@@ -402,27 +407,35 @@ class TaskExecutorManager implements AutoCloseable {
// ---------------------------------------------------------------------------------------------
public ResourceProfile getTotalRegisteredResources() {
- return getResourceFromNumSlots(getNumberRegisteredSlots());
+ return taskManagerRegistrations.values().stream()
+ .map(TaskManagerRegistration::getTotalResource)
+ .reduce(ResourceProfile.ZERO, ResourceProfile::merge);
}
public ResourceProfile getTotalRegisteredResourcesOf(InstanceID instanceID) {
- return getResourceFromNumSlots(getNumberRegisteredSlotsOf(instanceID));
+ return Optional.ofNullable(taskManagerRegistrations.get(instanceID))
+ .map(TaskManagerRegistration::getTotalResource)
+ .orElse(ResourceProfile.ZERO);
}
public ResourceProfile getTotalFreeResources() {
- return getResourceFromNumSlots(getNumberFreeSlots());
+ return taskManagerRegistrations.values().stream()
+ .map(
+ taskManagerRegistration ->
+ taskManagerRegistration
+ .getDefaultSlotResourceProfile()
+ .multiply(taskManagerRegistration.getNumberFreeSlots()))
+ .reduce(ResourceProfile.ZERO, ResourceProfile::merge);
}
public ResourceProfile getTotalFreeResourcesOf(InstanceID instanceID) {
- return getResourceFromNumSlots(getNumberFreeSlotsOf(instanceID));
- }
-
- private ResourceProfile getResourceFromNumSlots(int numSlots) {
- if (numSlots < 0 || defaultSlotResourceProfile == null) {
- return ResourceProfile.UNKNOWN;
- } else {
- return defaultSlotResourceProfile.multiply(numSlots);
- }
+ return Optional.ofNullable(taskManagerRegistrations.get(instanceID))
+ .map(
+ taskManagerRegistration ->
+ taskManagerRegistration
+ .getDefaultSlotResourceProfile()
+ .multiply(taskManagerRegistration.getNumberFreeSlots()))
+ .orElse(ResourceProfile.ZERO);
}
public Iterable<SlotID> getSlotsOf(InstanceID instanceId) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
index 08ef068..02915dc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.resourcemanager.slotmanager;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
@@ -30,6 +31,10 @@ public class TaskManagerRegistration {
private final TaskExecutorConnection taskManagerConnection;
+ private final ResourceProfile defaultSlotResourceProfile;
+
+ private final ResourceProfile totalResource;
+
private final HashSet<SlotID> slots;
private int numberFreeSlots;
@@ -38,12 +43,18 @@ public class TaskManagerRegistration {
private long idleSince;
public TaskManagerRegistration(
- TaskExecutorConnection taskManagerConnection, Collection<SlotID> slots) {
+ TaskExecutorConnection taskManagerConnection,
+ Collection<SlotID> slots,
+ ResourceProfile totalResourceProfile,
+ ResourceProfile defaultSlotResourceProfile) {
this.taskManagerConnection =
Preconditions.checkNotNull(taskManagerConnection, "taskManagerConnection");
Preconditions.checkNotNull(slots, "slots");
+ this.totalResource = Preconditions.checkNotNull(totalResourceProfile);
+ this.defaultSlotResourceProfile = Preconditions.checkNotNull(defaultSlotResourceProfile);
+
this.slots = new HashSet<>(slots);
this.numberFreeSlots = slots.size();
@@ -67,6 +78,14 @@ public class TaskManagerRegistration {
return numberFreeSlots;
}
+ public ResourceProfile getDefaultSlotResourceProfile() {
+ return defaultSlotResourceProfile;
+ }
+
+ public ResourceProfile getTotalResource() {
+ return totalResource;
+ }
+
public void freeSlot() {
Preconditions.checkState(
numberFreeSlots < slots.size(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImplTest.java
index 59b92b7..0191ced 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImplTest.java
@@ -1504,6 +1504,69 @@ public class SlotManagerImplTest extends TestLogger {
}
}
+ @Test
+ public void testGetResourceOverview() throws Exception {
+ final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
+ final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build();
+
+ final TaskExecutorGateway taskExecutorGateway =
+ new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
+ final ResourceID resourceId1 = ResourceID.generate();
+ final ResourceID resourceId2 = ResourceID.generate();
+ final TaskExecutorConnection taskManagerConnection1 =
+ new TaskExecutorConnection(resourceId1, taskExecutorGateway);
+ final TaskExecutorConnection taskManagerConnection2 =
+ new TaskExecutorConnection(resourceId2, taskExecutorGateway);
+
+ final SlotID slotId1 = new SlotID(resourceId1, 0);
+ final SlotID slotId2 = new SlotID(resourceId1, 1);
+ final SlotID slotId3 = new SlotID(resourceId2, 0);
+ final SlotID slotId4 = new SlotID(resourceId2, 1);
+ final ResourceProfile resourceProfile1 = ResourceProfile.fromResources(1, 10);
+ final ResourceProfile resourceProfile2 = ResourceProfile.fromResources(2, 20);
+ final SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile1);
+ final SlotStatus slotStatus2 =
+ new SlotStatus(slotId2, resourceProfile1, new JobID(), new AllocationID());
+ final SlotStatus slotStatus3 = new SlotStatus(slotId3, resourceProfile2);
+ final SlotStatus slotStatus4 =
+ new SlotStatus(slotId4, resourceProfile2, new JobID(), new AllocationID());
+ final SlotReport slotReport1 = new SlotReport(Arrays.asList(slotStatus1, slotStatus2));
+ final SlotReport slotReport2 = new SlotReport(Arrays.asList(slotStatus3, slotStatus4));
+
+ try (SlotManagerImpl slotManager =
+ createSlotManager(resourceManagerId, resourceManagerActions)) {
+ slotManager.registerTaskManager(
+ taskManagerConnection1,
+ slotReport1,
+ resourceProfile1.multiply(2),
+ resourceProfile1);
+ slotManager.registerTaskManager(
+ taskManagerConnection2,
+ slotReport2,
+ resourceProfile2.multiply(2),
+ resourceProfile2);
+
+ assertThat(
+ slotManager.getFreeResource(),
+ equalTo(resourceProfile1.merge(resourceProfile2)));
+ assertThat(
+ slotManager.getFreeResourceOf(taskManagerConnection1.getInstanceID()),
+ equalTo(resourceProfile1));
+ assertThat(
+ slotManager.getFreeResourceOf(taskManagerConnection2.getInstanceID()),
+ equalTo(resourceProfile2));
+ assertThat(
+ slotManager.getRegisteredResource(),
+ equalTo(resourceProfile1.merge(resourceProfile2).multiply(2)));
+ assertThat(
+ slotManager.getRegisteredResourceOf(taskManagerConnection1.getInstanceID()),
+ equalTo(resourceProfile1.multiply(2)));
+ assertThat(
+ slotManager.getRegisteredResourceOf(taskManagerConnection2.getInstanceID()),
+ equalTo(resourceProfile2.multiply(2)));
+ }
+ }
+
private Set<AllocationID> extractFailedAllocationsForJob(
JobID jobId2,
Map<JobID, List<Tuple2<JobID, AllocationID>>> job2AndJob3FailedAllocationInfo) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java
index e0a8abb..9a9dc3c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java
@@ -41,6 +41,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertEquals;
@@ -112,7 +113,8 @@ public class TaskExecutorManagerTest extends TestLogger {
requestedSlotProfile,
JobID.generate(),
new AllocationID()));
- taskExecutorManager.registerTaskManager(taskExecutorConnection, slotReport);
+ taskExecutorManager.registerTaskManager(
+ taskExecutorConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY);
// the slot from the task executor should be accepted, but we should still be waiting
// for the originally requested slot
@@ -328,6 +330,41 @@ public class TaskExecutorManagerTest extends TestLogger {
}
}
+ @Test
+ public void testGetResourceOverview() {
+ final ResourceProfile resourceProfile1 = ResourceProfile.fromResources(1, 10);
+ final ResourceProfile resourceProfile2 = ResourceProfile.fromResources(2, 20);
+
+ try (final TaskExecutorManager taskExecutorManager =
+ createTaskExecutorManagerBuilder().setMaxNumSlots(4).createTaskExecutorManager()) {
+ final InstanceID instanceId1 =
+ createAndRegisterTaskExecutor(taskExecutorManager, 2, resourceProfile1);
+ final InstanceID instanceId2 =
+ createAndRegisterTaskExecutor(taskExecutorManager, 2, resourceProfile2);
+ taskExecutorManager.occupySlot(instanceId1);
+ taskExecutorManager.occupySlot(instanceId2);
+
+ assertThat(
+ taskExecutorManager.getTotalFreeResources(),
+ equalTo(resourceProfile1.merge(resourceProfile2)));
+ assertThat(
+ taskExecutorManager.getTotalFreeResourcesOf(instanceId1),
+ equalTo(resourceProfile1));
+ assertThat(
+ taskExecutorManager.getTotalFreeResourcesOf(instanceId2),
+ equalTo(resourceProfile2));
+ assertThat(
+ taskExecutorManager.getTotalRegisteredResources(),
+ equalTo(resourceProfile1.merge(resourceProfile2).multiply(2)));
+ assertThat(
+ taskExecutorManager.getTotalRegisteredResourcesOf(instanceId1),
+ equalTo(resourceProfile1.multiply(2)));
+ assertThat(
+ taskExecutorManager.getTotalRegisteredResourcesOf(instanceId2),
+ equalTo(resourceProfile2.multiply(2)));
+ }
+ }
+
private static TaskExecutorManagerBuilder createTaskExecutorManagerBuilder() {
return new TaskExecutorManagerBuilder()
.setResourceActions(createResourceActionsBuilder().build());
@@ -358,7 +395,11 @@ public class TaskExecutorManagerTest extends TestLogger {
final SlotReport slotReport = new SlotReport(slotStatuses);
- taskExecutorManager.registerTaskManager(taskExecutorConnection, slotReport);
+ taskExecutorManager.registerTaskManager(
+ taskExecutorConnection,
+ slotReport,
+ resourceProfile.multiply(numSlots),
+ resourceProfile);
return taskExecutorConnection.getInstanceID();
}