You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/12/06 11:23:44 UTC

[flink] branch master updated (c6e824c955b -> 209df810f13)

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from c6e824c955b [hotfix] Remove out-dated comment
     new a0aa4813b9f [FLINK-29870][runtime] Split ResourceActions to ResourceAllocator and ResourceEventListener.
     new 209df810f13 [FLINK-29870][runtime] move ResourceAllocator to ActiveResourceManager.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/resourcemanager/ResourceManager.java   | 81 +++++++------------
 .../resourcemanager/StandaloneResourceManager.java | 23 +++---
 .../active/ActiveResourceManager.java              | 70 +++++++++++++----
 .../slotmanager/DeclarativeSlotManager.java        | 25 +++---
 .../slotmanager/FineGrainedSlotManager.java        | 53 ++++++++-----
 .../NonSupportedResourceAllocatorImpl.java}        | 29 +++----
 ...ResourceActions.java => ResourceAllocator.java} | 26 +++----
 .../slotmanager/ResourceEventListener.java}        | 21 +++--
 .../resourcemanager/slotmanager/SlotManager.java   | 10 ++-
 .../slotmanager/TaskExecutorManager.java           | 37 +++++----
 .../resourcemanager/ResourceManagerTest.java       | 31 +++-----
 .../resourcemanager/TestingResourceManager.java    | 27 ++++---
 .../TestingResourceManagerFactory.java             | 20 +++--
 .../active/ActiveResourceManagerTest.java          | 71 ++++++++---------
 .../AbstractFineGrainedSlotManagerITCase.java      | 12 +--
 .../slotmanager/DeclarativeSlotManagerBuilder.java | 40 ++++++++--
 .../slotmanager/DeclarativeSlotManagerTest.java    | 91 +++++++++++-----------
 ...gerDefaultResourceAllocationStrategyITCase.java |  7 +-
 .../slotmanager/FineGrainedSlotManagerTest.java    | 12 +--
 .../FineGrainedSlotManagerTestBase.java            | 10 ++-
 .../slotmanager/TaskExecutorManagerBuilder.java    |  8 +-
 .../slotmanager/TaskExecutorManagerTest.java       | 52 ++++++-------
 .../slotmanager/TestingResourceActionsBuilder.java | 72 -----------------
 ...eActions.java => TestingResourceAllocator.java} | 40 ++++------
 .../TestingResourceAllocatorBuilder.java           | 47 +++++++++++
 .../slotmanager/TestingResourceEventListener.java  | 48 ++++++++++++
 .../TestingResourceEventListenerBuilder.java}      | 30 ++++---
 .../slotmanager/TestingSlotManager.java            |  3 +-
 28 files changed, 526 insertions(+), 470 deletions(-)
 copy flink-runtime/src/{test/java/org/apache/flink/runtime/io/disk/NoOpFileChannelManager.java => main/java/org/apache/flink/runtime/resourcemanager/slotmanager/NonSupportedResourceAllocatorImpl.java} (54%)
 rename flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/{ResourceActions.java => ResourceAllocator.java} (69%)
 copy flink-runtime/src/main/java/org/apache/flink/runtime/{jobmaster/slotpool/DeclarativeSlotPoolFactory.java => resourcemanager/slotmanager/ResourceEventListener.java} (63%)
 delete mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java
 rename flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/{TestingResourceActions.java => TestingResourceAllocator.java} (54%)
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocatorBuilder.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceEventListener.java
 copy flink-runtime/src/test/java/org/apache/flink/runtime/{jobmaster/slotpool/TestingDeclarativeSlotPoolFactory.java => resourcemanager/slotmanager/TestingResourceEventListenerBuilder.java} (52%)


[flink] 01/02: [FLINK-29870][runtime] Split ResourceActions to ResourceAllocator and ResourceEventListener.

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a0aa4813b9fe3db37cabe38fa47f1d1096c879dc
Author: Weihua Hu <hu...@gmail.com>
AuthorDate: Thu Nov 3 16:10:49 2022 +0800

    [FLINK-29870][runtime] Split ResourceActions to ResourceAllocator and ResourceEventListener.
---
 .../runtime/resourcemanager/ResourceManager.java   | 12 ++--
 .../slotmanager/DeclarativeSlotManager.java        | 25 +++----
 .../slotmanager/FineGrainedSlotManager.java        | 32 +++++----
 ...ResourceActions.java => ResourceAllocator.java} | 16 +----
 ...urceActions.java => ResourceEventListener.java} | 30 ++------
 .../resourcemanager/slotmanager/SlotManager.java   | 10 +--
 .../slotmanager/TaskExecutorManager.java           | 14 ++--
 .../AbstractFineGrainedSlotManagerITCase.java      | 12 ++--
 .../slotmanager/DeclarativeSlotManagerBuilder.java | 40 +++++++++--
 .../slotmanager/DeclarativeSlotManagerTest.java    | 83 ++++++++++++----------
 ...gerDefaultResourceAllocationStrategyITCase.java |  2 +-
 .../slotmanager/FineGrainedSlotManagerTest.java    | 12 ++--
 .../FineGrainedSlotManagerTestBase.java            | 10 ++-
 .../slotmanager/TaskExecutorManagerBuilder.java    |  8 +--
 .../slotmanager/TaskExecutorManagerTest.java       | 38 +++++-----
 ...eActions.java => TestingResourceAllocator.java} | 25 ++-----
 ...r.java => TestingResourceAllocatorBuilder.java} | 28 ++------
 .../slotmanager/TestingResourceEventListener.java  | 48 +++++++++++++
 .../TestingResourceEventListenerBuilder.java       | 41 +++++++++++
 .../slotmanager/TestingSlotManager.java            |  3 +-
 20 files changed, 284 insertions(+), 205 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 12684c25247..3abd6b6b68d 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -56,7 +56,8 @@ import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerExcept
 import org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException;
 import org.apache.flink.runtime.resourcemanager.registration.JobManagerRegistration;
 import org.apache.flink.runtime.resourcemanager.registration.WorkerRegistration;
-import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceActions;
+import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceAllocator;
+import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceEventListener;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rest.messages.LogInfo;
 import org.apache.flink.runtime.rest.messages.ThreadDumpInfo;
@@ -266,7 +267,8 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
             slotManager.start(
                     getFencingToken(),
                     getMainThreadExecutor(),
-                    new ResourceActionsImpl(),
+                    new ResourceAllocatorImpl(),
+                    new ResourceEventListenerImpl(),
                     blocklistHandler::isBlockedTaskManager);
 
             delegationTokenManager.start(this);
@@ -1334,7 +1336,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
         }
     }
 
