You are viewing a plain-text version of this content. The canonical link for it (the "here" hyperlink on the original archive page) is not preserved in this plain-text rendering.
Posted to commits@flink.apache.org by xt...@apache.org on 2021/02/07 11:17:18 UTC

[flink] branch master updated: [FLINK-21047][coordination] Fix the incorrect registered/free resources information exposed by SlotManager

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ad9a76  [FLINK-21047][coordination] Fix the incorrect registered/free resources information exposed by SlotManager
0ad9a76 is described below

commit 0ad9a76d8c2fcd0af576cb483daf511f24b517ce
Author: Yangze Guo <ka...@gmail.com>
AuthorDate: Thu Feb 4 17:42:38 2021 +0800

    [FLINK-21047][coordination] Fix the incorrect registered/free resources information exposed by SlotManager
    
    This closes #14869
---
 .../slotmanager/DeclarativeSlotManager.java        |  5 +-
 .../slotmanager/SlotManagerImpl.java               | 38 ++++++++-----
 .../slotmanager/TaskExecutorManager.java           | 41 +++++++++-----
 .../slotmanager/TaskManagerRegistration.java       | 21 +++++++-
 .../slotmanager/SlotManagerImplTest.java           | 63 ++++++++++++++++++++++
 .../slotmanager/TaskExecutorManagerTest.java       | 45 +++++++++++++++-
 6 files changed, 182 insertions(+), 31 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
