You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2022/07/11 15:37:57 UTC

[flink] 03/05: [FLINK-28144][runtime] Introduce SlotPoolService#releaseFreeSlotsOnTaskManager to release free slots eagerly when blocking nodes

This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 04f2f0c2660b312449419a3acb58a46a38d84f64
Author: Lijie Wang <wa...@gmail.com>
AuthorDate: Mon Jul 4 14:49:10 2022 +0800

    [FLINK-28144][runtime] Introduce SlotPoolService#releaseFreeSlotsOnTaskManager to release free slots eagerly when blocking nodes
---
 .../slotpool/DeclarativeSlotPoolService.java       | 26 ++++++++++++
 .../jobmaster/slotpool/SlotPoolService.java        |  8 ++++
 .../flink/runtime/jobmaster/JobMasterTest.java     |  6 +++
 .../slotpool/DeclarativeSlotPoolServiceTest.java   | 47 ++++++++++++++++++++++
 .../jobmaster/slotpool/TestingSlotPoolService.java |  6 +++
 5 files changed, 93 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java
index ec1a97005b6..5379c96cfea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java
@@ -49,6 +49,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /** {@link SlotPoolService} implementation for the {@link DeclarativeSlotPool}. */
 public class DeclarativeSlotPoolService implements SlotPoolService {
@@ -234,6 +235,31 @@ public class DeclarativeSlotPoolService implements SlotPoolService {
         return false;
     }
 
+    @Override
+    public void releaseFreeSlotsOnTaskManager(ResourceID taskManagerId, Exception cause) {
+        assertHasBeenStarted();
+        if (isTaskManagerRegistered(taskManagerId)) {
+
+            Collection<AllocationID> freeSlots =
+                    declarativeSlotPool.getFreeSlotsInformation().stream()
+                            .filter(
+                                    slotInfo ->
+                                            slotInfo.getTaskManagerLocation()
+                                                    .getResourceID()
+                                                    .equals(taskManagerId))
+                            .map(SlotInfoWithUtilization::getAllocationId)
+                            .collect(Collectors.toSet());
+
+            for (AllocationID allocationId : freeSlots) {
+                final ResourceCounter previouslyFulfilledRequirement =
+                        declarativeSlotPool.releaseSlot(allocationId, cause);
+                // Only free slots are released, so no fulfilled requirement should be affected.
+                Preconditions.checkState(
+                        previouslyFulfilledRequirement.equals(ResourceCounter.empty()));
+            }
+        }
+    }
+
     private void releaseAllTaskManagers(Exception cause) {
         for (ResourceID registeredTaskManager : registeredTaskManagers) {
             internalReleaseTaskManager(registeredTaskManager, cause);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolService.java
index fc8b3671ee7..b0322dd2c5f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolService.java
@@ -113,6 +113,14 @@ public interface SlotPoolService extends AutoCloseable {
      */
     boolean releaseTaskManager(ResourceID taskManagerId, Exception cause);
 
+    /**
+     * Releases all free slots belonging to the given TaskExecutor if it has been registered.
+     *
+     * @param taskManagerId identifying the TaskExecutor
+     * @param cause cause for releasing the slots
+     */
+    void releaseFreeSlotsOnTaskManager(ResourceID taskManagerId, Exception cause);
+
     /**
      * Connects the SlotPool to the given ResourceManager. After this method is called, the SlotPool
      * will be able to request resources from the given ResourceManager.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index d7a3afbb688..e07ad72516d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -518,6 +518,12 @@ class JobMasterTest {
             return true;
         }
 
+        @Override
+        public void releaseFreeSlotsOnTaskManager(ResourceID taskManagerId, Exception cause) {
+            throw new UnsupportedOperationException(
+                    "TestingSlotPool does not support this operation.");
+        }
+
         @Override
         public Collection<SlotOffer> offerSlots(
                 TaskManagerLocation taskManagerLocation,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java
index 21d47552cfb..9d540128848 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java
@@ -43,6 +43,8 @@ import org.apache.flink.runtime.util.ResourceCounter;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.clock.SystemClock;
 
+import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
+
 import org.hamcrest.Description;
 import org.hamcrest.Matcher;
 import org.hamcrest.TypeSafeMatcher;
@@ -306,6 +308,51 @@ class DeclarativeSlotPoolServiceTest {
         }
     }
 
+    @Test
+    void testReleaseFreeSlotsOnTaskManager() throws Exception {
+        try (DeclarativeSlotPoolService slotPoolService = createDeclarativeSlotPoolService()) {
+            final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+            slotPoolService.registerTaskManager(taskManagerLocation.getResourceID());
+
+            final ResourceProfile resourceProfile =
+                    ResourceProfile.newBuilder().setCpuCores(1).build();
+
+            SlotOffer slotOffer1 = new SlotOffer(new AllocationID(), 0, resourceProfile);
+            SlotOffer slotOffer2 = new SlotOffer(new AllocationID(), 1, resourceProfile);
+
+            final DeclarativeSlotPool slotPool = slotPoolService.getDeclarativeSlotPool();
+            slotPool.setResourceRequirements(ResourceCounter.withResource(resourceProfile, 2));
+
+            final DefaultDeclarativeSlotPoolTest.FreeSlotConsumer freeSlotConsumer =
+                    new DefaultDeclarativeSlotPoolTest.FreeSlotConsumer();
+
+            final Collection<SlotOffer> slotOffers = Arrays.asList(slotOffer1, slotOffer2);
+
+            slotPoolService.offerSlots(
+                    taskManagerLocation,
+                    new RpcTaskManagerGateway(
+                            new TestingTaskExecutorGatewayBuilder()
+                                    .setFreeSlotFunction(freeSlotConsumer)
+                                    .createTestingTaskExecutorGateway(),
+                            jobMasterId),
+                    slotOffers);
+
+            // slot1 is reserved, slot2 is free.
+            slotPool.reserveFreeSlot(slotOffer1.getAllocationId(), resourceProfile);
+
+            slotPoolService.releaseFreeSlotsOnTaskManager(
+                    taskManagerLocation.getResourceID(), new FlinkException("Test cause"));
+
+            assertThat(slotPool.getFreeSlotsInformation()).isEmpty();
+            assertThat(
+                            Iterables.getOnlyElement(slotPool.getAllSlotsInformation())
+                                    .getAllocationId())
+                    .isEqualTo(slotOffer1.getAllocationId());
+            assertThat(Iterables.getOnlyElement(freeSlotConsumer.drainFreedSlots()))
+                    .isEqualTo(slotOffer2.getAllocationId());
+        }
+    }
+
     private DeclarativeSlotPoolService createDeclarativeSlotPoolService() throws Exception {
         return createDeclarativeSlotPoolService(new DefaultDeclarativeSlotPoolFactory());
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolService.java
index 295f4059e24..5d10289686b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingSlotPoolService.java
@@ -147,6 +147,12 @@ public class TestingSlotPoolService implements SlotPoolService {
         return releaseTaskManagerFunction.apply(taskManagerId, cause);
     }
 
+    @Override
+    public void releaseFreeSlotsOnTaskManager(ResourceID taskManagerId, Exception cause) {
+        throw new UnsupportedOperationException(
+                "TestingSlotPoolService does not support this operation.");
+    }
+
     @Override
     public void connectToResourceManager(ResourceManagerGateway resourceManagerGateway) {
         connectToResourceManagerConsumer.accept(resourceManagerGateway);