-    private class ResourceActionsImpl implements ResourceActions {
+    private class ResourceAllocatorImpl implements ResourceAllocator {
 
         @Override
         public void releaseResource(InstanceID instanceId, Exception cause) {
@@ -1348,9 +1350,11 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
             validateRunsInMainThread();
             return startNewWorker(workerResourceSpec);
         }
+    }
 
+    private class ResourceEventListenerImpl implements ResourceEventListener {
         @Override
-        public void notifyNotEnoughResourcesAvailable(
+        public void notEnoughResourceAvailable(
                 JobID jobId, Collection<ResourceRequirement> acquiredResources) {
             validateRunsInMainThread();
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
index 71898ae747d..f5e210de073 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
@@ -69,7 +69,7 @@ public class DeclarativeSlotManager implements SlotManager {
 
     private final SlotTracker slotTracker;
     private final ResourceTracker resourceTracker;
-    private final BiFunction<Executor, ResourceActions, TaskExecutorManager>
+    private final BiFunction<Executor, ResourceAllocator, TaskExecutorManager>
             taskExecutorManagerFactory;
     @Nullable private TaskExecutorManager taskExecutorManager;
 
@@ -97,8 +97,8 @@ public class DeclarativeSlotManager implements SlotManager {
     /** Executor for future callbacks which have to be "synchronized". */
     @Nullable private Executor mainThreadExecutor;
 
-    /** Callbacks for resource (de-)allocations. */
-    @Nullable private ResourceActions resourceActions;
+    /** Callback for notifying that not enough resources are available. */
+    @Nullable private ResourceEventListener resourceEventListener;
 
     /** The future of the requirements delay check. */
     @Nullable private CompletableFuture<Void> requirementsCheckFuture;
@@ -131,7 +131,7 @@ public class DeclarativeSlotManager implements SlotManager {
         slotMatchingStrategy = slotManagerConfiguration.getSlotMatchingStrategy();
 
         taskExecutorManagerFactory =
-                (executor, resourceActions) ->
+                (executor, resourceAllocator) ->
                         new TaskExecutorManager(
                                 slotManagerConfiguration.getDefaultWorkerResourceSpec(),
                                 slotManagerConfiguration.getNumSlotsPerWorker(),
@@ -141,10 +141,10 @@ public class DeclarativeSlotManager implements SlotManager {
                                 slotManagerConfiguration.getTaskManagerTimeout(),
                                 scheduledExecutor,
                                 executor,
-                                resourceActions);
+                                resourceAllocator);
 
         resourceManagerId = null;
-        resourceActions = null;
+        resourceEventListener = null;
         mainThreadExecutor = null;
         taskExecutorManager = null;
         blockedTaskManagerChecker = null;
@@ -199,22 +199,23 @@ public class DeclarativeSlotManager implements SlotManager {
      *
      * @param newResourceManagerId to use for communication with the task managers
      * @param newMainThreadExecutor to use to run code in the ResourceManager's main thread
-     * @param newResourceActions to use for resource (de-)allocations
+     * @param newResourceAllocator to use for resource (de-)allocations
      * @param newBlockedTaskManagerChecker to query whether a task manager is blocked
      */
     @Override
     public void start(
             ResourceManagerId newResourceManagerId,
             Executor newMainThreadExecutor,
-            ResourceActions newResourceActions,
+            ResourceAllocator newResourceAllocator,
+            ResourceEventListener newResourceEventListener,
             BlockedTaskManagerChecker newBlockedTaskManagerChecker) {
         LOG.debug("Starting the slot manager.");
 
         this.resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
         mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
-        resourceActions = Preconditions.checkNotNull(newResourceActions);
+        resourceEventListener = Preconditions.checkNotNull(newResourceEventListener);
         taskExecutorManager =
-                taskExecutorManagerFactory.apply(newMainThreadExecutor, newResourceActions);
+                taskExecutorManagerFactory.apply(newMainThreadExecutor, newResourceAllocator);
         blockedTaskManagerChecker = Preconditions.checkNotNull(newBlockedTaskManagerChecker);
 
         started = true;
@@ -253,7 +254,7 @@ public class DeclarativeSlotManager implements SlotManager {
 
         taskExecutorManager = null;
         resourceManagerId = null;
-        resourceActions = null;
+        resourceEventListener = null;
         blockedTaskManagerChecker = null;
         started = false;
     }
@@ -723,7 +724,7 @@ public class DeclarativeSlotManager implements SlotManager {
                                 "Could not fulfill resource requirements of job {}. Free slots: {}",
                                 jobId,
                                 slotTracker.getFreeSlots().size());
-                        resourceActions.notifyNotEnoughResourcesAvailable(
+                        resourceEventListener.notEnoughResourceAvailable(
                                 jobId, resourceTracker.getAcquiredResources(jobId));
                         return pendingSlots;
                     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
index 4833c7ad1a6..c999d9883de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
@@ -108,7 +108,10 @@ public class FineGrainedSlotManager implements SlotManager {
     @Nullable private Executor mainThreadExecutor;
 
     /** Callbacks for resource (de-)allocations. */
-    @Nullable private ResourceActions resourceActions;
+    @Nullable private ResourceAllocator resourceAllocator;
+
+    /** Callback for notifying that not enough resources are available. */
+    @Nullable private ResourceEventListener resourceEventListener;
 
     @Nullable private ScheduledFuture<?> taskManagerTimeoutsCheck;
 
@@ -149,7 +152,8 @@ public class FineGrainedSlotManager implements SlotManager {
         this.maxTotalMem = Preconditions.checkNotNull(slotManagerConfiguration.getMaxTotalMem());
 
         resourceManagerId = null;
-        resourceActions = null;
+        resourceAllocator = null;
+        resourceEventListener = null;
         mainThreadExecutor = null;
         taskManagerTimeoutsCheck = null;
         requirementsCheckFuture = null;
@@ -166,7 +170,7 @@ public class FineGrainedSlotManager implements SlotManager {
 
         if (failUnfulfillableRequest && !unfulfillableJobs.isEmpty()) {
             for (JobID jobId : unfulfillableJobs) {
-                resourceActions.notifyNotEnoughResourcesAvailable(
+                resourceEventListener.notEnoughResourceAvailable(
                         jobId, resourceTracker.getAcquiredResources(jobId));
             }
         }
@@ -186,20 +190,22 @@ public class FineGrainedSlotManager implements SlotManager {
      *
      * @param newResourceManagerId to use for communication with the task managers
      * @param newMainThreadExecutor to use to run code in the ResourceManager's main thread
-     * @param newResourceActions to use for resource (de-)allocations
+     * @param newResourceAllocator to use for resource (de-)allocations
      * @param newBlockedTaskManagerChecker to query whether a task manager is blocked
      */
     @Override
     public void start(
             ResourceManagerId newResourceManagerId,
             Executor newMainThreadExecutor,
-            ResourceActions newResourceActions,
+            ResourceAllocator newResourceAllocator,
+            ResourceEventListener newResourceEventListener,
             BlockedTaskManagerChecker newBlockedTaskManagerChecker) {
         LOG.info("Starting the slot manager.");
 
         resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
         mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
-        resourceActions = Preconditions.checkNotNull(newResourceActions);
+        resourceAllocator = Preconditions.checkNotNull(newResourceAllocator);
+        resourceEventListener = Preconditions.checkNotNull(newResourceEventListener);
         slotStatusSyncer.initialize(
                 taskManagerTracker, resourceTracker, resourceManagerId, mainThreadExecutor);
         blockedTaskManagerChecker = Preconditions.checkNotNull(newBlockedTaskManagerChecker);
@@ -246,7 +252,8 @@ public class FineGrainedSlotManager implements SlotManager {
 
         unfulfillableJobs.clear();
         resourceManagerId = null;
-        resourceActions = null;
+        resourceAllocator = null;
+        resourceEventListener = null;
         started = false;
     }
 
@@ -346,7 +353,7 @@ public class FineGrainedSlotManager implements SlotManager {
                         taskExecutorConnection.getResourceID(),
                         maxTotalCpu,
                         maxTotalMem.toHumanReadableString());
-                resourceActions.releaseResource(
+                resourceAllocator.releaseResource(
                         taskExecutorConnection.getInstanceID(),
                         new FlinkExpectedException(
                                 "The max total resource limitation is reached."));
@@ -571,7 +578,7 @@ public class FineGrainedSlotManager implements SlotManager {
         if (sendNotEnoughResourceNotifications) {
             for (JobID jobId : unfulfillableJobs) {
                 LOG.warn("Could not fulfill resource requirements of job {}.", jobId);
-                resourceActions.notifyNotEnoughResourcesAvailable(
+                resourceEventListener.notEnoughResourceAvailable(
                         jobId, resourceTracker.getAcquiredResources(jobId));
             }
         }
@@ -730,7 +737,7 @@ public class FineGrainedSlotManager implements SlotManager {
     private void releaseIdleTaskExecutor(InstanceID timedOutTaskManagerId) {
         final FlinkExpectedException cause =
                 new FlinkExpectedException("TaskManager exceeded the idle timeout.");
-        resourceActions.releaseResource(timedOutTaskManagerId, cause);
+        resourceAllocator.releaseResource(timedOutTaskManagerId, cause);
     }
 
     private boolean allocateResource(PendingTaskManager pendingTaskManager) {
@@ -743,7 +750,7 @@ public class FineGrainedSlotManager implements SlotManager {
             return false;
         }
 
-        if (!resourceActions.allocateResource(
+        if (!resourceAllocator.allocateResource(
                 WorkerResourceSpec.fromTotalResourceProfile(
                         pendingTaskManager.getTotalResourceProfile(),
                         pendingTaskManager.getNumSlots()))) {
@@ -771,7 +778,8 @@ public class FineGrainedSlotManager implements SlotManager {
         Preconditions.checkState(started, "The slot manager has not been started.");
         Preconditions.checkNotNull(resourceManagerId);
         Preconditions.checkNotNull(mainThreadExecutor);
-        Preconditions.checkNotNull(resourceActions);
+        Preconditions.checkNotNull(resourceAllocator);
+        Preconditions.checkNotNull(resourceEventListener);
     }
 
     private boolean isMaxTotalResourceExceededAfterAdding(ResourceProfile newResource) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocator.java
similarity index 73%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocator.java
index 639f8cb53fa..179a2ad5ae1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocator.java
@@ -18,15 +18,11 @@
 
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
-import org.apache.flink.runtime.slots.ResourceRequirement;
-
-import java.util.Collection;
 
 /** Resource related actions which the {@link SlotManager} can perform. */
-public interface ResourceActions {
+public interface ResourceAllocator {
 
     /**
      * Releases the resource with the given instance id.
@@ -43,14 +39,4 @@ public interface ResourceActions {
      * @return whether the resource can be allocated
      */
     boolean allocateResource(WorkerResourceSpec workerResourceSpec);
-
-    /**
-     * Notifies that not enough resources are available to fulfill the resource requirements of a
-     * job.
-     *
-     * @param jobId job for which not enough resources are available
-     * @param acquiredResources the resources that have been acquired for the job
-     */
-    void notifyNotEnoughResourcesAvailable(
-            JobID jobId, Collection<ResourceRequirement> acquiredResources);
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceEventListener.java
similarity index 53%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceEventListener.java
index 639f8cb53fa..1106b378a18 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceEventListener.java
@@ -19,38 +19,16 @@
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
 import org.apache.flink.runtime.slots.ResourceRequirement;
 
 import java.util.Collection;
 
-/** Resource related actions which the {@link SlotManager} can perform. */
-public interface ResourceActions {
-
-    /**
-     * Releases the resource with the given instance id.
-     *
-     * @param instanceId identifying which resource to release
-     * @param cause why the resource is released
-     */
-    void releaseResource(InstanceID instanceId, Exception cause);
-
-    /**
-     * Requests to allocate a resource with the given {@link WorkerResourceSpec}.
-     *
-     * @param workerResourceSpec for the to be allocated worker
-     * @return whether the resource can be allocated
-     */
-    boolean allocateResource(WorkerResourceSpec workerResourceSpec);
-
+/** Listener for resource events of {@link SlotManager}. */
+@FunctionalInterface
+public interface ResourceEventListener {
     /**
-     * Notifies that not enough resources are available to fulfill the resource requirements of a
-     * job.
-     *
      * @param jobId job for which not enough resources are available
      * @param acquiredResources the resources that have been acquired for the job
      */
-    void notifyNotEnoughResourcesAvailable(
-            JobID jobId, Collection<ResourceRequirement> acquiredResources);
+    void notEnoughResourceAvailable(JobID jobId, Collection<ResourceRequirement> acquiredResources);
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index cf706515b02..9ea06c1478b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -40,7 +40,7 @@ import java.util.concurrent.Executor;
  * their allocation and all pending slot requests. Whenever a new slot is registered or an allocated
  * slot is freed, then it tries to fulfill another pending slot request. Whenever there are not
  * enough slots available the slot manager will notify the resource manager about it via {@link
- * ResourceActions#allocateResource(WorkerResourceSpec)}.
+ * ResourceAllocator#allocateResource(WorkerResourceSpec)}.
  *
  * <p>In order to free resources and avoid resource leaks, idling task managers (task managers whose
  * slots are currently not used) and pending slot requests time out triggering their release and
@@ -56,7 +56,7 @@ public interface SlotManager extends AutoCloseable {
     int getNumberFreeSlotsOf(InstanceID instanceId);
 
     /**
-     * Get number of workers SlotManager requested from {@link ResourceActions} that are not yet
+     * Get number of workers SlotManager requested from {@link ResourceAllocator} that are not yet
      * fulfilled.
      *
      * @return a map whose key set is all the unique resource specs of the pending workers, and the
@@ -79,13 +79,15 @@ public interface SlotManager extends AutoCloseable {
      *
      * @param newResourceManagerId to use for communication with the task managers
      * @param newMainThreadExecutor to use to run code in the ResourceManager's main thread
-     * @param newResourceActions to use for resource (de-)allocations
+     * @param newResourceAllocator to use for resource (de-)allocations
+     * @param resourceEventListener to notify when not enough resources are available
      * @param newBlockedTaskManagerChecker to query whether a task manager is blocked
      */
     void start(
             ResourceManagerId newResourceManagerId,
             Executor newMainThreadExecutor,
-            ResourceActions newResourceActions,
+            ResourceAllocator newResourceAllocator,
+            ResourceEventListener resourceEventListener,
             BlockedTaskManagerChecker newBlockedTaskManagerChecker);
 
     /** Suspends the component. This clears the internal state of the slot manager. */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
index 1df5a872ee4..d1515a45eea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
@@ -90,7 +90,7 @@ class TaskExecutorManager implements AutoCloseable {
     private final Time taskManagerTimeout;
 
     /** Callbacks for resource (de-)allocations. */
-    private final ResourceActions resourceActions;
+    private final ResourceAllocator resourceAllocator;
 
     /** All currently registered task managers. */
     private final Map<InstanceID, TaskManagerRegistration> taskManagerRegistrations =
@@ -111,7 +111,7 @@ class TaskExecutorManager implements AutoCloseable {
             Time taskManagerTimeout,
             ScheduledExecutor scheduledExecutor,
             Executor mainThreadExecutor,
-            ResourceActions resourceActions) {
+            ResourceAllocator resourceAllocator) {
 
         this.defaultWorkerResourceSpec = defaultWorkerResourceSpec;
         this.numSlotsPerWorker = numSlotsPerWorker;
@@ -123,7 +123,7 @@ class TaskExecutorManager implements AutoCloseable {
                 SlotManagerUtils.generateDefaultSlotResourceProfile(
                         defaultWorkerResourceSpec, numSlotsPerWorker);
 
-        this.resourceActions = Preconditions.checkNotNull(resourceActions);
+        this.resourceAllocator = Preconditions.checkNotNull(resourceAllocator);
         this.mainThreadExecutor = mainThreadExecutor;
         taskManagerTimeoutsAndRedundancyCheck =
                 scheduledExecutor.scheduleWithFixedDelay(
@@ -157,7 +157,7 @@ class TaskExecutorManager implements AutoCloseable {
             LOG.info(
                     "The total number of slots exceeds the max limitation {}, releasing the excess task executor.",
                     maxSlotNum);
-            resourceActions.releaseResource(
+            resourceAllocator.releaseResource(
                     taskExecutorConnection.getInstanceID(),
                     new FlinkExpectedException(
                             "The total number of slots exceeds the max limitation."));
@@ -270,7 +270,7 @@ class TaskExecutorManager implements AutoCloseable {
             return Optional.empty();
         }
 
-        if (!resourceActions.allocateResource(defaultWorkerResourceSpec)) {
+        if (!resourceAllocator.allocateResource(defaultWorkerResourceSpec)) {
             // resource cannot be allocated
             return Optional.empty();
         }
@@ -321,7 +321,7 @@ class TaskExecutorManager implements AutoCloseable {
                         >= taskManagerTimeout.toMilliseconds()) {
                     // we collect the instance ids first in order to avoid concurrent modifications
                     // by the
-                    // ResourceActions.releaseResource call
+                    // ResourceAllocator.releaseResource call
                     timedOutTaskManagers.add(taskManagerRegistration);
                 }
             }
@@ -406,7 +406,7 @@ class TaskExecutorManager implements AutoCloseable {
         LOG.debug(
                 "Release TaskExecutor {} because it exceeded the idle timeout.",
                 timedOutTaskManagerId);
-        resourceActions.releaseResource(timedOutTaskManagerId, cause);
+        resourceAllocator.releaseResource(timedOutTaskManagerId, cause);
     }
 
     // ---------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java
index fd86e99efb1..e472b718539 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java
@@ -67,7 +67,7 @@ abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSlotManag
                 new CompletableFuture<>();
         new Context() {
             {
-                resourceActionsBuilder.setAllocateResourceConsumer(
+                resourceAllocatorBuilder.setAllocateResourceConsumer(
                         allocateResourceFuture::complete);
                 runTest(
                         () -> {
@@ -224,7 +224,7 @@ abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSlotManag
         new Context() {
             {
                 setBlockedTaskManagerChecker(blockedTaskManager::equals);
-                resourceActionsBuilder.setAllocateResourceConsumer(
+                resourceAllocatorBuilder.setAllocateResourceConsumer(
                         allocateResourceFuture::complete);
                 runTest(
                         () -> {
@@ -259,7 +259,7 @@ abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSlotManag
 
         new Context() {
             {
-                resourceActionsBuilder.setAllocateResourceConsumer(
+                resourceAllocatorBuilder.setAllocateResourceConsumer(
                         ignored -> {
                             if (allocateResourceFutures.get(0).isDone()) {
                                 allocateResourceFutures.get(1).complete(null);
@@ -430,7 +430,7 @@ abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSlotManag
         final SlotReport slotReport = new SlotReport();
         new Context() {
             {
-                resourceActionsBuilder.setAllocateResourceConsumer(
+                resourceAllocatorBuilder.setAllocateResourceConsumer(
                         ignored -> allocateResourceFutures.complete(null));
                 runTest(
                         () -> {
@@ -481,7 +481,7 @@ abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSlotManag
         final SlotReport slotReport = new SlotReport();
         new Context() {
             {
-                resourceActionsBuilder.setAllocateResourceConsumer(
+                resourceAllocatorBuilder.setAllocateResourceConsumer(
                         ignored -> allocateResourceFutures.complete(null));
                 runTest(
                         () -> {
@@ -528,7 +528,7 @@ abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSlotManag
 
         new Context() {
             {
-                resourceActionsBuilder.setAllocateResourceConsumer(
+                resourceAllocatorBuilder.setAllocateResourceConsumer(
                         ignored -> {
                             if (allocateResourceFutures.get(0).isDone()) {
                                 allocateResourceFutures.get(1).complete(null);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerBuilder.java
index ee0b811350b..959f5250018 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerBuilder.java
@@ -170,30 +170,56 @@ public class DeclarativeSlotManagerBuilder {
 
     public DeclarativeSlotManager buildAndStartWithDirectExec() {
         return buildAndStartWithDirectExec(
-                ResourceManagerId.generate(), new TestingResourceActionsBuilder().build());
+                ResourceManagerId.generate(),
+                new TestingResourceAllocatorBuilder().build(),
+                new TestingResourceEventListenerBuilder().build());
     }
 
     public DeclarativeSlotManager buildAndStartWithDirectExec(
-            ResourceManagerId resourceManagerId, ResourceActions resourceManagerActions) {
-        return buildAndStart(resourceManagerId, Executors.directExecutor(), resourceManagerActions);
+            ResourceManagerId resourceManagerId, ResourceAllocator resourceAllocator) {
+        return buildAndStartWithDirectExec(
+                resourceManagerId,
+                resourceAllocator,
+                new TestingResourceEventListenerBuilder().build());
+    }
+
+    public DeclarativeSlotManager buildAndStartWithDirectExec(
+            ResourceManagerId resourceManagerId,
+            ResourceAllocator resourceAllocator,
+            ResourceEventListener resourceEventListener) {
+        return buildAndStart(
+                resourceManagerId,
+                Executors.directExecutor(),
+                resourceAllocator,
+                resourceEventListener);
     }
 
     public DeclarativeSlotManager buildAndStart(
             ResourceManagerId resourceManagerId,
             Executor executor,
-            ResourceActions resourceManagerActions) {
+            ResourceAllocator resourceAllocator,
+            ResourceEventListener resourceEventListener) {
         return buildAndStart(
-                resourceManagerId, executor, resourceManagerActions, resourceID -> false);
+                resourceManagerId,
+                executor,
+                resourceAllocator,
+                resourceEventListener,
+                resourceID -> false);
     }
 
     public DeclarativeSlotManager buildAndStart(
             ResourceManagerId resourceManagerId,
             Executor executor,
-            ResourceActions resourceManagerActions,
+            ResourceAllocator resourceAllocator,
+            ResourceEventListener resourceEventListener,
             BlockedTaskManagerChecker blockedTaskManagerChecker) {
         final DeclarativeSlotManager slotManager = build();
         slotManager.start(
-                resourceManagerId, executor, resourceManagerActions, blockedTaskManagerChecker);
+                resourceManagerId,
+                executor,
+                resourceAllocator,
+                resourceEventListener,
+                blockedTaskManagerChecker);
         return slotManager;
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
index 6ae3cdeffba..684c4145cea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
@@ -194,13 +194,12 @@ class DeclarativeSlotManagerTest {
         final ResourceRequirements resourceRequirements = createResourceRequirementsForSingleSlot();
 
         CompletableFuture<WorkerResourceSpec> allocateResourceFuture = new CompletableFuture<>();
-        ResourceActions resourceManagerActions =
-                new TestingResourceActionsBuilder()
+        ResourceAllocator resourceAllocator =
+                new TestingResourceAllocatorBuilder()
                         .setAllocateResourceConsumer(allocateResourceFuture::complete)
                         .build();
 
-        try (SlotManager slotManager =
-                createSlotManager(resourceManagerId, resourceManagerActions)) {
+        try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceAllocator)) {
 
             slotManager.processResourceRequirements(resourceRequirements);
 
@@ -218,8 +217,8 @@ class DeclarativeSlotManagerTest {
         final ResourceRequirements resourceRequirements = createResourceRequirementsForSingleSlot();
 
         CompletableFuture<WorkerResourceSpec> allocateResourceFuture = new CompletableFuture<>();
-        ResourceActions resourceManagerActions =
-                new TestingResourceActionsBuilder()
+        ResourceAllocator resourceAllocator =
+                new TestingResourceAllocatorBuilder()
                         .setAllocateResourceConsumer(allocateResourceFuture::complete)
                         .build();
 
@@ -230,7 +229,8 @@ class DeclarativeSlotManagerTest {
                         .buildAndStart(
                                 resourceManagerId,
                                 Executors.directExecutor(),
-                                resourceManagerActions,
+                                resourceAllocator,
+                                new TestingResourceEventListenerBuilder().build(),
                                 blockedTaskManager::equals)) {
 
             final TaskExecutorGateway taskExecutorGateway =
@@ -258,8 +258,8 @@ class DeclarativeSlotManagerTest {
     void testRequirementDeclarationWithResourceAllocationFailure() throws Exception {
         final ResourceRequirements resourceRequirements = createResourceRequirementsForSingleSlot();
 
-        ResourceActions resourceManagerActions =
-                new TestingResourceActionsBuilder()
+        ResourceAllocator resourceAllocator =
+                new TestingResourceAllocatorBuilder()
                         .setAllocateResourceFunction(value -> false)
                         .build();
 
@@ -269,7 +269,7 @@ class DeclarativeSlotManagerTest {
                 createDeclarativeSlotManagerBuilder()
                         .setResourceTracker(resourceTracker)
                         .buildAndStartWithDirectExec(
-                                ResourceManagerId.generate(), resourceManagerActions)) {
+                                ResourceManagerId.generate(), resourceAllocator)) {
 
             slotManager.processResourceRequirements(resourceRequirements);
 
@@ -347,7 +347,7 @@ class DeclarativeSlotManagerTest {
                 createDeclarativeSlotManagerBuilder()
                         .setSlotTracker(slotTracker)
                         .buildAndStartWithDirectExec(
-                                resourceManagerId, new TestingResourceActionsBuilder().build())) {
+                                resourceManagerId, new TestingResourceAllocatorBuilder().build())) {
 
             if (scenario
                     == RequirementDeclarationScenario
@@ -432,8 +432,8 @@ class DeclarativeSlotManagerTest {
     void testDuplicateResourceRequirementDeclarationAfterSuccessfulAllocation() throws Exception {
         final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
         final AtomicInteger allocateResourceCalls = new AtomicInteger(0);
-        final ResourceActions resourceManagerActions =
-                new TestingResourceActionsBuilder()
+        final ResourceAllocator resourceAllocator =
+                new TestingResourceAllocatorBuilder()
                         .setAllocateResourceConsumer(
                                 ignored -> allocateResourceCalls.incrementAndGet())
                         .build();
@@ -454,7 +454,7 @@ class DeclarativeSlotManagerTest {
         try (DeclarativeSlotManager slotManager =
                 createDeclarativeSlotManagerBuilder()
                         .setSlotTracker(slotTracker)
-                        .buildAndStartWithDirectExec(resourceManagerId, resourceManagerActions)) {
+                        .buildAndStartWithDirectExec(resourceManagerId, resourceAllocator)) {
 
             slotManager.registerTaskManager(
                     taskManagerConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY);
@@ -549,14 +549,13 @@ class DeclarativeSlotManagerTest {
     @Test
     void testReceivingUnknownSlotReport() throws Exception {
         final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
-        final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build();
+        final ResourceAllocator resourceAllocator = new TestingResourceAllocatorBuilder().build();
 
         final InstanceID unknownInstanceID = new InstanceID();
         final SlotID unknownSlotId = new SlotID(ResourceID.generate(), 0);
         final SlotReport unknownSlotReport = new SlotReport(createFreeSlotStatus(unknownSlotId));
 
-        try (SlotManager slotManager =
-                createSlotManager(resourceManagerId, resourceManagerActions)) {
+        try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceAllocator)) {
             // check that we don't have any slots registered
             assertThat(slotManager.getNumberRegisteredSlots()).isEqualTo(0);
 
@@ -656,7 +655,8 @@ class DeclarativeSlotManagerTest {
                         .buildAndStart(
                                 ResourceManagerId.generate(),
                                 mainThreadExecutor,
-                                new TestingResourceActionsBuilder().build())) {
+                                new TestingResourceAllocatorBuilder().build(),
+                                new TestingResourceEventListenerBuilder().build())) {
 
             CompletableFuture.runAsync(
                             () ->
@@ -790,7 +790,8 @@ class DeclarativeSlotManagerTest {
                         .buildAndStart(
                                 ResourceManagerId.generate(),
                                 mainThreadExecutor,
-                                new TestingResourceActionsBuilder().build())) {
+                                new TestingResourceAllocatorBuilder().build(),
+                                new TestingResourceEventListenerBuilder().build())) {
 
             slotManager.registerTaskManager(
                     taskExecutorConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY);
@@ -1073,8 +1074,8 @@ class DeclarativeSlotManagerTest {
     void testRequestNewResources() throws Exception {
         final int numberSlots = 2;
         final AtomicInteger resourceRequests = new AtomicInteger(0);
-        final TestingResourceActions testingResourceActions =
-                new TestingResourceActionsBuilder()
+        final TestingResourceAllocator testingResourceAllocator =
+                new TestingResourceAllocatorBuilder()
                         .setAllocateResourceFunction(
                                 ignored -> {
                                     resourceRequests.incrementAndGet();
@@ -1084,7 +1085,7 @@ class DeclarativeSlotManagerTest {
 
         try (final DeclarativeSlotManager slotManager =
                 createSlotManager(
-                        ResourceManagerId.generate(), testingResourceActions, numberSlots)) {
+                        ResourceManagerId.generate(), testingResourceAllocator, numberSlots)) {
 
             final JobID jobId = new JobID();
 
@@ -1190,10 +1191,14 @@ class DeclarativeSlotManagerTest {
 
         List<Tuple2<JobID, Collection<ResourceRequirement>>> notEnoughResourceNotifications =
                 new ArrayList<>();
-        ResourceActions resourceManagerActions =
-                new TestingResourceActionsBuilder()
+        ResourceAllocator resourceAllocator =
+                new TestingResourceAllocatorBuilder()
                         .setAllocateResourceFunction(ignored -> false)
-                        .setNotEnoughResourcesConsumer(
+                        .build();
+
+        ResourceEventListener resourceEventListener =
+                new TestingResourceEventListenerBuilder()
+                        .setNotEnoughResourceAvailableConsumer(
                                 (jobId1, acquiredResources) ->
                                         notEnoughResourceNotifications.add(
                                                 Tuple2.of(jobId1, acquiredResources)))
@@ -1204,7 +1209,8 @@ class DeclarativeSlotManagerTest {
                         .buildAndStart(
                                 ResourceManagerId.generate(),
                                 new ManuallyTriggeredScheduledExecutor(),
-                                resourceManagerActions)) {
+                                resourceAllocator,
+                                resourceEventListener)) {
 
             if (withNotificationGracePeriod) {
                 // this should disable notifications
@@ -1271,7 +1277,8 @@ class DeclarativeSlotManagerTest {
                         .buildAndStart(
                                 ResourceManagerId.generate(),
                                 executor,
-                                new TestingResourceActionsBuilder().build())) {
+                                new TestingResourceAllocatorBuilder().build(),
+                                new TestingResourceEventListenerBuilder().build())) {
 
             JobID jobId = new JobID();
             slotManager.processResourceRequirements(createResourceRequirements(jobId, 1));
@@ -1318,7 +1325,8 @@ class DeclarativeSlotManagerTest {
                         .buildAndStart(
                                 ResourceManagerId.generate(),
                                 executor,
-                                new TestingResourceActionsBuilder().build())) {
+                                new TestingResourceAllocatorBuilder().build(),
+                                new TestingResourceEventListenerBuilder().build())) {
 
             JobID jobId = new JobID();
             slotManager.processResourceRequirements(createResourceRequirements(jobId, 1));
@@ -1372,7 +1380,8 @@ class DeclarativeSlotManagerTest {
                         .buildAndStart(
                                 ResourceManagerId.generate(),
                                 ComponentMainThreadExecutorServiceAdapter.forMainThread(),
-                                new TestingResourceActionsBuilder().build())) {
+                                new TestingResourceAllocatorBuilder().build(),
+                                new TestingResourceEventListenerBuilder().build())) {
 
             final TaskExecutorConnection taskExecutionConnection =
                     createTaskExecutorConnection(taskExecutorGateway);
@@ -1422,7 +1431,8 @@ class DeclarativeSlotManagerTest {
                         .buildAndStart(
                                 ResourceManagerId.generate(),
                                 ComponentMainThreadExecutorServiceAdapter.forMainThread(),
-                                new TestingResourceActionsBuilder().build())) {
+                                new TestingResourceAllocatorBuilder().build(),
+                                new TestingResourceEventListenerBuilder().build())) {
 
             final JobID jobId = new JobID();
 
@@ -1461,7 +1471,7 @@ class DeclarativeSlotManagerTest {
                         .setRequirementCheckDelay(delay)
                         .buildAndStartWithDirectExec(
                                 ResourceManagerId.generate(),
-                                new TestingResourceActionsBuilder()
+                                new TestingResourceAllocatorBuilder()
                                         .setAllocateResourceConsumer(
                                                 workerResourceSpec ->
                                                         allocatedResourceCounter.getAndIncrement())
@@ -1502,7 +1512,8 @@ class DeclarativeSlotManagerTest {
                         .buildAndStart(
                                 ResourceManagerId.generate(),
                                 ComponentMainThreadExecutorServiceAdapter.forMainThread(),
-                                new TestingResourceActionsBuilder().build())) {
+                                new TestingResourceAllocatorBuilder().build(),
+                                new TestingResourceEventListenerBuilder().build())) {
 
             final JobID jobId = new JobID();
 
@@ -1585,19 +1596,19 @@ class DeclarativeSlotManagerTest {
     }
 
     private DeclarativeSlotManager createSlotManager(
-            ResourceManagerId resourceManagerId, ResourceActions resourceManagerActions) {
-        return createSlotManager(resourceManagerId, resourceManagerActions, 1);
+            ResourceManagerId resourceManagerId, ResourceAllocator resourceAllocator) {
+        return createSlotManager(resourceManagerId, resourceAllocator, 1);
     }
 
     private DeclarativeSlotManager createSlotManager(
             ResourceManagerId resourceManagerId,
-            ResourceActions resourceManagerActions,
+            ResourceAllocator resourceAllocator,
             int numSlotsPerWorker) {
         return createDeclarativeSlotManagerBuilder(
                         new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()))
                 .setNumSlotsPerWorker(numSlotsPerWorker)
                 .setRedundantTaskManagerNum(0)
-                .buildAndStartWithDirectExec(resourceManagerId, resourceManagerActions);
+                .buildAndStartWithDirectExec(resourceManagerId, resourceAllocator);
     }
 
     private static DeclarativeSlotManagerBuilder createDeclarativeSlotManagerBuilder() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java
index d68320761df..73e0355bd31 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java
@@ -52,7 +52,7 @@ class FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase
 
         new Context() {
             {
-                resourceActionsBuilder.setAllocateResourceFunction(
+                resourceAllocatorBuilder.setAllocateResourceFunction(
                         ignored -> {
                             resourceRequests.incrementAndGet();
                             return true;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
index bfc438c4316..0c85473d13a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
@@ -378,7 +378,7 @@ class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
         allocateResourceFutures.add(new CompletableFuture<>());
         new Context() {
             {
-                resourceActionsBuilder.setAllocateResourceConsumer(
+                resourceAllocatorBuilder.setAllocateResourceConsumer(
                         ignored -> {
                             if (allocateResourceFutures.get(0).isDone()) {
                                 allocateResourceFutures.get(1).complete(null);
@@ -445,7 +445,7 @@ class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
                                                 pendingTaskManager.getPendingTaskManagerId(),
                                                 DEFAULT_SLOT_RESOURCE_PROFILE)
                                         .build()));
-                resourceActionsBuilder.setAllocateResourceConsumer(
+                resourceAllocatorBuilder.setAllocateResourceConsumer(
                         ignored -> requestResourceFuture.complete(null));
                 runTest(
                         () -> {
@@ -496,7 +496,7 @@ class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
         final CompletableFuture<Void> notifyNotEnoughResourceFuture = new CompletableFuture<>();
         new Context() {
             {
-                resourceActionsBuilder.setNotEnoughResourcesConsumer(
+                resourceEventListenerBuilder.setNotEnoughResourceAvailableConsumer(
                         (jobId1, acquiredResources) -> {
                             notEnoughResourceNotifications.add(
                                     Tuple2.of(jobId1, acquiredResources));
@@ -638,7 +638,7 @@ class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
         final InstanceID instanceId = taskExecutionConnection.getInstanceID();
         new Context() {
             {
-                resourceActionsBuilder.setReleaseResourceConsumer(
+                resourceAllocatorBuilder.setReleaseResourceConsumer(
                         (instanceID, e) -> releaseResourceFuture.complete(instanceID));
                 slotManagerConfigurationBuilder.setTaskManagerTimeout(taskManagerTimeout);
                 runTest(
@@ -827,7 +827,7 @@ class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
             {
                 maxTotalResourceSetter.accept(slotManagerConfigurationBuilder);
 
-                resourceActionsBuilder.setAllocateResourceConsumer(
+                resourceAllocatorBuilder.setAllocateResourceConsumer(
                         ignored -> {
                             if (allocateResourceFutures.get(0).isDone()) {
                                 allocateResourceFutures.get(1).complete(null);
@@ -876,7 +876,7 @@ class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
             {
                 maxTotalResourceSetter.accept(slotManagerConfigurationBuilder);
 
-                resourceActionsBuilder.setReleaseResourceConsumer(
+                resourceAllocatorBuilder.setReleaseResourceConsumer(
                         (instanceId, ignore) -> releaseResourceFuture.complete(instanceId));
 
                 runTest(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
index 049ef97bc0e..f2e8e4f9d76 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
@@ -157,8 +157,11 @@ abstract class FineGrainedSlotManagerTestBase {
         final TestingResourceAllocationStrategy.Builder resourceAllocationStrategyBuilder =
                 TestingResourceAllocationStrategy.newBuilder();
 
-        final TestingResourceActionsBuilder resourceActionsBuilder =
-                new TestingResourceActionsBuilder();
+        final TestingResourceAllocatorBuilder resourceAllocatorBuilder =
+                new TestingResourceAllocatorBuilder();
+
+        final TestingResourceEventListenerBuilder resourceEventListenerBuilder =
+                new TestingResourceEventListenerBuilder();
         final SlotManagerConfigurationBuilder slotManagerConfigurationBuilder =
                 SlotManagerConfigurationBuilder.newBuilder();
 
@@ -221,7 +224,8 @@ abstract class FineGrainedSlotManagerTestBase {
                             slotManager.start(
                                     resourceManagerId,
                                     mainThreadExecutor,
-                                    resourceActionsBuilder.build(),
+                                    resourceAllocatorBuilder.build(),
+                                    resourceEventListenerBuilder.build(),
                                     blockedTaskManagerChecker));
 
             testMethod.run();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerBuilder.java
index 157e894aea1..88e37e1ba16 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerBuilder.java
@@ -35,7 +35,7 @@ public class TaskExecutorManagerBuilder {
     private Time taskManagerTimeout = Time.seconds(5);
     private final ScheduledExecutor scheduledExecutor;
     private Executor mainThreadExecutor = Executors.directExecutor();
-    private ResourceActions newResourceActions = new TestingResourceActionsBuilder().build();
+    private ResourceAllocator newResourceAllocator = new TestingResourceAllocatorBuilder().build();
 
     public TaskExecutorManagerBuilder(ScheduledExecutor scheduledExecutor) {
         this.scheduledExecutor = scheduledExecutor;
@@ -78,8 +78,8 @@ public class TaskExecutorManagerBuilder {
         return this;
     }
 
-    public TaskExecutorManagerBuilder setResourceActions(ResourceActions newResourceActions) {
-        this.newResourceActions = newResourceActions;
+    public TaskExecutorManagerBuilder setResourceAllocator(ResourceAllocator newResourceAllocator) {
+        this.newResourceAllocator = newResourceAllocator;
         return this;
     }
 
@@ -93,6 +93,6 @@ public class TaskExecutorManagerBuilder {
                 taskManagerTimeout,
                 scheduledExecutor,
                 mainThreadExecutor,
-                newResourceActions);
+                newResourceAllocator);
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java
index 28593e31f76..ed01b63b4c7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java
@@ -133,7 +133,7 @@ public class TaskExecutorManagerTest extends TestLogger {
 
     /**
      * Tests that a task manager timeout does not remove the slots from the SlotManager. A timeout
-     * should only trigger the {@link ResourceActions#releaseResource(InstanceID, Exception)}
+     * should only trigger the {@link ResourceAllocator#releaseResource(InstanceID, Exception)}
      * callback. The receiver of the callback can then decide what to do with the TaskManager.
      *
      * <p>See FLINK-7793
@@ -143,8 +143,8 @@ public class TaskExecutorManagerTest extends TestLogger {
         final Time taskManagerTimeout = Time.milliseconds(10L);
 
         final CompletableFuture<InstanceID> releaseResourceFuture = new CompletableFuture<>();
-        final ResourceActions resourceActions =
-                createResourceActionsBuilder()
+        final ResourceAllocator resourceAllocator =
+                createResourceAllocatorBuilder()
                         .setReleaseResourceConsumer(
                                 (instanceId, ignored) -> releaseResourceFuture.complete(instanceId))
                         .build();
@@ -154,7 +154,7 @@ public class TaskExecutorManagerTest extends TestLogger {
         try (final TaskExecutorManager taskExecutorManager =
                 createTaskExecutorManagerBuilder()
                         .setTaskManagerTimeout(taskManagerTimeout)
-                        .setResourceActions(resourceActions)
+                        .setResourceAllocator(resourceAllocator)
                         .setMainThreadExecutor(mainThreadExecutor)
                         .createTaskExecutorManager()) {
 
@@ -195,8 +195,8 @@ public class TaskExecutorManagerTest extends TestLogger {
         final Time taskManagerTimeout = Time.milliseconds(50L);
 
         final CompletableFuture<InstanceID> releaseResourceFuture = new CompletableFuture<>();
-        final ResourceActions resourceManagerActions =
-                new TestingResourceActionsBuilder()
+        final ResourceAllocator resourceAllocator =
+                new TestingResourceAllocatorBuilder()
                         .setReleaseResourceConsumer(
                                 (instanceID, e) -> releaseResourceFuture.complete(instanceID))
                         .build();
@@ -207,7 +207,7 @@ public class TaskExecutorManagerTest extends TestLogger {
                 createTaskExecutorManagerBuilder()
                         .setTaskManagerTimeout(taskManagerTimeout)
                         .setDefaultWorkerResourceSpec(workerResourceSpec)
-                        .setResourceActions(resourceManagerActions)
+                        .setResourceAllocator(resourceAllocator)
                         .setMainThreadExecutor(mainThreadExecutor)
                         .createTaskExecutorManager()) {
 
@@ -248,8 +248,8 @@ public class TaskExecutorManagerTest extends TestLogger {
                 ResourceProfile.newBuilder().setCpuCores(numCoresPerWorker + 1).build();
 
         final AtomicInteger resourceRequests = new AtomicInteger(0);
-        ResourceActions resourceActions =
-                createResourceActionsBuilder()
+        ResourceAllocator resourceAllocator =
+                createResourceAllocatorBuilder()
                         .setAllocateResourceFunction(
                                 ignored -> {
                                     resourceRequests.incrementAndGet();
@@ -262,7 +262,7 @@ public class TaskExecutorManagerTest extends TestLogger {
                         .setDefaultWorkerResourceSpec(workerResourceSpec)
                         .setNumSlotsPerWorker(1)
                         .setMaxNumSlots(1)
-                        .setResourceActions(resourceActions)
+                        .setResourceAllocator(resourceAllocator)
                         .createTaskExecutorManager()) {
 
             assertThat(
@@ -281,8 +281,8 @@ public class TaskExecutorManagerTest extends TestLogger {
         final int maxSlotNum = 1;
 
         final AtomicInteger resourceRequests = new AtomicInteger(0);
-        ResourceActions resourceActions =
-                createResourceActionsBuilder()
+        ResourceAllocator resourceAllocator =
+                createResourceAllocatorBuilder()
                         .setAllocateResourceFunction(
                                 ignored -> {
                                     resourceRequests.incrementAndGet();
@@ -294,7 +294,7 @@ public class TaskExecutorManagerTest extends TestLogger {
                 createTaskExecutorManagerBuilder()
                         .setNumSlotsPerWorker(numberSlots)
                         .setMaxNumSlots(maxSlotNum)
-                        .setResourceActions(resourceActions)
+                        .setResourceAllocator(resourceAllocator)
                         .createTaskExecutorManager()) {
 
             assertThat(resourceRequests.get(), is(0));
@@ -317,8 +317,8 @@ public class TaskExecutorManagerTest extends TestLogger {
         final int maxSlotNum = 1;
 
         final CompletableFuture<InstanceID> releasedResourceFuture = new CompletableFuture<>();
-        ResourceActions resourceActions =
-                createResourceActionsBuilder()
+        ResourceAllocator resourceAllocator =
+                createResourceAllocatorBuilder()
                         .setReleaseResourceConsumer(
                                 (instanceID, e) -> releasedResourceFuture.complete(instanceID))
                         .build();
@@ -327,7 +327,7 @@ public class TaskExecutorManagerTest extends TestLogger {
                 createTaskExecutorManagerBuilder()
                         .setNumSlotsPerWorker(numberSlots)
                         .setMaxNumSlots(maxSlotNum)
-                        .setResourceActions(resourceActions)
+                        .setResourceAllocator(resourceAllocator)
                         .createTaskExecutorManager()) {
 
             createAndRegisterTaskExecutor(taskExecutorManager, 1, ResourceProfile.ANY);
@@ -376,11 +376,11 @@ public class TaskExecutorManagerTest extends TestLogger {
     private static TaskExecutorManagerBuilder createTaskExecutorManagerBuilder() {
         return new TaskExecutorManagerBuilder(
                         new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()))
-                .setResourceActions(createResourceActionsBuilder().build());
+                .setResourceAllocator(createResourceAllocatorBuilder().build());
     }
 
-    private static TestingResourceActionsBuilder createResourceActionsBuilder() {
-        return new TestingResourceActionsBuilder()
+    private static TestingResourceAllocatorBuilder createResourceAllocatorBuilder() {
+        return new TestingResourceAllocatorBuilder()
                 // ensures we do something when excess resource are requested
                 .setAllocateResourceFunction(ignored -> true);
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocator.java
similarity index 67%
rename from flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java
rename to flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocator.java
index a302b2001e7..83207ce41cd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocator.java
@@ -18,37 +18,26 @@
 
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
-import org.apache.flink.runtime.slots.ResourceRequirement;
 
 import javax.annotation.Nonnull;
 
-import java.util.Collection;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
 
-/** Testing implementation of the {@link ResourceActions}. */
-public class TestingResourceActions implements ResourceActions {
+/** Testing implementation of the {@link ResourceAllocator}. */
+public class TestingResourceAllocator implements ResourceAllocator {
 
     @Nonnull private final BiConsumer<InstanceID, Exception> releaseResourceConsumer;
 
     @Nonnull private final Function<WorkerResourceSpec, Boolean> allocateResourceFunction;
 
-    @Nonnull
-    private final BiConsumer<JobID, Collection<ResourceRequirement>>
-            notifyNotEnoughResourcesConsumer;
-
-    public TestingResourceActions(
+    public TestingResourceAllocator(
             @Nonnull BiConsumer<InstanceID, Exception> releaseResourceConsumer,
-            @Nonnull Function<WorkerResourceSpec, Boolean> allocateResourceFunction,
-            @Nonnull
-                    BiConsumer<JobID, Collection<ResourceRequirement>>
-                            notifyNotEnoughResourcesConsumer) {
+            @Nonnull Function<WorkerResourceSpec, Boolean> allocateResourceFunction) {
         this.releaseResourceConsumer = releaseResourceConsumer;
         this.allocateResourceFunction = allocateResourceFunction;
-        this.notifyNotEnoughResourcesConsumer = notifyNotEnoughResourcesConsumer;
     }
 
     @Override
@@ -60,10 +49,4 @@ public class TestingResourceActions implements ResourceActions {
     public boolean allocateResource(WorkerResourceSpec workerResourceSpec) {
         return allocateResourceFunction.apply(workerResourceSpec);
     }
-
-    @Override
-    public void notifyNotEnoughResourcesAvailable(
-            JobID jobId, Collection<ResourceRequirement> acquiredResources) {
-        notifyNotEnoughResourcesConsumer.accept(jobId, acquiredResources);
-    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocatorBuilder.java
similarity index 64%
rename from flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java
rename to flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocatorBuilder.java
index a2c1a07c30a..8dc5abd2a81 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocatorBuilder.java
@@ -18,36 +18,31 @@
 
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
-import org.apache.flink.runtime.slots.ResourceRequirement;
 
-import java.util.Collection;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
-/** Builder for the {@link TestingResourceActions}. */
-public class TestingResourceActionsBuilder {
+/** Builder for the {@link TestingResourceAllocator}. */
+public class TestingResourceAllocatorBuilder {
     private BiConsumer<InstanceID, Exception> releaseResourceConsumer = (ignoredA, ignoredB) -> {};
     private Function<WorkerResourceSpec, Boolean> allocateResourceFunction = (ignored) -> true;
-    private BiConsumer<JobID, Collection<ResourceRequirement>> notifyNotEnoughResourcesConsumer =
-            (ignoredA, ignoredB) -> {};
 
-    public TestingResourceActionsBuilder setReleaseResourceConsumer(
+    public TestingResourceAllocatorBuilder setReleaseResourceConsumer(
             BiConsumer<InstanceID, Exception> releaseResourceConsumer) {
         this.releaseResourceConsumer = releaseResourceConsumer;
         return this;
     }
 
-    public TestingResourceActionsBuilder setAllocateResourceFunction(
+    public TestingResourceAllocatorBuilder setAllocateResourceFunction(
             Function<WorkerResourceSpec, Boolean> allocateResourceFunction) {
         this.allocateResourceFunction = allocateResourceFunction;
         return this;
     }
 
-    public TestingResourceActionsBuilder setAllocateResourceConsumer(
+    public TestingResourceAllocatorBuilder setAllocateResourceConsumer(
             Consumer<WorkerResourceSpec> allocateResourceConsumer) {
         this.allocateResourceFunction =
                 workerRequest -> {
@@ -57,16 +52,7 @@ public class TestingResourceActionsBuilder {
         return this;
     }
 
-    public TestingResourceActionsBuilder setNotEnoughResourcesConsumer(
-            BiConsumer<JobID, Collection<ResourceRequirement>> notifyNotEnoughResourcesConsumer) {
-        this.notifyNotEnoughResourcesConsumer = notifyNotEnoughResourcesConsumer;
-        return this;
-    }
-
-    public TestingResourceActions build() {
-        return new TestingResourceActions(
-                releaseResourceConsumer,
-                allocateResourceFunction,
-                notifyNotEnoughResourcesConsumer);
+    public TestingResourceAllocator build() {
+        return new TestingResourceAllocator(releaseResourceConsumer, allocateResourceFunction);
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceEventListener.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceEventListener.java
new file mode 100644
index 00000000000..bff5f30f04a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceEventListener.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.function.BiConsumer;
+
+/** Testing implementation of the {@link ResourceEventListener}. */
+public class TestingResourceEventListener implements ResourceEventListener {
+
+    @Nonnull
+    private final BiConsumer<JobID, Collection<ResourceRequirement>>
+            notEnoughResourceAvailableConsumer;
+
+    public TestingResourceEventListener(
+            @Nonnull
+                    BiConsumer<JobID, Collection<ResourceRequirement>>
+                            notEnoughResourceAvailableConsumer) {
+        this.notEnoughResourceAvailableConsumer = notEnoughResourceAvailableConsumer;
+    }
+
+    @Override
+    public void notEnoughResourceAvailable(
+            JobID jobId, Collection<ResourceRequirement> acquiredResources) {
+        notEnoughResourceAvailableConsumer.accept(jobId, acquiredResources);
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceEventListenerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceEventListenerBuilder.java
new file mode 100644
index 00000000000..5e2e8d884ec
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceEventListenerBuilder.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+
+import java.util.Collection;
+import java.util.function.BiConsumer;
+
+/** Builder for the {@link TestingResourceEventListener}. */
+public class TestingResourceEventListenerBuilder {
+    private BiConsumer<JobID, Collection<ResourceRequirement>> notEnoughResourceAvailableConsumer =
+            (ignoredA, ignoredB) -> {};
+
+    public TestingResourceEventListenerBuilder setNotEnoughResourceAvailableConsumer(
+            BiConsumer<JobID, Collection<ResourceRequirement>> notEnoughResourceAvailableConsumer) {
+        this.notEnoughResourceAvailableConsumer = notEnoughResourceAvailableConsumer;
+        return this;
+    }
+
+    public TestingResourceEventListener build() {
+        return new TestingResourceEventListener(notEnoughResourceAvailableConsumer);
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingSlotManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingSlotManager.java
index 4494320d488..bb740f8fb59 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingSlotManager.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingSlotManager.java
@@ -114,7 +114,8 @@ public class TestingSlotManager implements SlotManager {
     public void start(
             ResourceManagerId newResourceManagerId,
             Executor newMainThreadExecutor,
-            ResourceActions newResourceActions,
+            ResourceAllocator newResourceAllocator,
+            ResourceEventListener resourceEventListener,
             BlockedTaskManagerChecker newBlockedTaskManagerChecker) {}
 
     @Override


[flink] 02/02: [FLINK-29870][runtime] move ResourceAllocator to ActiveResourceManager.

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 209df810f13e2fcbe5a4ca8bb015a8a5f662adc4
Author: Weihua Hu <hu...@gmail.com>
AuthorDate: Thu Nov 3 21:13:36 2022 +0800

    [FLINK-29870][runtime] move ResourceAllocator to ActiveResourceManager.
    
    This closes #21233
---
 .../runtime/resourcemanager/ResourceManager.java   | 75 ++++++++--------------
 .../resourcemanager/StandaloneResourceManager.java | 23 +++----
 .../active/ActiveResourceManager.java              | 70 +++++++++++++++-----
 .../slotmanager/FineGrainedSlotManager.java        | 25 +++++---
 .../NonSupportedResourceAllocatorImpl.java}        | 34 +++++-----
 .../slotmanager/ResourceAllocator.java             | 14 +++-
 .../slotmanager/TaskExecutorManager.java           | 27 ++++----
 .../resourcemanager/ResourceManagerTest.java       | 31 +++------
 .../resourcemanager/TestingResourceManager.java    | 27 ++++----
 .../TestingResourceManagerFactory.java             | 20 +++---
 .../active/ActiveResourceManagerTest.java          | 71 ++++++++++----------
 .../slotmanager/DeclarativeSlotManagerTest.java    | 18 ++----
 ...gerDefaultResourceAllocationStrategyITCase.java |  7 +-
 .../slotmanager/TaskExecutorManagerTest.java       | 14 +---
 .../slotmanager/TestingResourceAllocator.java      | 23 +++++--
 .../TestingResourceAllocatorBuilder.java           | 17 +----
 16 files changed, 246 insertions(+), 250 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 3abd6b6b68d..5d38143974d 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -169,6 +169,8 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 
     private final AtomicReference<byte[]> latestTokens = new AtomicReference<>();
 
+    private final ResourceAllocator resourceAllocator;
+
     public ResourceManager(
             RpcService rpcService,
             UUID leaderSessionId,
@@ -234,6 +236,8 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
         this.startedFuture = new CompletableFuture<>();
 
         this.delegationTokenManager = delegationTokenManager;
+
+        this.resourceAllocator = getResourceAllocator();
     }
 
     // ------------------------------------------------------------------------
@@ -267,7 +271,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
             slotManager.start(
                     getFencingToken(),
                     getMainThreadExecutor(),
-                    new ResourceAllocatorImpl(),
+                    resourceAllocator,
                     new ResourceEventListenerImpl(),
                     blocklistHandler::isBlockedTaskManager);
 
@@ -533,7 +537,8 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 
     @Override
     public void disconnectTaskManager(final ResourceID resourceId, final Exception cause) {
-        closeTaskManagerConnection(resourceId, cause).ifPresent(ResourceManager.this::stopWorker);
+        closeTaskManagerConnection(resourceId, cause)
+                .ifPresent(ResourceManager.this::stopWorkerIfSupported);
     }
 
     @Override
@@ -984,10 +989,11 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
                                     taskExecutorResourceId.getStringWithMetadata())));
         }
 
-        final WorkerType newWorker = workerStarted(taskExecutorResourceId);
+        final Optional<WorkerType> newWorkerOptional =
+                getWorkerNodeIfAcceptRegistration(taskExecutorResourceId);
 
         String taskExecutorAddress = taskExecutorRegistration.getTaskExecutorAddress();
-        if (newWorker == null) {
+        if (!newWorkerOptional.isPresent()) {
             log.warn(
                     "Discard registration from TaskExecutor {} at ({}) because the framework did "
                             + "not recognize it",
@@ -996,6 +1002,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
             return new TaskExecutorRegistrationRejection(
                     "The ResourceManager does not recognize this TaskExecutor.");
         } else {
+            WorkerType newWorker = newWorkerOptional.get();
             WorkerRegistration<WorkerType> registration =
                     new WorkerRegistration<>(
                             taskExecutorGateway,
@@ -1159,9 +1166,8 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
         }
     }
 
-    protected void releaseResource(InstanceID instanceId, Exception cause) {
+    protected WorkerType getWorkerByInstanceId(InstanceID instanceId) {
         WorkerType worker = null;
-
         // TODO: Improve performance by having an index on the instanceId
         for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> entry :
                 taskExecutors.entrySet()) {
@@ -1171,18 +1177,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
             }
         }
 
-        if (worker != null) {
-            if (stopWorker(worker)) {
-                closeTaskManagerConnection(worker.getResourceID(), cause);
-            } else {
-                log.debug(
-                        "Worker {} could not be stopped.",
-                        worker.getResourceID().getStringWithMetadata());
-            }
-        } else {
-            // unregister in order to clean up potential left over state
-            slotManager.unregisterTaskManager(instanceId, cause);
-        }
+        return worker;
     }
 
     private enum ResourceRequirementHandling {
@@ -1265,29 +1260,23 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
             throws ResourceManagerException;
 
     /**
-     * Allocates a resource using the worker resource specification.
-     *
-     * @param workerResourceSpec workerResourceSpec specifies the size of the to be allocated
-     *     resource
-     * @return whether the resource can be allocated
-     */
-    @VisibleForTesting
-    public abstract boolean startNewWorker(WorkerResourceSpec workerResourceSpec);
-
-    /**
-     * Callback when a worker was started.
+     * Get worker node if the worker resource is accepted.
      *
      * @param resourceID The worker resource id
      */
-    protected abstract WorkerType workerStarted(ResourceID resourceID);
+    protected abstract Optional<WorkerType> getWorkerNodeIfAcceptRegistration(
+            ResourceID resourceID);
 
     /**
-     * Stops the given worker.
+     * Stops the given worker if supported.
      *
      * @param worker The worker.
-     * @return True if the worker was stopped, otherwise false
      */
-    public abstract boolean stopWorker(WorkerType worker);
+    public void stopWorkerIfSupported(WorkerType worker) {
+        if (resourceAllocator.isSupported()) {
+            resourceAllocator.releaseResource(worker.getResourceID());
+        }
+    }
 
     /**
      * Get the ready to serve future of the resource manager.
@@ -1297,6 +1286,8 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
      */
     protected abstract CompletableFuture<Void> getReadyToServeFuture();
 
+    protected abstract ResourceAllocator getResourceAllocator();
+
     /**
      * Set {@link SlotManager} whether to fail unfulfillable slot requests.
      *
@@ -1336,22 +1327,6 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
         }
     }
 
-    private class ResourceAllocatorImpl implements ResourceAllocator {
-
-        @Override
-        public void releaseResource(InstanceID instanceId, Exception cause) {
-            validateRunsInMainThread();
-
-            ResourceManager.this.releaseResource(instanceId, cause);
-        }
-
-        @Override
-        public boolean allocateResource(WorkerResourceSpec workerResourceSpec) {
-            validateRunsInMainThread();
-            return startNewWorker(workerResourceSpec);
-        }
-    }
-
     private class ResourceEventListenerImpl implements ResourceEventListener {
         @Override
         public void notEnoughResourceAvailable(
@@ -1419,7 +1394,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
         private void handleTaskManagerConnectionLoss(ResourceID resourceID, Exception cause) {
             validateRunsInMainThread();
             closeTaskManagerConnection(resourceID, cause)
-                    .ifPresent(ResourceManager.this::stopWorker);
+                    .ifPresent(ResourceManager.this::stopWorkerIfSupported);
         }
 
         @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index f41f5bb4d79..e7d431702dd 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -27,6 +27,8 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTrackerFactory;
 import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
 import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.NonSupportedResourceAllocatorImpl;
+import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceAllocator;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -35,6 +37,7 @@ import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
 
+import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -100,19 +103,8 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> {
             ApplicationStatus finalStatus, @Nullable String diagnostics) {}
 
     @Override
-    public boolean startNewWorker(WorkerResourceSpec workerResourceSpec) {
-        return false;
-    }
-
-    @Override
-    public boolean stopWorker(ResourceID resourceID) {
-        // standalone resource manager cannot stop workers
-        return false;
-    }
-
-    @Override
-    protected ResourceID workerStarted(ResourceID resourceID) {
-        return resourceID;
+    protected Optional<ResourceID> getWorkerNodeIfAcceptRegistration(ResourceID resourceID) {
+        return Optional.of(resourceID);
     }
 
     private void startStartupPeriod() {
@@ -132,4 +124,9 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> {
     public CompletableFuture<Void> getReadyToServeFuture() {
         return CompletableFuture.completedFuture(null);
     }
+
+    @Override
+    protected ResourceAllocator getResourceAllocator() {
+        return NonSupportedResourceAllocatorImpl.INSTANCE;
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java
index e5cb5315d63..61e2ea234ad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTrackerFactory;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.metrics.ThresholdMeter;
@@ -38,6 +39,7 @@ import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
 import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceAllocator;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -54,6 +56,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
@@ -199,20 +202,8 @@ public class ActiveResourceManager<WorkerType extends ResourceIDRetrievable>
     }
 
     @Override
-    public boolean startNewWorker(WorkerResourceSpec workerResourceSpec) {
-        requestNewWorker(workerResourceSpec);
-        return true;
-    }
-
-    @Override
-    protected WorkerType workerStarted(ResourceID resourceID) {
-        return workerNodeMap.get(resourceID);
-    }
-
-    @Override
-    public boolean stopWorker(WorkerType worker) {
-        internalStopWorker(worker.getResourceID());
-        return true;
+    protected Optional<WorkerType> getWorkerNodeIfAcceptRegistration(ResourceID resourceID) {
+        return Optional.ofNullable(workerNodeMap.get(resourceID));
     }
 
     @Override
@@ -299,7 +290,24 @@ public class ActiveResourceManager<WorkerType extends ResourceIDRetrievable>
     //  Internal
     // ------------------------------------------------------------------------
 
-    private void requestNewWorker(WorkerResourceSpec workerResourceSpec) {
+    private void releaseResource(InstanceID instanceId, Exception cause) {
+        WorkerType worker = getWorkerByInstanceId(instanceId);
+        if (worker != null) {
+            internalStopWorker(worker.getResourceID());
+            closeTaskManagerConnection(worker.getResourceID(), cause);
+        } else {
+            log.debug("Instance {} not found in ResourceManager.", instanceId);
+        }
+    }
+
+    /**
+     * Allocates a resource using the worker resource specification.
+     *
+     * @param workerResourceSpec workerResourceSpec specifies the size of the to be allocated
+     *     resource
+     */
+    @VisibleForTesting
+    public void requestNewWorker(WorkerResourceSpec workerResourceSpec) {
         final TaskExecutorProcessSpec taskExecutorProcessSpec =
                 TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(
                         flinkConfig, workerResourceSpec);
@@ -451,6 +459,11 @@ public class ActiveResourceManager<WorkerType extends ResourceIDRetrievable>
         return readyToServeFuture;
     }
 
+    @Override
+    protected ResourceAllocator getResourceAllocator() {
+        return new ResourceAllocatorImpl();
+    }
+
     private void tryRemovePreviousPendingRecoveryTaskManager(ResourceID resourceID) {
         long sizeBeforeRemove = previousAttemptUnregisteredWorkers.size();
         if (previousAttemptUnregisteredWorkers.remove(resourceID)) {
@@ -499,6 +512,33 @@ public class ActiveResourceManager<WorkerType extends ResourceIDRetrievable>
         }
     }
 
+    private class ResourceAllocatorImpl implements ResourceAllocator {
+
+        @Override
+        public boolean isSupported() {
+            return true;
+        }
+
+        @Override
+        public void releaseResource(InstanceID instanceId, Exception cause) {
+            validateRunsInMainThread();
+
+            ActiveResourceManager.this.releaseResource(instanceId, cause);
+        }
+
+        @Override
+        public void releaseResource(ResourceID resourceID) {
+            validateRunsInMainThread();
+            internalStopWorker(resourceID);
+        }
+
+        @Override
+        public void allocateResource(WorkerResourceSpec workerResourceSpec) {
+            validateRunsInMainThread();
+            requestNewWorker(workerResourceSpec);
+        }
+    }
+
     // ------------------------------------------------------------------------
     //  Testing
     // ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
index c999d9883de..125679cedef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
@@ -346,8 +346,10 @@ public class FineGrainedSlotManager implements SlotManager {
                             : findMatchingPendingTaskManager(
                                     totalResourceProfile, defaultSlotResourceProfile);
 
-            if (!matchedPendingTaskManagerOptional.isPresent()
+            if (resourceAllocator.isSupported()
+                    && !matchedPendingTaskManagerOptional.isPresent()
                     && isMaxTotalResourceExceededAfterAdding(totalResourceProfile)) {
+
                 LOG.info(
                         "Releasing task manager {}. The max total resource limitation <{}, {}> is reached.",
                         taskExecutorConnection.getResourceID(),
@@ -357,6 +359,7 @@ public class FineGrainedSlotManager implements SlotManager {
                         taskExecutorConnection.getInstanceID(),
                         new FlinkExpectedException(
                                 "The max total resource limitation is reached."));
+
                 return false;
             }
 
@@ -735,12 +738,19 @@ public class FineGrainedSlotManager implements SlotManager {
     }
 
     private void releaseIdleTaskExecutor(InstanceID timedOutTaskManagerId) {
-        final FlinkExpectedException cause =
-                new FlinkExpectedException("TaskManager exceeded the idle timeout.");
-        resourceAllocator.releaseResource(timedOutTaskManagerId, cause);
+        if (resourceAllocator.isSupported()) {
+            final FlinkExpectedException cause =
+                    new FlinkExpectedException("TaskManager exceeded the idle timeout.");
+            resourceAllocator.releaseResource(timedOutTaskManagerId, cause);
+        }
     }
 
     private boolean allocateResource(PendingTaskManager pendingTaskManager) {
+        if (!resourceAllocator.isSupported()) {
+            // resource cannot be allocated
+            return false;
+        }
+
         if (isMaxTotalResourceExceededAfterAdding(pendingTaskManager.getTotalResourceProfile())) {
             LOG.info(
                     "Could not allocate {}. Max total resource limitation <{}, {}> is reached.",
@@ -750,13 +760,10 @@ public class FineGrainedSlotManager implements SlotManager {
             return false;
         }
 
-        if (!resourceAllocator.allocateResource(
+        resourceAllocator.allocateResource(
                 WorkerResourceSpec.fromTotalResourceProfile(
                         pendingTaskManager.getTotalResourceProfile(),
-                        pendingTaskManager.getNumSlots()))) {
-            // resource cannot be allocated
-            return false;
-        }
+                        pendingTaskManager.getNumSlots()));
 
         taskManagerTracker.addPendingTaskManager(pendingTaskManager);
         return true;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/NonSupportedResourceAllocatorImpl.java
similarity index 53%
copy from flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocator.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/NonSupportedResourceAllocatorImpl.java
index 83207ce41cd..52c22ec4ad8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/NonSupportedResourceAllocatorImpl.java
@@ -18,35 +18,39 @@
 
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
 
-import javax.annotation.Nonnull;
+/**
+ * A {@link ResourceAllocator} that supports neither allocating nor releasing resources.
+ *
+ * <p>{@link #isSupported()} always returns {@code false}; every other method throws an {@link
+ * UnsupportedOperationException}.
+ */
+public class NonSupportedResourceAllocatorImpl implements ResourceAllocator {
+    public static final NonSupportedResourceAllocatorImpl INSTANCE =
+            new NonSupportedResourceAllocatorImpl();
 
-import java.util.function.BiConsumer;
-import java.util.function.Function;
+    private NonSupportedResourceAllocatorImpl() {}
 
-/** Testing implementation of the {@link ResourceAllocator}. */
-public class TestingResourceAllocator implements ResourceAllocator {
-
-    @Nonnull private final BiConsumer<InstanceID, Exception> releaseResourceConsumer;
-
-    @Nonnull private final Function<WorkerResourceSpec, Boolean> allocateResourceFunction;
-
-    public TestingResourceAllocator(
-            @Nonnull BiConsumer<InstanceID, Exception> releaseResourceConsumer,
-            @Nonnull Function<WorkerResourceSpec, Boolean> allocateResourceFunction) {
-        this.releaseResourceConsumer = releaseResourceConsumer;
-        this.allocateResourceFunction = allocateResourceFunction;
+    @Override
+    public boolean isSupported() {
+        return false;
     }
 
     @Override
     public void releaseResource(InstanceID instanceId, Exception cause) {
-        releaseResourceConsumer.accept(instanceId, cause);
+        throw new UnsupportedOperationException();
     }
 
     @Override
-    public boolean allocateResource(WorkerResourceSpec workerResourceSpec) {
-        return allocateResourceFunction.apply(workerResourceSpec);
+    public void releaseResource(ResourceID resourceID) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void allocateResource(WorkerResourceSpec workerResourceSpec) {
+        throw new UnsupportedOperationException();
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocator.java
index 179a2ad5ae1..071242f419f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocator.java
@@ -18,12 +18,16 @@
 
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
 
 /** Resource related actions which the {@link SlotManager} can perform. */
 public interface ResourceAllocator {
 
+    /** Whether allocating and releasing resources is supported. */
+    boolean isSupported();
+
     /**
      * Releases the resource with the given instance id.
      *
@@ -32,11 +36,17 @@ public interface ResourceAllocator {
      */
     void releaseResource(InstanceID instanceId, Exception cause);
 
+    /**
+     * Releases the resource with the given resource id.
+     *
+     * @param resourceID identifying which resource to release
+     */
+    void releaseResource(ResourceID resourceID);
+
     /**
      * Requests to allocate a resource with the given {@link WorkerResourceSpec}.
      *
      * @param workerResourceSpec for the to be allocated worker
-     * @return whether the resource can be allocated
      */
-    boolean allocateResource(WorkerResourceSpec workerResourceSpec);
+    void allocateResource(WorkerResourceSpec workerResourceSpec);
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
index d1515a45eea..6755342938b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
@@ -153,7 +153,8 @@ class TaskExecutorManager implements AutoCloseable {
             SlotReport initialSlotReport,
             ResourceProfile totalResourceProfile,
             ResourceProfile defaultSlotResourceProfile) {
-        if (isMaxSlotNumExceededAfterRegistration(initialSlotReport)) {
+        if (resourceAllocator.isSupported()
+                && isMaxSlotNumExceededAfterRegistration(initialSlotReport)) {
             LOG.info(
                     "The total number of slots exceeds the max limitation {}, releasing the excess task executor.",
                     maxSlotNum);
@@ -254,6 +255,11 @@ class TaskExecutorManager implements AutoCloseable {
      */
     public Optional<ResourceRequirement> allocateWorker(
             ResourceProfile requestedSlotResourceProfile) {
+        if (!resourceAllocator.isSupported()) {
+            // resource cannot be allocated
+            return Optional.empty();
+        }
+
         final int numRegisteredSlots = getNumberRegisteredSlots();
         final int numPendingSlots = getNumberPendingTaskManagerSlots();
         if (isMaxSlotNumExceededAfterAdding(numSlotsPerWorker)) {
@@ -270,10 +276,7 @@ class TaskExecutorManager implements AutoCloseable {
             return Optional.empty();
         }
 
-        if (!resourceAllocator.allocateResource(defaultWorkerResourceSpec)) {
-            // resource cannot be allocated
-            return Optional.empty();
-        }
+        resourceAllocator.allocateResource(defaultWorkerResourceSpec);
 
         for (int i = 0; i < numSlotsPerWorker; ++i) {
             PendingTaskManagerSlot pendingTaskManagerSlot =
@@ -401,12 +404,14 @@ class TaskExecutorManager implements AutoCloseable {
     }
 
     private void releaseIdleTaskExecutor(InstanceID timedOutTaskManagerId) {
-        final FlinkExpectedException cause =
-                new FlinkExpectedException("TaskExecutor exceeded the idle timeout.");
-        LOG.debug(
-                "Release TaskExecutor {} because it exceeded the idle timeout.",
-                timedOutTaskManagerId);
-        resourceAllocator.releaseResource(timedOutTaskManagerId, cause);
+        if (resourceAllocator.isSupported()) {
+            final FlinkExpectedException cause =
+                    new FlinkExpectedException("TaskExecutor exceeded the idle timeout.");
+            LOG.debug(
+                    "Release TaskExecutor {} because it exceeded the idle timeout.",
+                    timedOutTaskManagerId);
+            resourceAllocator.releaseResource(timedOutTaskManagerId, cause);
+        }
     }
 
     // ---------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
index 05087e8b148..a4b1131ff15 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
@@ -84,7 +84,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
-import java.util.function.Function;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -491,12 +490,7 @@ class ResourceManagerTest {
         rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);
 
         runHeartbeatTimeoutTest(
-                builder ->
-                        builder.withStopWorkerFunction(
-                                (worker) -> {
-                                    stopWorkerFuture.complete(worker);
-                                    return true;
-                                }),
+                builder -> builder.withStopWorkerConsumer(stopWorkerFuture::complete),
                 resourceManagerGateway ->
                         registerTaskExecutor(
                                 resourceManagerGateway,
@@ -538,12 +532,7 @@ class ResourceManagerTest {
         rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);
 
         runHeartbeatTargetBecomesUnreachableTest(
-                builder ->
-                        builder.withStopWorkerFunction(
-                                (worker) -> {
-                                    stopWorkerFuture.complete(worker);
-                                    return true;
-                                }),
+                builder -> builder.withStopWorkerConsumer(stopWorkerFuture::complete),
                 resourceManagerGateway ->
                         registerTaskExecutor(
                                 resourceManagerGateway,
@@ -579,7 +568,7 @@ class ResourceManagerTest {
 
         resourceManager =
                 new ResourceManagerBuilder()
-                        .withStopWorkerFunction(stopWorkerFuture::complete)
+                        .withStopWorkerConsumer(stopWorkerFuture::complete)
                         .buildAndStart();
 
         registerTaskExecutor(resourceManager, taskExecutorId, taskExecutorGateway.getAddress());
@@ -884,7 +873,7 @@ class ResourceManagerTest {
         private SlotManager slotManager = null;
         private BlocklistHandler.Factory blocklistHandlerFactory =
                 new NoOpBlocklistHandler.Factory();
-        private Function<ResourceID, Boolean> stopWorkerFunction = null;
+        private Consumer<ResourceID> stopWorkerConsumer = null;
         private CompletableFuture<Void> readyToServeFuture =
                 CompletableFuture.completedFuture(null);
 
@@ -910,9 +899,9 @@ class ResourceManagerTest {
             return this;
         }
 
-        private ResourceManagerBuilder withStopWorkerFunction(
-                Function<ResourceID, Boolean> stopWorkerFunction) {
-            this.stopWorkerFunction = stopWorkerFunction;
+        private ResourceManagerBuilder withStopWorkerConsumer(
+                Consumer<ResourceID> stopWorkerConsumer) {
+            this.stopWorkerConsumer = stopWorkerConsumer;
             return this;
         }
 
@@ -941,8 +930,8 @@ class ResourceManagerTest {
                                 .build();
             }
 
-            if (stopWorkerFunction == null) {
-                stopWorkerFunction = (ignore) -> false;
+            if (stopWorkerConsumer == null) {
+                stopWorkerConsumer = (ignore) -> {};
             }
 
             resourceManagerId = ResourceManagerId.generate();
@@ -959,7 +948,7 @@ class ResourceManagerTest {
                             jobLeaderIdService,
                             testingFatalErrorHandler,
                             UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup(),
-                            stopWorkerFunction,
+                            stopWorkerConsumer,
                             readyToServeFuture);
 
             resourceManager.start();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
index f73d4df3c0a..4289de57636 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
@@ -27,6 +27,8 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTrackerFactory;
 import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
 import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.NonSupportedResourceAllocatorImpl;
+import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceAllocator;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -36,16 +38,17 @@ import org.apache.flink.util.TimeUtils;
 
 import javax.annotation.Nullable;
 
+import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ForkJoinPool;
-import java.util.function.Function;
+import java.util.function.Consumer;
 
 /** Simple {@link ResourceManager} implementation for testing purposes. */
 public class TestingResourceManager extends ResourceManager<ResourceID> {
 
-    private final Function<ResourceID, Boolean> stopWorkerFunction;
+    private final Consumer<ResourceID> stopWorkerConsumer;
     private final CompletableFuture<Void> readyToServeFuture;
 
     public TestingResourceManager(
@@ -60,7 +63,7 @@ public class TestingResourceManager extends ResourceManager<ResourceID> {
             JobLeaderIdService jobLeaderIdService,
             FatalErrorHandler fatalErrorHandler,
             ResourceManagerMetricGroup resourceManagerMetricGroup,
-            Function<ResourceID, Boolean> stopWorkerFunction,
+            Consumer<ResourceID> stopWorkerConsumer,
             CompletableFuture<Void> readyToServeFuture) {
         super(
                 rpcService,
@@ -78,7 +81,7 @@ public class TestingResourceManager extends ResourceManager<ResourceID> {
                 RpcUtils.INF_TIMEOUT,
                 ForkJoinPool.commonPool());
 
-        this.stopWorkerFunction = stopWorkerFunction;
+        this.stopWorkerConsumer = stopWorkerConsumer;
         this.readyToServeFuture = readyToServeFuture;
     }
 
@@ -100,23 +103,23 @@ public class TestingResourceManager extends ResourceManager<ResourceID> {
     }
 
     @Override
-    public boolean startNewWorker(WorkerResourceSpec workerResourceSpec) {
-        return false;
+    protected Optional<ResourceID> getWorkerNodeIfAcceptRegistration(ResourceID resourceID) {
+        return Optional.of(resourceID);
     }
 
     @Override
-    protected ResourceID workerStarted(ResourceID resourceID) {
-        return resourceID;
+    public void stopWorkerIfSupported(ResourceID worker) {
+        stopWorkerConsumer.accept(worker);
     }
 
     @Override
-    public boolean stopWorker(ResourceID worker) {
-        return stopWorkerFunction.apply(worker);
+    public CompletableFuture<Void> getReadyToServeFuture() {
+        return readyToServeFuture;
     }
 
     @Override
-    public CompletableFuture<Void> getReadyToServeFuture() {
-        return readyToServeFuture;
+    protected ResourceAllocator getResourceAllocator() {
+        return NonSupportedResourceAllocatorImpl.INSTANCE;
     }
 
     public <T> CompletableFuture<T> runInMainThread(Callable<T> callable, Time timeout) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java
index 181bc8c9618..67bc11a4266 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java
@@ -30,6 +30,8 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTrackerFactory;
 import org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTrackerImpl;
 import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.slotmanager.NonSupportedResourceAllocatorImpl;
+import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceAllocator;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -39,6 +41,7 @@ import org.apache.flink.util.function.TriConsumer;
 
 import javax.annotation.Nullable;
 
+import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -219,17 +222,7 @@ public class TestingResourceManagerFactory extends ResourceManagerFactory<Resour
         }
 
         @Override
-        public boolean startNewWorker(WorkerResourceSpec workerResourceSpec) {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        protected ResourceID workerStarted(ResourceID resourceID) {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public boolean stopWorker(ResourceID worker) {
+        protected Optional<ResourceID> getWorkerNodeIfAcceptRegistration(ResourceID resourceID) {
             throw new UnsupportedOperationException();
         }
 
@@ -243,5 +236,10 @@ public class TestingResourceManagerFactory extends ResourceManagerFactory<Resour
         public CompletableFuture<Void> getReadyToServeFuture() {
             return CompletableFuture.completedFuture(null);
         }
+
+        @Override
+        protected ResourceAllocator getResourceAllocator() {
+            return NonSupportedResourceAllocatorImpl.INSTANCE;
+        }
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
index 764147d405b..b685d57f7c4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
@@ -107,18 +107,17 @@ public class ActiveResourceManagerTest extends TestLogger {
                 runTest(
                         () -> {
                             // received worker request, verify requesting from driver
-                            CompletableFuture<Boolean> startNewWorkerFuture =
+                            CompletableFuture<Void> startNewWorkerFuture =
                                     runInMainThread(
                                             () ->
                                                     getResourceManager()
-                                                            .startNewWorker(WORKER_RESOURCE_SPEC));
+                                                            .requestNewWorker(
+                                                                    WORKER_RESOURCE_SPEC));
                             TaskExecutorProcessSpec taskExecutorProcessSpec =
                                     requestWorkerFromDriverFuture.get(
                                             TIMEOUT_SEC, TimeUnit.SECONDS);
 
-                            assertThat(
-                                    startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
-                                    is(true));
+                            startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
                             assertThat(
                                     taskExecutorProcessSpec,
                                     is(
@@ -171,19 +170,18 @@ public class ActiveResourceManagerTest extends TestLogger {
                 runTest(
                         () -> {
                             // received worker request, verify requesting from driver
-                            CompletableFuture<Boolean> startNewWorkerFuture =
+                            CompletableFuture<Void> startNewWorkerFuture =
                                     runInMainThread(
                                             () ->
                                                     getResourceManager()
-                                                            .startNewWorker(WORKER_RESOURCE_SPEC));
+                                                            .requestNewWorker(
+                                                                    WORKER_RESOURCE_SPEC));
                             TaskExecutorProcessSpec taskExecutorProcessSpec1 =
                                     requestWorkerFromDriverFutures
                                             .get(0)
                                             .get(TIMEOUT_SEC, TimeUnit.SECONDS);
 
-                            assertThat(
-                                    startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
-                                    is(true));
+                            startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
                             assertThat(
                                     taskExecutorProcessSpec1,
                                     is(
@@ -250,19 +248,18 @@ public class ActiveResourceManagerTest extends TestLogger {
                 runTest(
                         () -> {
                             // received worker request, verify requesting from driver
-                            CompletableFuture<Boolean> startNewWorkerFuture =
+                            CompletableFuture<Void> startNewWorkerFuture =
                                     runInMainThread(
                                             () ->
                                                     getResourceManager()
-                                                            .startNewWorker(WORKER_RESOURCE_SPEC));
+                                                            .requestNewWorker(
+                                                                    WORKER_RESOURCE_SPEC));
                             TaskExecutorProcessSpec taskExecutorProcessSpec1 =
                                     requestWorkerFromDriverFutures
                                             .get(0)
                                             .get(TIMEOUT_SEC, TimeUnit.SECONDS);
 
-                            assertThat(
-                                    startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
-                                    is(true));
+                            startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
                             assertThat(
                                     taskExecutorProcessSpec1,
                                     is(
@@ -329,19 +326,18 @@ public class ActiveResourceManagerTest extends TestLogger {
                 runTest(
                         () -> {
                             // received worker request, verify requesting from driver
-                            CompletableFuture<Boolean> startNewWorkerFuture =
+                            CompletableFuture<Void> startNewWorkerFuture =
                                     runInMainThread(
                                             () ->
                                                     getResourceManager()
-                                                            .startNewWorker(WORKER_RESOURCE_SPEC));
+                                                            .requestNewWorker(
+                                                                    WORKER_RESOURCE_SPEC));
                             TaskExecutorProcessSpec taskExecutorProcessSpec1 =
                                     requestWorkerFromDriverFutures
                                             .get(0)
                                             .get(TIMEOUT_SEC, TimeUnit.SECONDS);
 
-                            assertThat(
-                                    startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
-                                    is(true));
+                            startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
                             assertThat(
                                     taskExecutorProcessSpec1,
                                     is(
@@ -408,19 +404,18 @@ public class ActiveResourceManagerTest extends TestLogger {
                 runTest(
                         () -> {
                             // received worker request, verify requesting from driver
-                            CompletableFuture<Boolean> startNewWorkerFuture =
+                            CompletableFuture<Void> startNewWorkerFuture =
                                     runInMainThread(
                                             () ->
                                                     getResourceManager()
-                                                            .startNewWorker(WORKER_RESOURCE_SPEC));
+                                                            .requestNewWorker(
+                                                                    WORKER_RESOURCE_SPEC));
                             TaskExecutorProcessSpec taskExecutorProcessSpec =
                                     requestWorkerFromDriverFutures
                                             .get(0)
                                             .get(TIMEOUT_SEC, TimeUnit.SECONDS);
 
-                            assertThat(
-                                    startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
-                                    is(true));
+                            startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
                             assertThat(
                                     taskExecutorProcessSpec,
                                     is(
@@ -482,7 +477,7 @@ public class ActiveResourceManagerTest extends TestLogger {
                             runInMainThread(
                                             () ->
                                                     getResourceManager()
-                                                            .startNewWorker(WORKER_RESOURCE_SPEC))
+                                                            .requestNewWorker(WORKER_RESOURCE_SPEC))
                                     .thenCompose(
                                             (ignore) ->
                                                     registerTaskExecutor(
@@ -539,18 +534,18 @@ public class ActiveResourceManagerTest extends TestLogger {
                 runTest(
                         () -> {
                             // received worker request, verify requesting from driver
-                            CompletableFuture<Boolean> startNewWorkerFuture =
+                            CompletableFuture<Void> startNewWorkerFuture =
                                     runInMainThread(
                                             () ->
                                                     getResourceManager()
-                                                            .startNewWorker(WORKER_RESOURCE_SPEC));
+                                                            .requestNewWorker(
+                                                                    WORKER_RESOURCE_SPEC));
                             long t1 =
                                     requestWorkerFromDriverFutures
                                             .get(0)
                                             .get(TIMEOUT_SEC, TimeUnit.SECONDS);
-                            assertThat(
-                                    startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
-                                    is(true));
+
+                            startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
 
                             // first worker failed before register, verify requesting another worker
                             // from driver
@@ -619,14 +614,14 @@ public class ActiveResourceManagerTest extends TestLogger {
                 runTest(
                         () -> {
                             // received worker request, verify requesting from driver
-                            CompletableFuture<Boolean> startNewWorkerFuture =
+                            CompletableFuture<Void> startNewWorkerFuture =
                                     runInMainThread(
                                             () ->
                                                     getResourceManager()
-                                                            .startNewWorker(WORKER_RESOURCE_SPEC));
-                            assertThat(
-                                    startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
-                                    is(true));
+                                                            .requestNewWorker(
+                                                                    WORKER_RESOURCE_SPEC));
+
+                            startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
 
                             long t1 =
                                     requestWorkerFromDriverFutures
@@ -745,7 +740,7 @@ public class ActiveResourceManagerTest extends TestLogger {
                             runInMainThread(
                                     () ->
                                             getResourceManager()
-                                                    .startNewWorker(WORKER_RESOURCE_SPEC));
+                                                    .requestNewWorker(WORKER_RESOURCE_SPEC));
 
                             // verify worker is released due to not registered in time
                             assertThat(
@@ -781,7 +776,7 @@ public class ActiveResourceManagerTest extends TestLogger {
                             runInMainThread(
                                     () ->
                                             getResourceManager()
-                                                    .startNewWorker(WORKER_RESOURCE_SPEC));
+                                                    .requestNewWorker(WORKER_RESOURCE_SPEC));
 
                             // resource allocation takes longer than worker registration timeout
                             try {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
index 684c4145cea..83a55c69857 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
@@ -258,12 +258,8 @@ class DeclarativeSlotManagerTest {
     void testRequirementDeclarationWithResourceAllocationFailure() throws Exception {
         final ResourceRequirements resourceRequirements = createResourceRequirementsForSingleSlot();
 
-        ResourceAllocator resourceAllocator =
-                new TestingResourceAllocatorBuilder()
-                        .setAllocateResourceFunction(value -> false)
-                        .build();
-
         final ResourceTracker resourceTracker = new DefaultResourceTracker();
+        final ResourceAllocator resourceAllocator = NonSupportedResourceAllocatorImpl.INSTANCE;
 
         try (DeclarativeSlotManager slotManager =
                 createDeclarativeSlotManagerBuilder()
@@ -1076,11 +1072,7 @@ class DeclarativeSlotManagerTest {
         final AtomicInteger resourceRequests = new AtomicInteger(0);
         final TestingResourceAllocator testingResourceAllocator =
                 new TestingResourceAllocatorBuilder()
-                        .setAllocateResourceFunction(
-                                ignored -> {
-                                    resourceRequests.incrementAndGet();
-                                    return true;
-                                })
+                        .setAllocateResourceConsumer(ignored -> resourceRequests.incrementAndGet())
                         .build();
 
         try (final DeclarativeSlotManager slotManager =
@@ -1191,10 +1183,6 @@ class DeclarativeSlotManagerTest {
 
         List<Tuple2<JobID, Collection<ResourceRequirement>>> notEnoughResourceNotifications =
                 new ArrayList<>();
-        ResourceAllocator resourceAllocator =
-                new TestingResourceAllocatorBuilder()
-                        .setAllocateResourceFunction(ignored -> false)
-                        .build();
 
         ResourceEventListener resourceEventListener =
                 new TestingResourceEventListenerBuilder()
@@ -1204,6 +1192,8 @@ class DeclarativeSlotManagerTest {
                                                 Tuple2.of(jobId1, acquiredResources)))
                         .build();
 
+        ResourceAllocator resourceAllocator = NonSupportedResourceAllocatorImpl.INSTANCE;
+
         try (DeclarativeSlotManager slotManager =
                 createDeclarativeSlotManagerBuilder()
                         .buildAndStart(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java
index 73e0355bd31..4d208f5290a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java
@@ -52,11 +52,8 @@ class FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase
 
         new Context() {
             {
-                resourceAllocatorBuilder.setAllocateResourceFunction(
-                        ignored -> {
-                            resourceRequests.incrementAndGet();
-                            return true;
-                        });
+                resourceAllocatorBuilder.setAllocateResourceConsumer(
+                        ignored -> resourceRequests.incrementAndGet());
                 runTest(
                         () -> {
                             runInMainThread(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java
index ed01b63b4c7..848647b12dc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java
@@ -250,11 +250,7 @@ public class TaskExecutorManagerTest extends TestLogger {
         final AtomicInteger resourceRequests = new AtomicInteger(0);
         ResourceAllocator resourceAllocator =
                 createResourceAllocatorBuilder()
-                        .setAllocateResourceFunction(
-                                ignored -> {
-                                    resourceRequests.incrementAndGet();
-                                    return true;
-                                })
+                        .setAllocateResourceConsumer(ignored -> resourceRequests.incrementAndGet())
                         .build();
 
         try (final TaskExecutorManager taskExecutorManager =
@@ -283,11 +279,7 @@ public class TaskExecutorManagerTest extends TestLogger {
         final AtomicInteger resourceRequests = new AtomicInteger(0);
         ResourceAllocator resourceAllocator =
                 createResourceAllocatorBuilder()
-                        .setAllocateResourceFunction(
-                                ignored -> {
-                                    resourceRequests.incrementAndGet();
-                                    return true;
-                                })
+                        .setAllocateResourceConsumer(ignored -> resourceRequests.incrementAndGet())
                         .build();
 
         try (final TaskExecutorManager taskExecutorManager =
@@ -382,7 +374,7 @@ public class TaskExecutorManagerTest extends TestLogger {
     private static TestingResourceAllocatorBuilder createResourceAllocatorBuilder() {
         return new TestingResourceAllocatorBuilder()
                 // ensures we do something when excess resource are requested
-                .setAllocateResourceFunction(ignored -> true);
+                .setAllocateResourceConsumer(ignored -> {});
     }
 
     private static InstanceID createAndRegisterTaskExecutor(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocator.java
index 83207ce41cd..c375fba4d83 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocator.java
@@ -18,26 +18,32 @@
 
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
 
 import javax.annotation.Nonnull;
 
 import java.util.function.BiConsumer;
-import java.util.function.Function;
+import java.util.function.Consumer;
 
 /** Testing implementation of the {@link ResourceAllocator}. */
 public class TestingResourceAllocator implements ResourceAllocator {
 
     @Nonnull private final BiConsumer<InstanceID, Exception> releaseResourceConsumer;
 
-    @Nonnull private final Function<WorkerResourceSpec, Boolean> allocateResourceFunction;
+    @Nonnull private final Consumer<WorkerResourceSpec> allocateResourceConsumer;
 
     public TestingResourceAllocator(
             @Nonnull BiConsumer<InstanceID, Exception> releaseResourceConsumer,
-            @Nonnull Function<WorkerResourceSpec, Boolean> allocateResourceFunction) {
+            @Nonnull Consumer<WorkerResourceSpec> allocateResourceConsumer) {
         this.releaseResourceConsumer = releaseResourceConsumer;
-        this.allocateResourceFunction = allocateResourceFunction;
+        this.allocateResourceConsumer = allocateResourceConsumer;
+    }
+
+    @Override
+    public boolean isSupported() {
+        return true;
     }
 
     @Override
@@ -46,7 +52,12 @@ public class TestingResourceAllocator implements ResourceAllocator {
     }
 
     @Override
-    public boolean allocateResource(WorkerResourceSpec workerResourceSpec) {
-        return allocateResourceFunction.apply(workerResourceSpec);
+    public void releaseResource(ResourceID resourceID) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void allocateResource(WorkerResourceSpec workerResourceSpec) {
+        allocateResourceConsumer.accept(workerResourceSpec);
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocatorBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocatorBuilder.java
index 8dc5abd2a81..795da0c2d69 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocatorBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocatorBuilder.java
@@ -23,12 +23,11 @@ import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
 
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
-import java.util.function.Function;
 
 /** Builder for the {@link TestingResourceAllocator}. */
 public class TestingResourceAllocatorBuilder {
     private BiConsumer<InstanceID, Exception> releaseResourceConsumer = (ignoredA, ignoredB) -> {};
-    private Function<WorkerResourceSpec, Boolean> allocateResourceFunction = (ignored) -> true;
+    private Consumer<WorkerResourceSpec> allocateResourceConsumer = (ignored) -> {};
 
     public TestingResourceAllocatorBuilder setReleaseResourceConsumer(
             BiConsumer<InstanceID, Exception> releaseResourceConsumer) {
@@ -36,23 +35,13 @@ public class TestingResourceAllocatorBuilder {
         return this;
     }
 
-    public TestingResourceAllocatorBuilder setAllocateResourceFunction(
-            Function<WorkerResourceSpec, Boolean> allocateResourceFunction) {
-        this.allocateResourceFunction = allocateResourceFunction;
-        return this;
-    }
-
     public TestingResourceAllocatorBuilder setAllocateResourceConsumer(
             Consumer<WorkerResourceSpec> allocateResourceConsumer) {
-        this.allocateResourceFunction =
-                workerRequest -> {
-                    allocateResourceConsumer.accept(workerRequest);
-                    return true;
-                };
+        this.allocateResourceConsumer = allocateResourceConsumer;
         return this;
     }
 
     public TestingResourceAllocator build() {
-        return new TestingResourceAllocator(releaseResourceConsumer, allocateResourceFunction);
+        return new TestingResourceAllocator(releaseResourceConsumer, allocateResourceConsumer);
     }
 }