index 8aed8f7..be62cdd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
@@ -296,7 +296,10 @@ public class DeclarativeSlotManager implements SlotManager {
             return false;
         } else {
             if (!taskExecutorManager.registerTaskManager(
-                    taskExecutorConnection, initialSlotReport)) {
+                    taskExecutorConnection,
+                    initialSlotReport,
+                    totalResourceProfile,
+                    defaultSlotResourceProfile)) {
                 LOG.debug(
                         "Task executor {} could not be registered.",
                         taskExecutorConnection.getResourceID());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
index abd7e0a..87a7650 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
@@ -230,30 +230,38 @@ public class SlotManagerImpl implements SlotManager {
 
     @Override
     public ResourceProfile getRegisteredResource() {
-        return getResourceFromNumSlots(getNumberRegisteredSlots());
+        return taskManagerRegistrations.values().stream()
+                .map(TaskManagerRegistration::getTotalResource)
+                .reduce(ResourceProfile.ZERO, ResourceProfile::merge);
     }
 
     @Override
     public ResourceProfile getRegisteredResourceOf(InstanceID instanceID) {
-        return getResourceFromNumSlots(getNumberRegisteredSlotsOf(instanceID));
+        return Optional.ofNullable(taskManagerRegistrations.get(instanceID))
+                .map(TaskManagerRegistration::getTotalResource)
+                .orElse(ResourceProfile.ZERO);
     }
 
     @Override
     public ResourceProfile getFreeResource() {
-        return getResourceFromNumSlots(getNumberFreeSlots());
+        return taskManagerRegistrations.values().stream()
+                .map(
+                        taskManagerRegistration ->
+                                taskManagerRegistration
+                                        .getDefaultSlotResourceProfile()
+                                        .multiply(taskManagerRegistration.getNumberFreeSlots()))
+                .reduce(ResourceProfile.ZERO, ResourceProfile::merge);
     }
 
     @Override
     public ResourceProfile getFreeResourceOf(InstanceID instanceID) {
-        return getResourceFromNumSlots(getNumberFreeSlotsOf(instanceID));
-    }
-
-    private ResourceProfile getResourceFromNumSlots(int numSlots) {
-        if (numSlots < 0 || defaultSlotResourceProfile == null) {
-            return ResourceProfile.UNKNOWN;
-        } else {
-            return defaultSlotResourceProfile.multiply(numSlots);
-        }
+        return Optional.ofNullable(taskManagerRegistrations.get(instanceID))
+                .map(
+                        taskManagerRegistration ->
+                                taskManagerRegistration
+                                        .getDefaultSlotResourceProfile()
+                                        .multiply(taskManagerRegistration.getNumberFreeSlots()))
+                .orElse(ResourceProfile.ZERO);
     }
 
     @VisibleForTesting
@@ -497,7 +505,11 @@ public class SlotManagerImpl implements SlotManager {
             }
 
             TaskManagerRegistration taskManagerRegistration =
-                    new TaskManagerRegistration(taskExecutorConnection, reportedSlots);
+                    new TaskManagerRegistration(
+                            taskExecutorConnection,
+                            reportedSlots,
+                            totalResourceProfile,
+                            defaultSlotResourceProfile);
 
             taskManagerRegistrations.put(
                     taskExecutorConnection.getInstanceID(), taskManagerRegistration);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
index d26e8a9..e0519a7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
@@ -149,7 +149,10 @@ class TaskExecutorManager implements AutoCloseable {
     }
 
     public boolean registerTaskManager(
-            final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) {
+            final TaskExecutorConnection taskExecutorConnection,
+            SlotReport initialSlotReport,
+            ResourceProfile totalResourceProfile,
+            ResourceProfile defaultSlotResourceProfile) {
         if (isMaxSlotNumExceededAfterRegistration(initialSlotReport)) {
             LOG.info(
                     "The total number of slots exceeds the max limitation {}, releasing the excess task executor.",
@@ -165,7 +168,9 @@ class TaskExecutorManager implements AutoCloseable {
                         taskExecutorConnection,
                         StreamSupport.stream(initialSlotReport.spliterator(), false)
                                 .map(SlotStatus::getSlotID)
-                                .collect(Collectors.toList()));
+                                .collect(Collectors.toList()),
+                        totalResourceProfile,
+                        defaultSlotResourceProfile);
 
         taskManagerRegistrations.put(
                 taskExecutorConnection.getInstanceID(), taskManagerRegistration);
@@ -402,27 +407,35 @@ class TaskExecutorManager implements AutoCloseable {
     // ---------------------------------------------------------------------------------------------
 
     public ResourceProfile getTotalRegisteredResources() {
-        return getResourceFromNumSlots(getNumberRegisteredSlots());
+        return taskManagerRegistrations.values().stream()
+                .map(TaskManagerRegistration::getTotalResource)
+                .reduce(ResourceProfile.ZERO, ResourceProfile::merge);
     }
 
     public ResourceProfile getTotalRegisteredResourcesOf(InstanceID instanceID) {
-        return getResourceFromNumSlots(getNumberRegisteredSlotsOf(instanceID));
+        return Optional.ofNullable(taskManagerRegistrations.get(instanceID))
+                .map(TaskManagerRegistration::getTotalResource)
+                .orElse(ResourceProfile.ZERO);
     }
 
     public ResourceProfile getTotalFreeResources() {
-        return getResourceFromNumSlots(getNumberFreeSlots());
+        return taskManagerRegistrations.values().stream()
+                .map(
+                        taskManagerRegistration ->
+                                taskManagerRegistration
+                                        .getDefaultSlotResourceProfile()
+                                        .multiply(taskManagerRegistration.getNumberFreeSlots()))
+                .reduce(ResourceProfile.ZERO, ResourceProfile::merge);
     }
 
     public ResourceProfile getTotalFreeResourcesOf(InstanceID instanceID) {
-        return getResourceFromNumSlots(getNumberFreeSlotsOf(instanceID));
-    }
-
-    private ResourceProfile getResourceFromNumSlots(int numSlots) {
-        if (numSlots < 0 || defaultSlotResourceProfile == null) {
-            return ResourceProfile.UNKNOWN;
-        } else {
-            return defaultSlotResourceProfile.multiply(numSlots);
-        }
+        return Optional.ofNullable(taskManagerRegistrations.get(instanceID))
+                .map(
+                        taskManagerRegistration ->
+                                taskManagerRegistration
+                                        .getDefaultSlotResourceProfile()
+                                        .multiply(taskManagerRegistration.getNumberFreeSlots()))
+                .orElse(ResourceProfile.ZERO);
     }
 
     public Iterable<SlotID> getSlotsOf(InstanceID instanceId) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
index 08ef068..02915dc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
@@ -30,6 +31,10 @@ public class TaskManagerRegistration {
 
     private final TaskExecutorConnection taskManagerConnection;
 
+    private final ResourceProfile defaultSlotResourceProfile;
+
+    private final ResourceProfile totalResource;
+
     private final HashSet<SlotID> slots;
 
     private int numberFreeSlots;
@@ -38,12 +43,18 @@ public class TaskManagerRegistration {
     private long idleSince;
 
     public TaskManagerRegistration(
-            TaskExecutorConnection taskManagerConnection, Collection<SlotID> slots) {
+            TaskExecutorConnection taskManagerConnection,
+            Collection<SlotID> slots,
+            ResourceProfile totalResourceProfile,
+            ResourceProfile defaultSlotResourceProfile) {
 
         this.taskManagerConnection =
                 Preconditions.checkNotNull(taskManagerConnection, "taskManagerConnection");
         Preconditions.checkNotNull(slots, "slots");
 
+        this.totalResource = Preconditions.checkNotNull(totalResourceProfile);
+        this.defaultSlotResourceProfile = Preconditions.checkNotNull(defaultSlotResourceProfile);
+
         this.slots = new HashSet<>(slots);
 
         this.numberFreeSlots = slots.size();
@@ -67,6 +78,14 @@ public class TaskManagerRegistration {
         return numberFreeSlots;
     }
 
+    public ResourceProfile getDefaultSlotResourceProfile() {
+        return defaultSlotResourceProfile;
+    }
+
+    public ResourceProfile getTotalResource() {
+        return totalResource;
+    }
+
     public void freeSlot() {
         Preconditions.checkState(
                 numberFreeSlots < slots.size(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImplTest.java
index 59b92b7..0191ced 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImplTest.java
@@ -1504,6 +1504,69 @@ public class SlotManagerImplTest extends TestLogger {
         }
     }
 
+    @Test
+    public void testGetResourceOverview() throws Exception {
+        final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
+        final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build();
+
+        final TaskExecutorGateway taskExecutorGateway =
+                new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
+        final ResourceID resourceId1 = ResourceID.generate();
+        final ResourceID resourceId2 = ResourceID.generate();
+        final TaskExecutorConnection taskManagerConnection1 =
+                new TaskExecutorConnection(resourceId1, taskExecutorGateway);
+        final TaskExecutorConnection taskManagerConnection2 =
+                new TaskExecutorConnection(resourceId2, taskExecutorGateway);
+
+        final SlotID slotId1 = new SlotID(resourceId1, 0);
+        final SlotID slotId2 = new SlotID(resourceId1, 1);
+        final SlotID slotId3 = new SlotID(resourceId2, 0);
+        final SlotID slotId4 = new SlotID(resourceId2, 1);
+        final ResourceProfile resourceProfile1 = ResourceProfile.fromResources(1, 10);
+        final ResourceProfile resourceProfile2 = ResourceProfile.fromResources(2, 20);
+        final SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile1);
+        final SlotStatus slotStatus2 =
+                new SlotStatus(slotId2, resourceProfile1, new JobID(), new AllocationID());
+        final SlotStatus slotStatus3 = new SlotStatus(slotId3, resourceProfile2);
+        final SlotStatus slotStatus4 =
+                new SlotStatus(slotId4, resourceProfile2, new JobID(), new AllocationID());
+        final SlotReport slotReport1 = new SlotReport(Arrays.asList(slotStatus1, slotStatus2));
+        final SlotReport slotReport2 = new SlotReport(Arrays.asList(slotStatus3, slotStatus4));
+
+        try (SlotManagerImpl slotManager =
+                createSlotManager(resourceManagerId, resourceManagerActions)) {
+            slotManager.registerTaskManager(
+                    taskManagerConnection1,
+                    slotReport1,
+                    resourceProfile1.multiply(2),
+                    resourceProfile1);
+            slotManager.registerTaskManager(
+                    taskManagerConnection2,
+                    slotReport2,
+                    resourceProfile2.multiply(2),
+                    resourceProfile2);
+
+            assertThat(
+                    slotManager.getFreeResource(),
+                    equalTo(resourceProfile1.merge(resourceProfile2)));
+            assertThat(
+                    slotManager.getFreeResourceOf(taskManagerConnection1.getInstanceID()),
+                    equalTo(resourceProfile1));
+            assertThat(
+                    slotManager.getFreeResourceOf(taskManagerConnection2.getInstanceID()),
+                    equalTo(resourceProfile2));
+            assertThat(
+                    slotManager.getRegisteredResource(),
+                    equalTo(resourceProfile1.merge(resourceProfile2).multiply(2)));
+            assertThat(
+                    slotManager.getRegisteredResourceOf(taskManagerConnection1.getInstanceID()),
+                    equalTo(resourceProfile1.multiply(2)));
+            assertThat(
+                    slotManager.getRegisteredResourceOf(taskManagerConnection2.getInstanceID()),
+                    equalTo(resourceProfile2.multiply(2)));
+        }
+    }
+
     private Set<AllocationID> extractFailedAllocationsForJob(
             JobID jobId2,
             Map<JobID, List<Tuple2<JobID, AllocationID>>> job2AndJob3FailedAllocationInfo) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java
index e0a8abb..9a9dc3c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java
@@ -41,6 +41,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertEquals;
@@ -112,7 +113,8 @@ public class TaskExecutorManagerTest extends TestLogger {
                                     requestedSlotProfile,
                                     JobID.generate(),
                                     new AllocationID()));
-            taskExecutorManager.registerTaskManager(taskExecutorConnection, slotReport);
+            taskExecutorManager.registerTaskManager(
+                    taskExecutorConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY);
 
             // the slot from the task executor should be accepted, but we should still be waiting
             // for the originally requested slot
@@ -328,6 +330,41 @@ public class TaskExecutorManagerTest extends TestLogger {
         }
     }
 
+    @Test
+    public void testGetResourceOverview() {
+        final ResourceProfile resourceProfile1 = ResourceProfile.fromResources(1, 10);
+        final ResourceProfile resourceProfile2 = ResourceProfile.fromResources(2, 20);
+
+        try (final TaskExecutorManager taskExecutorManager =
+                createTaskExecutorManagerBuilder().setMaxNumSlots(4).createTaskExecutorManager()) {
+            final InstanceID instanceId1 =
+                    createAndRegisterTaskExecutor(taskExecutorManager, 2, resourceProfile1);
+            final InstanceID instanceId2 =
+                    createAndRegisterTaskExecutor(taskExecutorManager, 2, resourceProfile2);
+            taskExecutorManager.occupySlot(instanceId1);
+            taskExecutorManager.occupySlot(instanceId2);
+
+            assertThat(
+                    taskExecutorManager.getTotalFreeResources(),
+                    equalTo(resourceProfile1.merge(resourceProfile2)));
+            assertThat(
+                    taskExecutorManager.getTotalFreeResourcesOf(instanceId1),
+                    equalTo(resourceProfile1));
+            assertThat(
+                    taskExecutorManager.getTotalFreeResourcesOf(instanceId2),
+                    equalTo(resourceProfile2));
+            assertThat(
+                    taskExecutorManager.getTotalRegisteredResources(),
+                    equalTo(resourceProfile1.merge(resourceProfile2).multiply(2)));
+            assertThat(
+                    taskExecutorManager.getTotalRegisteredResourcesOf(instanceId1),
+                    equalTo(resourceProfile1.multiply(2)));
+            assertThat(
+                    taskExecutorManager.getTotalRegisteredResourcesOf(instanceId2),
+                    equalTo(resourceProfile2.multiply(2)));
+        }
+    }
+
     private static TaskExecutorManagerBuilder createTaskExecutorManagerBuilder() {
         return new TaskExecutorManagerBuilder()
                 .setResourceActions(createResourceActionsBuilder().build());
@@ -358,7 +395,11 @@ public class TaskExecutorManagerTest extends TestLogger {
 
         final SlotReport slotReport = new SlotReport(slotStatuses);
 
-        taskExecutorManager.registerTaskManager(taskExecutorConnection, slotReport);
+        taskExecutorManager.registerTaskManager(
+                taskExecutorConnection,
+                slotReport,
+                resourceProfile.multiply(numSlots),
+                resourceProfile);
 
         return taskExecutorConnection.getInstanceID();
     }