You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/12/06 11:23:45 UTC

[flink] 01/02: [FLINK-29870][runtime] Split ResourceActions to ResourceAllocator and ResourceEventListener.

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a0aa4813b9fe3db37cabe38fa47f1d1096c879dc
Author: Weihua Hu <hu...@gmail.com>
AuthorDate: Thu Nov 3 16:10:49 2022 +0800

    [FLINK-29870][runtime] Split ResourceActions to ResourceAllocator and ResourceEventListener.
---
 .../runtime/resourcemanager/ResourceManager.java   | 12 ++--
 .../slotmanager/DeclarativeSlotManager.java        | 25 +++----
 .../slotmanager/FineGrainedSlotManager.java        | 32 +++++----
 ...ResourceActions.java => ResourceAllocator.java} | 16 +----
 ...urceActions.java => ResourceEventListener.java} | 30 ++------
 .../resourcemanager/slotmanager/SlotManager.java   | 10 +--
 .../slotmanager/TaskExecutorManager.java           | 14 ++--
 .../AbstractFineGrainedSlotManagerITCase.java      | 12 ++--
 .../slotmanager/DeclarativeSlotManagerBuilder.java | 40 +++++++++--
 .../slotmanager/DeclarativeSlotManagerTest.java    | 83 ++++++++++++----------
 ...gerDefaultResourceAllocationStrategyITCase.java |  2 +-
 .../slotmanager/FineGrainedSlotManagerTest.java    | 12 ++--
 .../FineGrainedSlotManagerTestBase.java            | 10 ++-
 .../slotmanager/TaskExecutorManagerBuilder.java    |  8 +--
 .../slotmanager/TaskExecutorManagerTest.java       | 38 +++++-----
 ...eActions.java => TestingResourceAllocator.java} | 25 ++-----
 ...r.java => TestingResourceAllocatorBuilder.java} | 28 ++------
 .../slotmanager/TestingResourceEventListener.java  | 48 +++++++++++++
 .../TestingResourceEventListenerBuilder.java       | 41 +++++++++++
 .../slotmanager/TestingSlotManager.java            |  3 +-
 20 files changed, 284 insertions(+), 205 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 12684c25247..3abd6b6b68d 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -56,7 +56,8 @@ import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerExcept
 import org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException;
 import org.apache.flink.runtime.resourcemanager.registration.JobManagerRegistration;
 import org.apache.flink.runtime.resourcemanager.registration.WorkerRegistration;
-import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceActions;
+import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceAllocator;
+import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceEventListener;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rest.messages.LogInfo;
 import org.apache.flink.runtime.rest.messages.ThreadDumpInfo;
@@ -266,7 +267,8 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
             slotManager.start(
                     getFencingToken(),
                     getMainThreadExecutor(),
-                    new ResourceActionsImpl(),
+                    new ResourceAllocatorImpl(),
+                    new ResourceEventListenerImpl(),
                     blocklistHandler::isBlockedTaskManager);
 
             delegationTokenManager.start(this);
@@ -1334,7 +1336,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
         }
     }
 
-    private class ResourceActionsImpl implements ResourceActions {
+    private class ResourceAllocatorImpl implements ResourceAllocator {
 
         @Override
         public void releaseResource(InstanceID instanceId, Exception cause) {
@@ -1348,9 +1350,11 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
             validateRunsInMainThread();
             return startNewWorker(workerResourceSpec);
         }
+    }
 
+    private class ResourceEventListenerImpl implements ResourceEventListener {
         @Override
-        public void notifyNotEnoughResourcesAvailable(
+        public void notEnoughResourceAvailable(
                 JobID jobId, Collection<ResourceRequirement> acquiredResources) {
             validateRunsInMainThread();
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
index 71898ae747d..f5e210de073 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
@@ -69,7 +69,7 @@ public class DeclarativeSlotManager implements SlotManager {
 
     private final SlotTracker slotTracker;
     private final ResourceTracker resourceTracker;
-    private final BiFunction<Executor, ResourceActions, TaskExecutorManager>
+    private final BiFunction<Executor, ResourceAllocator, TaskExecutorManager>
             taskExecutorManagerFactory;
     @Nullable private TaskExecutorManager taskExecutorManager;
 
@@ -97,8 +97,8 @@ public class DeclarativeSlotManager implements SlotManager {
     /** Executor for future callbacks which have to be "synchronized". */
     @Nullable private Executor mainThreadExecutor;
 
-    /** Callbacks for resource (de-)allocations. */
-    @Nullable private ResourceActions resourceActions;
+    /** Callbacks for notifying that not enough resources are available. */
+    @Nullable private ResourceEventListener resourceEventListener;
 
     /** The future of the requirements delay check. */
     @Nullable private CompletableFuture<Void> requirementsCheckFuture;
@@ -131,7 +131,7 @@ public class DeclarativeSlotManager implements SlotManager {
         slotMatchingStrategy = slotManagerConfiguration.getSlotMatchingStrategy();
 
         taskExecutorManagerFactory =
-                (executor, resourceActions) ->
+                (executor, resourceAllocator) ->
                         new TaskExecutorManager(
                                 slotManagerConfiguration.getDefaultWorkerResourceSpec(),
                                 slotManagerConfiguration.getNumSlotsPerWorker(),
@@ -141,10 +141,10 @@ public class DeclarativeSlotManager implements SlotManager {
                                 slotManagerConfiguration.getTaskManagerTimeout(),
                                 scheduledExecutor,
                                 executor,
-                                resourceActions);
+                                resourceAllocator);
 
         resourceManagerId = null;
-        resourceActions = null;
+        resourceEventListener = null;
         mainThreadExecutor = null;
         taskExecutorManager = null;
         blockedTaskManagerChecker = null;
@@ -199,22 +199,23 @@ public class DeclarativeSlotManager implements SlotManager {
      *
      * @param newResourceManagerId to use for communication with the task managers
      * @param newMainThreadExecutor to use to run code in the ResourceManager's main thread
-     * @param newResourceActions to use for resource (de-)allocations
+     * @param newResourceAllocator to use for resource (de-)allocations
      * @param newBlockedTaskManagerChecker to query whether a task manager is blocked
      */
     @Override
     public void start(
             ResourceManagerId newResourceManagerId,
             Executor newMainThreadExecutor,
-            ResourceActions newResourceActions,
+            ResourceAllocator newResourceAllocator,
+            ResourceEventListener newResourceEventListener,
             BlockedTaskManagerChecker newBlockedTaskManagerChecker) {
         LOG.debug("Starting the slot manager.");
 
         this.resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
         mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
-        resourceActions = Preconditions.checkNotNull(newResourceActions);
+        resourceEventListener = Preconditions.checkNotNull(newResourceEventListener);
         taskExecutorManager =
-                taskExecutorManagerFactory.apply(newMainThreadExecutor, newResourceActions);
+                taskExecutorManagerFactory.apply(newMainThreadExecutor, newResourceAllocator);
         blockedTaskManagerChecker = Preconditions.checkNotNull(newBlockedTaskManagerChecker);
 
         started = true;
@@ -253,7 +254,7 @@ public class DeclarativeSlotManager implements SlotManager {
 
         taskExecutorManager = null;
         resourceManagerId = null;
-        resourceActions = null;
+        resourceEventListener = null;
         blockedTaskManagerChecker = null;
         started = false;
     }
@@ -723,7 +724,7 @@ public class DeclarativeSlotManager implements SlotManager {
                                 "Could not fulfill resource requirements of job {}. Free slots: {}",
                                 jobId,
                                 slotTracker.getFreeSlots().size());
-                        resourceActions.notifyNotEnoughResourcesAvailable(
+                        resourceEventListener.notEnoughResourceAvailable(
                                 jobId, resourceTracker.getAcquiredResources(jobId));
                         return pendingSlots;
                     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
index 4833c7ad1a6..c999d9883de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
@@ -108,7 +108,10 @@ public class FineGrainedSlotManager implements SlotManager {
     @Nullable private Executor mainThreadExecutor;
 
     /** Callbacks for resource (de-)allocations. */
-    @Nullable private ResourceActions resourceActions;
+    @Nullable private ResourceAllocator resourceAllocator;
+
+    /** Callbacks for notifying that not enough resources are available. */
+    @Nullable private ResourceEventListener resourceEventListener;
 
     @Nullable private ScheduledFuture<?> taskManagerTimeoutsCheck;
 
@@ -149,7 +152,8 @@ public class FineGrainedSlotManager implements SlotManager {
         this.maxTotalMem = Preconditions.checkNotNull(slotManagerConfiguration.getMaxTotalMem());
 
         resourceManagerId = null;
-        resourceActions = null;
+        resourceAllocator = null;
+        resourceEventListener = null;
         mainThreadExecutor = null;
         taskManagerTimeoutsCheck = null;
         requirementsCheckFuture = null;
@@ -166,7 +170,7 @@ public class FineGrainedSlotManager implements SlotManager {
 
         if (failUnfulfillableRequest && !unfulfillableJobs.isEmpty()) {
             for (JobID jobId : unfulfillableJobs) {
-                resourceActions.notifyNotEnoughResourcesAvailable(
+                resourceEventListener.notEnoughResourceAvailable(
                         jobId, resourceTracker.getAcquiredResources(jobId));
             }
         }
@@ -186,20 +190,22 @@ public class FineGrainedSlotManager implements SlotManager {
      *
      * @param newResourceManagerId to use for communication with the task managers
      * @param newMainThreadExecutor to use to run code in the ResourceManager's main thread
-     * @param newResourceActions to use for resource (de-)allocations
+     * @param newResourceAllocator to use for resource (de-)allocations
      * @param newBlockedTaskManagerChecker to query whether a task manager is blocked
      */
     @Override
     public void start(
             ResourceManagerId newResourceManagerId,
             Executor newMainThreadExecutor,
-            ResourceActions newResourceActions,
+            ResourceAllocator newResourceAllocator,
+            ResourceEventListener newResourceEventListener,
             BlockedTaskManagerChecker newBlockedTaskManagerChecker) {
         LOG.info("Starting the slot manager.");
 
         resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
         mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
-        resourceActions = Preconditions.checkNotNull(newResourceActions);
+        resourceAllocator = Preconditions.checkNotNull(newResourceAllocator);
+        resourceEventListener = Preconditions.checkNotNull(newResourceEventListener);
         slotStatusSyncer.initialize(
                 taskManagerTracker, resourceTracker, resourceManagerId, mainThreadExecutor);
         blockedTaskManagerChecker = Preconditions.checkNotNull(newBlockedTaskManagerChecker);
@@ -246,7 +252,8 @@ public class FineGrainedSlotManager implements SlotManager {
 
         unfulfillableJobs.clear();
         resourceManagerId = null;
-        resourceActions = null;
+        resourceAllocator = null;
+        resourceEventListener = null;
         started = false;
     }
 
@@ -346,7 +353,7 @@ public class FineGrainedSlotManager implements SlotManager {
                         taskExecutorConnection.getResourceID(),
                         maxTotalCpu,
                         maxTotalMem.toHumanReadableString());
-                resourceActions.releaseResource(
+                resourceAllocator.releaseResource(
                         taskExecutorConnection.getInstanceID(),
                         new FlinkExpectedException(
                                 "The max total resource limitation is reached."));
@@ -571,7 +578,7 @@ public class FineGrainedSlotManager implements SlotManager {
         if (sendNotEnoughResourceNotifications) {
             for (JobID jobId : unfulfillableJobs) {
                 LOG.warn("Could not fulfill resource requirements of job {}.", jobId);
-                resourceActions.notifyNotEnoughResourcesAvailable(
+                resourceEventListener.notEnoughResourceAvailable(
                         jobId, resourceTracker.getAcquiredResources(jobId));
             }
         }
@@ -730,7 +737,7 @@ public class FineGrainedSlotManager implements SlotManager {
     private void releaseIdleTaskExecutor(InstanceID timedOutTaskManagerId) {
         final FlinkExpectedException cause =
                 new FlinkExpectedException("TaskManager exceeded the idle timeout.");
-        resourceActions.releaseResource(timedOutTaskManagerId, cause);
+        resourceAllocator.releaseResource(timedOutTaskManagerId, cause);
     }
 
     private boolean allocateResource(PendingTaskManager pendingTaskManager) {
@@ -743,7 +750,7 @@ public class FineGrainedSlotManager implements SlotManager {
             return false;
         }
 
-        if (!resourceActions.allocateResource(
+        if (!resourceAllocator.allocateResource(
                 WorkerResourceSpec.fromTotalResourceProfile(
                         pendingTaskManager.getTotalResourceProfile(),
                         pendingTaskManager.getNumSlots()))) {
@@ -771,7 +778,8 @@ public class FineGrainedSlotManager implements SlotManager {
         Preconditions.checkState(started, "The slot manager has not been started.");
         Preconditions.checkNotNull(resourceManagerId);
         Preconditions.checkNotNull(mainThreadExecutor);
-        Preconditions.checkNotNull(resourceActions);
+        Preconditions.checkNotNull(resourceAllocator);
+        Preconditions.checkNotNull(resourceEventListener);
     }
 
     private boolean isMaxTotalResourceExceededAfterAdding(ResourceProfile newResource) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocator.java
similarity index 73%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocator.java
index 639f8cb53fa..179a2ad5ae1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocator.java
@@ -18,15 +18,11 @@
 
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
-import org.apache.flink.runtime.slots.ResourceRequirement;
-
-import java.util.Collection;
 
 /** Resource related actions which the {@link SlotManager} can perform. */
-public interface ResourceActions {
+public interface ResourceAllocator {
 
     /**
      * Releases the resource with the given instance id.
@@ -43,14 +39,4 @@ public interface ResourceActions {
      * @return whether the resource can be allocated
      */
     boolean allocateResource(WorkerResourceSpec workerResourceSpec);
-
-    /**
-     * Notifies that not enough resources are available to fulfill the resource requirements of a
-     * job.
-     *
-     * @param jobId job for which not enough resources are available
-     * @param acquiredResources the resources that have been acquired for the job
-     */
-    void notifyNotEnoughResourcesAvailable(
-            JobID jobId, Collection<ResourceRequirement> acquiredResources);
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceEventListener.java
similarity index 53%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceEventListener.java
index 639f8cb53fa..1106b378a18 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceEventListener.java
@@ -19,38 +19,16 @@
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
 import org.apache.flink.runtime.slots.ResourceRequirement;
 
 import java.util.Collection;
 
-/** Resource related actions which the {@link SlotManager} can perform. */
-public interface ResourceActions {
-
-    /**
-     * Releases the resource with the given instance id.
-     *
-     * @param instanceId identifying which resource to release
-     * @param cause why the resource is released
-     */
-    void releaseResource(InstanceID instanceId, Exception cause);
-
-    /**
-     * Requests to allocate a resource with the given {@link WorkerResourceSpec}.
-     *
-     * @param workerResourceSpec for the to be allocated worker
-     * @return whether the resource can be allocated
-     */
-    boolean allocateResource(WorkerResourceSpec workerResourceSpec);
-
+/** Listener for resource events of {@link SlotManager}. */
+@FunctionalInterface
+public interface ResourceEventListener {
     /**
-     * Notifies that not enough resources are available to fulfill the resource requirements of a
-     * job.
-     *
      * @param jobId job for which not enough resources are available
      * @param acquiredResources the resources that have been acquired for the job
      */
-    void notifyNotEnoughResourcesAvailable(
-            JobID jobId, Collection<ResourceRequirement> acquiredResources);
+    void notEnoughResourceAvailable(JobID jobId, Collection<ResourceRequirement> acquiredResources);
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index cf706515b02..9ea06c1478b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -40,7 +40,7 @@ import java.util.concurrent.Executor;
  * their allocation and all pending slot requests. Whenever a new slot is registered or an allocated
  * slot is freed, then it tries to fulfill another pending slot request. Whenever there are not
  * enough slots available the slot manager will notify the resource manager about it via {@link
- * ResourceActions#allocateResource(WorkerResourceSpec)}.
+ * ResourceAllocator#allocateResource(WorkerResourceSpec)}.
  *
  * <p>In order to free resources and avoid resource leaks, idling task managers (task managers whose
  * slots are currently not used) and pending slot requests time out triggering their release and
@@ -56,7 +56,7 @@ public interface SlotManager extends AutoCloseable {
     int getNumberFreeSlotsOf(InstanceID instanceId);
 
     /**
-     * Get number of workers SlotManager requested from {@link ResourceActions} that are not yet
+     * Get number of workers SlotManager requested from {@link ResourceAllocator} that are not yet
      * fulfilled.
      *
      * @return a map whose key set is all the unique resource specs of the pending workers, and the
@@ -79,13 +79,15 @@ public interface SlotManager extends AutoCloseable {
      *
      * @param newResourceManagerId to use for communication with the task managers
      * @param newMainThreadExecutor to use to run code in the ResourceManager's main thread
-     * @param newResourceActions to use for resource (de-)allocations
+     * @param newResourceAllocator to use for resource (de-)allocations
+     * @param resourceEventListener to notify when not enough resources are available
      * @param newBlockedTaskManagerChecker to query whether a task manager is blocked
      */
     void start(
             ResourceManagerId newResourceManagerId,
             Executor newMainThreadExecutor,
-            ResourceActions newResourceActions,
+            ResourceAllocator newResourceAllocator,
+            ResourceEventListener resourceEventListener,
             BlockedTaskManagerChecker newBlockedTaskManagerChecker);
 
     /** Suspends the component. This clears the internal state of the slot manager. */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
index 1df5a872ee4..d1515a45eea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
@@ -90,7 +90,7 @@ class TaskExecutorManager implements AutoCloseable {
     private final Time taskManagerTimeout;
 
     /** Callbacks for resource (de-)allocations. */
-    private final ResourceActions resourceActions;
+    private final ResourceAllocator resourceAllocator;
 
     /** All currently registered task managers. */
     private final Map<InstanceID, TaskManagerRegistration> taskManagerRegistrations =
@@ -111,7 +111,7 @@ class TaskExecutorManager implements AutoCloseable {
             Time taskManagerTimeout,
             ScheduledExecutor scheduledExecutor,
             Executor mainThreadExecutor,
-            ResourceActions resourceActions) {
+            ResourceAllocator resourceAllocator) {
 
         this.defaultWorkerResourceSpec = defaultWorkerResourceSpec;
         this.numSlotsPerWorker = numSlotsPerWorker;
@@ -123,7 +123,7 @@ class TaskExecutorManager implements AutoCloseable {
                 SlotManagerUtils.generateDefaultSlotResourceProfile(
                         defaultWorkerResourceSpec, numSlotsPerWorker);
 
-        this.resourceActions = Preconditions.checkNotNull(resourceActions);
+        this.resourceAllocator = Preconditions.checkNotNull(resourceAllocator);
         this.mainThreadExecutor = mainThreadExecutor;
         taskManagerTimeoutsAndRedundancyCheck =
                 scheduledExecutor.scheduleWithFixedDelay(
@@ -157,7 +157,7 @@ class TaskExecutorManager implements AutoCloseable {
             LOG.info(
                     "The total number of slots exceeds the max limitation {}, releasing the excess task executor.",
                     maxSlotNum);
-            resourceActions.releaseResource(
+            resourceAllocator.releaseResource(
                     taskExecutorConnection.getInstanceID(),
                     new FlinkExpectedException(
                             "The total number of slots exceeds the max limitation."));
@@ -270,7 +270,7 @@ class TaskExecutorManager implements AutoCloseable {
             return Optional.empty();
         }
 
-        if (!resourceActions.allocateResource(defaultWorkerResourceSpec)) {
+        if (!resourceAllocator.allocateResource(defaultWorkerResourceSpec)) {
             // resource cannot be allocated
             return Optional.empty();
         }
@@ -321,7 +321,7 @@ class TaskExecutorManager implements AutoCloseable {
                         >= taskManagerTimeout.toMilliseconds()) {
                     // we collect the instance ids first in order to avoid concurrent modifications
                     // by the
-                    // ResourceActions.releaseResource call
+                    // ResourceAllocator.releaseResource call
                     timedOutTaskManagers.add(taskManagerRegistration);
                 }
             }
@@ -406,7 +406,7 @@ class TaskExecutorManager implements AutoCloseable {
         LOG.debug(
                 "Release TaskExecutor {} because it exceeded the idle timeout.",
                 timedOutTaskManagerId);
-        resourceActions.releaseResource(timedOutTaskManagerId, cause);
+        resourceAllocator.releaseResource(timedOutTaskManagerId, cause);
     }
 
     // ---------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java
index fd86e99efb1..e472b718539 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java
@@ -67,7 +67,7 @@ abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSlotManag
                 new CompletableFuture<>();
         new Context() {
             {
-                resourceActionsBuilder.setAllocateResourceConsumer(
+                resourceAllocatorBuilder.setAllocateResourceConsumer(
                         allocateResourceFuture::complete);
                 runTest(
                         () -> {
@@ -224,7 +224,7 @@ abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSlotManag
         new Context() {
             {
                 setBlockedTaskManagerChecker(blockedTaskManager::equals);
-                resourceActionsBuilder.setAllocateResourceConsumer(
+                resourceAllocatorBuilder.setAllocateResourceConsumer(
                         allocateResourceFuture::complete);
                 runTest(
                         () -> {
@@ -259,7 +259,7 @@ abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSlotManag
 
         new Context() {
             {
-                resourceActionsBuilder.setAllocateResourceConsumer(
+                resourceAllocatorBuilder.setAllocateResourceConsumer(
                         ignored -> {
                             if (allocateResourceFutures.get(0).isDone()) {
                                 allocateResourceFutures.get(1).complete(null);
@@ -430,7 +430,7 @@ abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSlotManag
         final SlotReport slotReport = new SlotReport();
         new Context() {
             {
-                resourceActionsBuilder.setAllocateResourceConsumer(
+                resourceAllocatorBuilder.setAllocateResourceConsumer(
                         ignored -> allocateResourceFutures.complete(null));
                 runTest(
                         () -> {
@@ -481,7 +481,7 @@ abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSlotManag
         final SlotReport slotReport = new SlotReport();
         new Context() {
             {
-                resourceActionsBuilder.setAllocateResourceConsumer(
+                resourceAllocatorBuilder.setAllocateResourceConsumer(
                         ignored -> allocateResourceFutures.complete(null));
                 runTest(
                         () -> {
@@ -528,7 +528,7 @@ abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSlotManag
 
         new Context() {
             {
-                resourceActionsBuilder.setAllocateResourceConsumer(
+                resourceAllocatorBuilder.setAllocateResourceConsumer(
                         ignored -> {
                             if (allocateResourceFutures.get(0).isDone()) {
                                 allocateResourceFutures.get(1).complete(null);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerBuilder.java
index ee0b811350b..959f5250018 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerBuilder.java
@@ -170,30 +170,56 @@ public class DeclarativeSlotManagerBuilder {
 
     public DeclarativeSlotManager buildAndStartWithDirectExec() {
         return buildAndStartWithDirectExec(
-                ResourceManagerId.generate(), new TestingResourceActionsBuilder().build());
+                ResourceManagerId.generate(),
+                new TestingResourceAllocatorBuilder().build(),
+                new TestingResourceEventListenerBuilder().build());
     }
 
     public DeclarativeSlotManager buildAndStartWithDirectExec(
-            ResourceManagerId resourceManagerId, ResourceActions resourceManagerActions) {
-        return buildAndStart(resourceManagerId, Executors.directExecutor(), resourceManagerActions);
+            ResourceManagerId resourceManagerId, ResourceAllocator resourceAllocator) {
+        return buildAndStartWithDirectExec(
+                resourceManagerId,
+                resourceAllocator,
+                new TestingResourceEventListenerBuilder().build());
+    }
+
+    public DeclarativeSlotManager buildAndStartWithDirectExec(
+            ResourceManagerId resourceManagerId,
+            ResourceAllocator resourceAllocator,
+            ResourceEventListener resourceEventListener) {
+        return buildAndStart(
+                resourceManagerId,
+                Executors.directExecutor(),
+                resourceAllocator,
+                resourceEventListener);
     }
 
     public DeclarativeSlotManager buildAndStart(
             ResourceManagerId resourceManagerId,
             Executor executor,
-            ResourceActions resourceManagerActions) {
+            ResourceAllocator resourceAllocator,
+            ResourceEventListener resourceEventListener) {
         return buildAndStart(
-                resourceManagerId, executor, resourceManagerActions, resourceID -> false);
+                resourceManagerId,
+                executor,
+                resourceAllocator,
+                resourceEventListener,
+                resourceID -> false);
     }
 
     public DeclarativeSlotManager buildAndStart(
             ResourceManagerId resourceManagerId,
             Executor executor,
-            ResourceActions resourceManagerActions,
+            ResourceAllocator resourceAllocator,
+            ResourceEventListener resourceEventListener,
             BlockedTaskManagerChecker blockedTaskManagerChecker) {
         final DeclarativeSlotManager slotManager = build();
         slotManager.start(
-                resourceManagerId, executor, resourceManagerActions, blockedTaskManagerChecker);
+                resourceManagerId,
+                executor,
+                resourceAllocator,
+                resourceEventListener,
+                blockedTaskManagerChecker);
         return slotManager;
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
index 6ae3cdeffba..684c4145cea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
@@ -194,13 +194,12 @@ class DeclarativeSlotManagerTest {
         final ResourceRequirements resourceRequirements = createResourceRequirementsForSingleSlot();
 
         CompletableFuture<WorkerResourceSpec> allocateResourceFuture = new CompletableFuture<>();
-        ResourceActions resourceManagerActions =
-                new TestingResourceActionsBuilder()
+        ResourceAllocator resourceAllocator =
+                new TestingResourceAllocatorBuilder()
                         .setAllocateResourceConsumer(allocateResourceFuture::complete)
                         .build();
 
-        try (SlotManager slotManager =
-                createSlotManager(resourceManagerId, resourceManagerActions)) {
+        try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceAllocator)) {
 
             slotManager.processResourceRequirements(resourceRequirements);
 
@@ -218,8 +217,8 @@ class DeclarativeSlotManagerTest {
         final ResourceRequirements resourceRequirements = createResourceRequirementsForSingleSlot();
 
         CompletableFuture<WorkerResourceSpec> allocateResourceFuture = new CompletableFuture<>();
-        ResourceActions resourceManagerActions =
-                new TestingResourceActionsBuilder()
+        ResourceAllocator resourceAllocator =
+                new TestingResourceAllocatorBuilder()
                         .setAllocateResourceConsumer(allocateResourceFuture::complete)
                         .build();
 
@@ -230,7 +229,8 @@ class DeclarativeSlotManagerTest {
                         .buildAndStart(
                                 resourceManagerId,
                                 Executors.directExecutor(),
-                                resourceManagerActions,
+                                resourceAllocator,
+                                new TestingResourceEventListenerBuilder().build(),
                                 blockedTaskManager::equals)) {
 
             final TaskExecutorGateway taskExecutorGateway =
@@ -258,8 +258,8 @@ class DeclarativeSlotManagerTest {
     void testRequirementDeclarationWithResourceAllocationFailure() throws Exception {
         final ResourceRequirements resourceRequirements = createResourceRequirementsForSingleSlot();
 
-        ResourceActions resourceManagerActions =
-                new TestingResourceActionsBuilder()
+        ResourceAllocator resourceAllocator =
+                new TestingResourceAllocatorBuilder()
                         .setAllocateResourceFunction(value -> false)
                         .build();
 
@@ -269,7 +269,7 @@ class DeclarativeSlotManagerTest {
                 createDeclarativeSlotManagerBuilder()
                         .setResourceTracker(resourceTracker)
                         .buildAndStartWithDirectExec(
-                                ResourceManagerId.generate(), resourceManagerActions)) {
+                                ResourceManagerId.generate(), resourceAllocator)) {
 
             slotManager.processResourceRequirements(resourceRequirements);
 
@@ -347,7 +347,7 @@ class DeclarativeSlotManagerTest {
                 createDeclarativeSlotManagerBuilder()
                         .setSlotTracker(slotTracker)
                         .buildAndStartWithDirectExec(
-                                resourceManagerId, new TestingResourceActionsBuilder().build())) {
+                                resourceManagerId, new TestingResourceAllocatorBuilder().build())) {
 
             if (scenario
                     == RequirementDeclarationScenario
@@ -432,8 +432,8 @@ class DeclarativeSlotManagerTest {
     void testDuplicateResourceRequirementDeclarationAfterSuccessfulAllocation() throws Exception {
         final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
         final AtomicInteger allocateResourceCalls = new AtomicInteger(0);
-        final ResourceActions resourceManagerActions =
-                new TestingResourceActionsBuilder()
+        final ResourceAllocator resourceAllocator =
+                new TestingResourceAllocatorBuilder()
                         .setAllocateResourceConsumer(
                                 ignored -> allocateResourceCalls.incrementAndGet())
                         .build();
@@ -454,7 +454,7 @@ class DeclarativeSlotManagerTest {
         try (DeclarativeSlotManager slotManager =
                 createDeclarativeSlotManagerBuilder()
                         .setSlotTracker(slotTracker)
-                        .buildAndStartWithDirectExec(resourceManagerId, resourceManagerActions)) {
+                        .buildAndStartWithDirectExec(resourceManagerId, resourceAllocator)) {
 
             slotManager.registerTaskManager(
                     taskManagerConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY);
@@ -549,14 +549,13 @@ class DeclarativeSlotManagerTest {
     @Test
     void testReceivingUnknownSlotReport() throws Exception {
         final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
-        final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build();
+        final ResourceAllocator resourceAllocator = new TestingResourceAllocatorBuilder().build();
 
         final InstanceID unknownInstanceID = new InstanceID();
         final SlotID unknownSlotId = new SlotID(ResourceID.generate(), 0);
         final SlotReport unknownSlotReport = new SlotReport(createFreeSlotStatus(unknownSlotId));
 
-        try (SlotManager slotManager =
-                createSlotManager(resourceManagerId, resourceManagerActions)) {
+        try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceAllocator)) {
             // check that we don't have any slots registered
             assertThat(slotManager.getNumberRegisteredSlots()).isEqualTo(0);
 
@@ -656,7 +655,8 @@ class DeclarativeSlotManagerTest {
                         .buildAndStart(
                                 ResourceManagerId.generate(),
                                 mainThreadExecutor,
-                                new TestingResourceActionsBuilder().build())) {
+                                new TestingResourceAllocatorBuilder().build(),
+                                new TestingResourceEventListenerBuilder().build())) {
 
             CompletableFuture.runAsync(
                             () ->
@@ -790,7 +790,8 @@ class DeclarativeSlotManagerTest {
                         .buildAndStart(
                                 ResourceManagerId.generate(),
                                 mainThreadExecutor,
-                                new TestingResourceActionsBuilder().build())) {
+                                new TestingResourceAllocatorBuilder().build(),
+                                new TestingResourceEventListenerBuilder().build())) {
 
             slotManager.registerTaskManager(
                     taskExecutorConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY);
@@ -1073,8 +1074,8 @@ class DeclarativeSlotManagerTest {
     void testRequestNewResources() throws Exception {
         final int numberSlots = 2;
         final AtomicInteger resourceRequests = new AtomicInteger(0);
-        final TestingResourceActions testingResourceActions =
-                new TestingResourceActionsBuilder()
+        final TestingResourceAllocator testingResourceAllocator =
+                new TestingResourceAllocatorBuilder()
                         .setAllocateResourceFunction(
                                 ignored -> {
                                     resourceRequests.incrementAndGet();
@@ -1084,7 +1085,7 @@ class DeclarativeSlotManagerTest {
 
         try (final DeclarativeSlotManager slotManager =
                 createSlotManager(
-                        ResourceManagerId.generate(), testingResourceActions, numberSlots)) {
+                        ResourceManagerId.generate(), testingResourceAllocator, numberSlots)) {
 
             final JobID jobId = new JobID();
 
@@ -1190,10 +1191,14 @@ class DeclarativeSlotManagerTest {
 
         List<Tuple2<JobID, Collection<ResourceRequirement>>> notEnoughResourceNotifications =
                 new ArrayList<>();
-        ResourceActions resourceManagerActions =
-                new TestingResourceActionsBuilder()
+        ResourceAllocator resourceAllocator =
+                new TestingResourceAllocatorBuilder()
                         .setAllocateResourceFunction(ignored -> false)
-                        .setNotEnoughResourcesConsumer(
+                        .build();
+
+        ResourceEventListener resourceEventListener =
+                new TestingResourceEventListenerBuilder()
+                        .setNotEnoughResourceAvailableConsumer(
                                 (jobId1, acquiredResources) ->
                                         notEnoughResourceNotifications.add(
                                                 Tuple2.of(jobId1, acquiredResources)))
@@ -1204,7 +1209,8 @@ class DeclarativeSlotManagerTest {
                         .buildAndStart(
                                 ResourceManagerId.generate(),
                                 new ManuallyTriggeredScheduledExecutor(),
-                                resourceManagerActions)) {
+                                resourceAllocator,
+                                resourceEventListener)) {
 
             if (withNotificationGracePeriod) {
                 // this should disable notifications
@@ -1271,7 +1277,8 @@ class DeclarativeSlotManagerTest {
                         .buildAndStart(
                                 ResourceManagerId.generate(),
                                 executor,
-                                new TestingResourceActionsBuilder().build())) {
+                                new TestingResourceAllocatorBuilder().build(),
+                                new TestingResourceEventListenerBuilder().build())) {
 
             JobID jobId = new JobID();
             slotManager.processResourceRequirements(createResourceRequirements(jobId, 1));
@@ -1318,7 +1325,8 @@ class DeclarativeSlotManagerTest {
                         .buildAndStart(
                                 ResourceManagerId.generate(),
                                 executor,
-                                new TestingResourceActionsBuilder().build())) {
+                                new TestingResourceAllocatorBuilder().build(),
+                                new TestingResourceEventListenerBuilder().build())) {
 
             JobID jobId = new JobID();
             slotManager.processResourceRequirements(createResourceRequirements(jobId, 1));
@@ -1372,7 +1380,8 @@ class DeclarativeSlotManagerTest {
                         .buildAndStart(
                                 ResourceManagerId.generate(),
                                 ComponentMainThreadExecutorServiceAdapter.forMainThread(),
-                                new TestingResourceActionsBuilder().build())) {
+                                new TestingResourceAllocatorBuilder().build(),
+                                new TestingResourceEventListenerBuilder().build())) {
 
             final TaskExecutorConnection taskExecutionConnection =
                     createTaskExecutorConnection(taskExecutorGateway);
@@ -1422,7 +1431,8 @@ class DeclarativeSlotManagerTest {
                         .buildAndStart(
                                 ResourceManagerId.generate(),
                                 ComponentMainThreadExecutorServiceAdapter.forMainThread(),
-                                new TestingResourceActionsBuilder().build())) {
+                                new TestingResourceAllocatorBuilder().build(),
+                                new TestingResourceEventListenerBuilder().build())) {
 
             final JobID jobId = new JobID();
 
@@ -1461,7 +1471,7 @@ class DeclarativeSlotManagerTest {
                         .setRequirementCheckDelay(delay)
                         .buildAndStartWithDirectExec(
                                 ResourceManagerId.generate(),
-                                new TestingResourceActionsBuilder()
+                                new TestingResourceAllocatorBuilder()
                                         .setAllocateResourceConsumer(
                                                 workerResourceSpec ->
                                                         allocatedResourceCounter.getAndIncrement())
@@ -1502,7 +1512,8 @@ class DeclarativeSlotManagerTest {
                         .buildAndStart(
                                 ResourceManagerId.generate(),
                                 ComponentMainThreadExecutorServiceAdapter.forMainThread(),
-                                new TestingResourceActionsBuilder().build())) {
+                                new TestingResourceAllocatorBuilder().build(),
+                                new TestingResourceEventListenerBuilder().build())) {
 
             final JobID jobId = new JobID();
 
@@ -1585,19 +1596,19 @@ class DeclarativeSlotManagerTest {
     }
 
     private DeclarativeSlotManager createSlotManager(
-            ResourceManagerId resourceManagerId, ResourceActions resourceManagerActions) {
-        return createSlotManager(resourceManagerId, resourceManagerActions, 1);
+            ResourceManagerId resourceManagerId, ResourceAllocator resourceAllocator) {
+        return createSlotManager(resourceManagerId, resourceAllocator, 1);
     }
 
     private DeclarativeSlotManager createSlotManager(
             ResourceManagerId resourceManagerId,
-            ResourceActions resourceManagerActions,
+            ResourceAllocator resourceAllocator,
             int numSlotsPerWorker) {
         return createDeclarativeSlotManagerBuilder(
                         new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()))
                 .setNumSlotsPerWorker(numSlotsPerWorker)
                 .setRedundantTaskManagerNum(0)
-                .buildAndStartWithDirectExec(resourceManagerId, resourceManagerActions);
+                .buildAndStartWithDirectExec(resourceManagerId, resourceAllocator);
     }
 
     private static DeclarativeSlotManagerBuilder createDeclarativeSlotManagerBuilder() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java
index d68320761df..73e0355bd31 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java
@@ -52,7 +52,7 @@ class FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase
 
         new Context() {
             {
-                resourceActionsBuilder.setAllocateResourceFunction(
+                resourceAllocatorBuilder.setAllocateResourceFunction(
                         ignored -> {
                             resourceRequests.incrementAndGet();
                             return true;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
index bfc438c4316..0c85473d13a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
@@ -378,7 +378,7 @@ class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
         allocateResourceFutures.add(new CompletableFuture<>());
         new Context() {
             {
-                resourceActionsBuilder.setAllocateResourceConsumer(
+                resourceAllocatorBuilder.setAllocateResourceConsumer(
                         ignored -> {
                             if (allocateResourceFutures.get(0).isDone()) {
                                 allocateResourceFutures.get(1).complete(null);
@@ -445,7 +445,7 @@ class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
                                                 pendingTaskManager.getPendingTaskManagerId(),
                                                 DEFAULT_SLOT_RESOURCE_PROFILE)
                                         .build()));
-                resourceActionsBuilder.setAllocateResourceConsumer(
+                resourceAllocatorBuilder.setAllocateResourceConsumer(
                         ignored -> requestResourceFuture.complete(null));
                 runTest(
                         () -> {
@@ -496,7 +496,7 @@ class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
         final CompletableFuture<Void> notifyNotEnoughResourceFuture = new CompletableFuture<>();
         new Context() {
             {
-                resourceActionsBuilder.setNotEnoughResourcesConsumer(
+                resourceEventListenerBuilder.setNotEnoughResourceAvailableConsumer(
                         (jobId1, acquiredResources) -> {
                             notEnoughResourceNotifications.add(
                                     Tuple2.of(jobId1, acquiredResources));
@@ -638,7 +638,7 @@ class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
         final InstanceID instanceId = taskExecutionConnection.getInstanceID();
         new Context() {
             {
-                resourceActionsBuilder.setReleaseResourceConsumer(
+                resourceAllocatorBuilder.setReleaseResourceConsumer(
                         (instanceID, e) -> releaseResourceFuture.complete(instanceID));
                 slotManagerConfigurationBuilder.setTaskManagerTimeout(taskManagerTimeout);
                 runTest(
@@ -827,7 +827,7 @@ class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
             {
                 maxTotalResourceSetter.accept(slotManagerConfigurationBuilder);
 
-                resourceActionsBuilder.setAllocateResourceConsumer(
+                resourceAllocatorBuilder.setAllocateResourceConsumer(
                         ignored -> {
                             if (allocateResourceFutures.get(0).isDone()) {
                                 allocateResourceFutures.get(1).complete(null);
@@ -876,7 +876,7 @@ class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
             {
                 maxTotalResourceSetter.accept(slotManagerConfigurationBuilder);
 
-                resourceActionsBuilder.setReleaseResourceConsumer(
+                resourceAllocatorBuilder.setReleaseResourceConsumer(
                         (instanceId, ignore) -> releaseResourceFuture.complete(instanceId));
 
                 runTest(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
index 049ef97bc0e..f2e8e4f9d76 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
@@ -157,8 +157,11 @@ abstract class FineGrainedSlotManagerTestBase {
         final TestingResourceAllocationStrategy.Builder resourceAllocationStrategyBuilder =
                 TestingResourceAllocationStrategy.newBuilder();
 
-        final TestingResourceActionsBuilder resourceActionsBuilder =
-                new TestingResourceActionsBuilder();
+        final TestingResourceAllocatorBuilder resourceAllocatorBuilder =
+                new TestingResourceAllocatorBuilder();
+
+        final TestingResourceEventListenerBuilder resourceEventListenerBuilder =
+                new TestingResourceEventListenerBuilder();
         final SlotManagerConfigurationBuilder slotManagerConfigurationBuilder =
                 SlotManagerConfigurationBuilder.newBuilder();
 
@@ -221,7 +224,8 @@ abstract class FineGrainedSlotManagerTestBase {
                             slotManager.start(
                                     resourceManagerId,
                                     mainThreadExecutor,
-                                    resourceActionsBuilder.build(),
+                                    resourceAllocatorBuilder.build(),
+                                    resourceEventListenerBuilder.build(),
                                     blockedTaskManagerChecker));
 
             testMethod.run();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerBuilder.java
index 157e894aea1..88e37e1ba16 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerBuilder.java
@@ -35,7 +35,7 @@ public class TaskExecutorManagerBuilder {
     private Time taskManagerTimeout = Time.seconds(5);
     private final ScheduledExecutor scheduledExecutor;
     private Executor mainThreadExecutor = Executors.directExecutor();
-    private ResourceActions newResourceActions = new TestingResourceActionsBuilder().build();
+    private ResourceAllocator newResourceAllocator = new TestingResourceAllocatorBuilder().build();
 
     public TaskExecutorManagerBuilder(ScheduledExecutor scheduledExecutor) {
         this.scheduledExecutor = scheduledExecutor;
@@ -78,8 +78,8 @@ public class TaskExecutorManagerBuilder {
         return this;
     }
 
-    public TaskExecutorManagerBuilder setResourceActions(ResourceActions newResourceActions) {
-        this.newResourceActions = newResourceActions;
+    public TaskExecutorManagerBuilder setResourceAllocator(ResourceAllocator newResourceAllocator) {
+        this.newResourceAllocator = newResourceAllocator;
         return this;
     }
 
@@ -93,6 +93,6 @@ public class TaskExecutorManagerBuilder {
                 taskManagerTimeout,
                 scheduledExecutor,
                 mainThreadExecutor,
-                newResourceActions);
+                newResourceAllocator);
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java
index 28593e31f76..ed01b63b4c7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java
@@ -133,7 +133,7 @@ public class TaskExecutorManagerTest extends TestLogger {
 
     /**
      * Tests that a task manager timeout does not remove the slots from the SlotManager. A timeout
-     * should only trigger the {@link ResourceActions#releaseResource(InstanceID, Exception)}
+     * should only trigger the {@link ResourceAllocator#releaseResource(InstanceID, Exception)}
      * callback. The receiver of the callback can then decide what to do with the TaskManager.
      *
      * <p>See FLINK-7793
@@ -143,8 +143,8 @@ public class TaskExecutorManagerTest extends TestLogger {
         final Time taskManagerTimeout = Time.milliseconds(10L);
 
         final CompletableFuture<InstanceID> releaseResourceFuture = new CompletableFuture<>();
-        final ResourceActions resourceActions =
-                createResourceActionsBuilder()
+        final ResourceAllocator resourceAllocator =
+                createResourceAllocatorBuilder()
                         .setReleaseResourceConsumer(
                                 (instanceId, ignored) -> releaseResourceFuture.complete(instanceId))
                         .build();
@@ -154,7 +154,7 @@ public class TaskExecutorManagerTest extends TestLogger {
         try (final TaskExecutorManager taskExecutorManager =
                 createTaskExecutorManagerBuilder()
                         .setTaskManagerTimeout(taskManagerTimeout)
-                        .setResourceActions(resourceActions)
+                        .setResourceAllocator(resourceAllocator)
                         .setMainThreadExecutor(mainThreadExecutor)
                         .createTaskExecutorManager()) {
 
@@ -195,8 +195,8 @@ public class TaskExecutorManagerTest extends TestLogger {
         final Time taskManagerTimeout = Time.milliseconds(50L);
 
         final CompletableFuture<InstanceID> releaseResourceFuture = new CompletableFuture<>();
-        final ResourceActions resourceManagerActions =
-                new TestingResourceActionsBuilder()
+        final ResourceAllocator resourceAllocator =
+                new TestingResourceAllocatorBuilder()
                         .setReleaseResourceConsumer(
                                 (instanceID, e) -> releaseResourceFuture.complete(instanceID))
                         .build();
@@ -207,7 +207,7 @@ public class TaskExecutorManagerTest extends TestLogger {
                 createTaskExecutorManagerBuilder()
                         .setTaskManagerTimeout(taskManagerTimeout)
                         .setDefaultWorkerResourceSpec(workerResourceSpec)
-                        .setResourceActions(resourceManagerActions)
+                        .setResourceAllocator(resourceAllocator)
                         .setMainThreadExecutor(mainThreadExecutor)
                         .createTaskExecutorManager()) {
 
@@ -248,8 +248,8 @@ public class TaskExecutorManagerTest extends TestLogger {
                 ResourceProfile.newBuilder().setCpuCores(numCoresPerWorker + 1).build();
 
         final AtomicInteger resourceRequests = new AtomicInteger(0);
-        ResourceActions resourceActions =
-                createResourceActionsBuilder()
+        ResourceAllocator resourceAllocator =
+                createResourceAllocatorBuilder()
                         .setAllocateResourceFunction(
                                 ignored -> {
                                     resourceRequests.incrementAndGet();
@@ -262,7 +262,7 @@ public class TaskExecutorManagerTest extends TestLogger {
                         .setDefaultWorkerResourceSpec(workerResourceSpec)
                         .setNumSlotsPerWorker(1)
                         .setMaxNumSlots(1)
-                        .setResourceActions(resourceActions)
+                        .setResourceAllocator(resourceAllocator)
                         .createTaskExecutorManager()) {
 
             assertThat(
@@ -281,8 +281,8 @@ public class TaskExecutorManagerTest extends TestLogger {
         final int maxSlotNum = 1;
 
         final AtomicInteger resourceRequests = new AtomicInteger(0);
-        ResourceActions resourceActions =
-                createResourceActionsBuilder()
+        ResourceAllocator resourceAllocator =
+                createResourceAllocatorBuilder()
                         .setAllocateResourceFunction(
                                 ignored -> {
                                     resourceRequests.incrementAndGet();
@@ -294,7 +294,7 @@ public class TaskExecutorManagerTest extends TestLogger {
                 createTaskExecutorManagerBuilder()
                         .setNumSlotsPerWorker(numberSlots)
                         .setMaxNumSlots(maxSlotNum)
-                        .setResourceActions(resourceActions)
+                        .setResourceAllocator(resourceAllocator)
                         .createTaskExecutorManager()) {
 
             assertThat(resourceRequests.get(), is(0));
@@ -317,8 +317,8 @@ public class TaskExecutorManagerTest extends TestLogger {
         final int maxSlotNum = 1;
 
         final CompletableFuture<InstanceID> releasedResourceFuture = new CompletableFuture<>();
-        ResourceActions resourceActions =
-                createResourceActionsBuilder()
+        ResourceAllocator resourceAllocator =
+                createResourceAllocatorBuilder()
                         .setReleaseResourceConsumer(
                                 (instanceID, e) -> releasedResourceFuture.complete(instanceID))
                         .build();
@@ -327,7 +327,7 @@ public class TaskExecutorManagerTest extends TestLogger {
                 createTaskExecutorManagerBuilder()
                         .setNumSlotsPerWorker(numberSlots)
                         .setMaxNumSlots(maxSlotNum)
-                        .setResourceActions(resourceActions)
+                        .setResourceAllocator(resourceAllocator)
                         .createTaskExecutorManager()) {
 
             createAndRegisterTaskExecutor(taskExecutorManager, 1, ResourceProfile.ANY);
@@ -376,11 +376,11 @@ public class TaskExecutorManagerTest extends TestLogger {
     private static TaskExecutorManagerBuilder createTaskExecutorManagerBuilder() {
         return new TaskExecutorManagerBuilder(
                         new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()))
-                .setResourceActions(createResourceActionsBuilder().build());
+                .setResourceAllocator(createResourceAllocatorBuilder().build());
     }
 
-    private static TestingResourceActionsBuilder createResourceActionsBuilder() {
-        return new TestingResourceActionsBuilder()
+    private static TestingResourceAllocatorBuilder createResourceAllocatorBuilder() {
+        return new TestingResourceAllocatorBuilder()
                 // ensures we do something when excess resource are requested
                 .setAllocateResourceFunction(ignored -> true);
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocator.java
similarity index 67%
rename from flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java
rename to flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocator.java
index a302b2001e7..83207ce41cd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocator.java
@@ -18,37 +18,26 @@
 
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
-import org.apache.flink.runtime.slots.ResourceRequirement;
 
 import javax.annotation.Nonnull;
 
-import java.util.Collection;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
 
-/** Testing implementation of the {@link ResourceActions}. */
-public class TestingResourceActions implements ResourceActions {
+/** Testing implementation of the {@link ResourceAllocator}. */
+public class TestingResourceAllocator implements ResourceAllocator {
 
     @Nonnull private final BiConsumer<InstanceID, Exception> releaseResourceConsumer;
 
     @Nonnull private final Function<WorkerResourceSpec, Boolean> allocateResourceFunction;
 
-    @Nonnull
-    private final BiConsumer<JobID, Collection<ResourceRequirement>>
-            notifyNotEnoughResourcesConsumer;
-
-    public TestingResourceActions(
+    public TestingResourceAllocator(
             @Nonnull BiConsumer<InstanceID, Exception> releaseResourceConsumer,
-            @Nonnull Function<WorkerResourceSpec, Boolean> allocateResourceFunction,
-            @Nonnull
-                    BiConsumer<JobID, Collection<ResourceRequirement>>
-                            notifyNotEnoughResourcesConsumer) {
+            @Nonnull Function<WorkerResourceSpec, Boolean> allocateResourceFunction) {
         this.releaseResourceConsumer = releaseResourceConsumer;
         this.allocateResourceFunction = allocateResourceFunction;
-        this.notifyNotEnoughResourcesConsumer = notifyNotEnoughResourcesConsumer;
     }
 
     @Override
@@ -60,10 +49,4 @@ public class TestingResourceActions implements ResourceActions {
     public boolean allocateResource(WorkerResourceSpec workerResourceSpec) {
         return allocateResourceFunction.apply(workerResourceSpec);
     }
-
-    @Override
-    public void notifyNotEnoughResourcesAvailable(
-            JobID jobId, Collection<ResourceRequirement> acquiredResources) {
-        notifyNotEnoughResourcesConsumer.accept(jobId, acquiredResources);
-    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocatorBuilder.java
similarity index 64%
rename from flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java
rename to flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocatorBuilder.java
index a2c1a07c30a..8dc5abd2a81 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocatorBuilder.java
@@ -18,36 +18,31 @@
 
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
-import org.apache.flink.runtime.slots.ResourceRequirement;
 
-import java.util.Collection;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
-/** Builder for the {@link TestingResourceActions}. */
-public class TestingResourceActionsBuilder {
+/** Builder for the {@link TestingResourceAllocator}. */
+public class TestingResourceAllocatorBuilder {
     private BiConsumer<InstanceID, Exception> releaseResourceConsumer = (ignoredA, ignoredB) -> {};
     private Function<WorkerResourceSpec, Boolean> allocateResourceFunction = (ignored) -> true;
-    private BiConsumer<JobID, Collection<ResourceRequirement>> notifyNotEnoughResourcesConsumer =
-            (ignoredA, ignoredB) -> {};
 
-    public TestingResourceActionsBuilder setReleaseResourceConsumer(
+    public TestingResourceAllocatorBuilder setReleaseResourceConsumer(
             BiConsumer<InstanceID, Exception> releaseResourceConsumer) {
         this.releaseResourceConsumer = releaseResourceConsumer;
         return this;
     }
 
-    public TestingResourceActionsBuilder setAllocateResourceFunction(
+    public TestingResourceAllocatorBuilder setAllocateResourceFunction(
             Function<WorkerResourceSpec, Boolean> allocateResourceFunction) {
         this.allocateResourceFunction = allocateResourceFunction;
         return this;
     }
 
-    public TestingResourceActionsBuilder setAllocateResourceConsumer(
+    public TestingResourceAllocatorBuilder setAllocateResourceConsumer(
             Consumer<WorkerResourceSpec> allocateResourceConsumer) {
         this.allocateResourceFunction =
                 workerRequest -> {
@@ -57,16 +52,7 @@ public class TestingResourceActionsBuilder {
         return this;
     }
 
-    public TestingResourceActionsBuilder setNotEnoughResourcesConsumer(
-            BiConsumer<JobID, Collection<ResourceRequirement>> notifyNotEnoughResourcesConsumer) {
-        this.notifyNotEnoughResourcesConsumer = notifyNotEnoughResourcesConsumer;
-        return this;
-    }
-
-    public TestingResourceActions build() {
-        return new TestingResourceActions(
-                releaseResourceConsumer,
-                allocateResourceFunction,
-                notifyNotEnoughResourcesConsumer);
+    public TestingResourceAllocator build() {
+        return new TestingResourceAllocator(releaseResourceConsumer, allocateResourceFunction);
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceEventListener.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceEventListener.java
new file mode 100644
index 00000000000..bff5f30f04a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceEventListener.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.function.BiConsumer;
+
+/** Testing implementation of the {@link ResourceEventListener}. */
+public class TestingResourceEventListener implements ResourceEventListener {
+
+    @Nonnull
+    private final BiConsumer<JobID, Collection<ResourceRequirement>>
+            notEnoughResourceAvailableConsumer;
+
+    public TestingResourceEventListener(
+            @Nonnull
+                    BiConsumer<JobID, Collection<ResourceRequirement>>
+                            notEnoughResourceAvailableConsumer) {
+        this.notEnoughResourceAvailableConsumer = notEnoughResourceAvailableConsumer;
+    }
+
+    @Override
+    public void notEnoughResourceAvailable(
+            JobID jobId, Collection<ResourceRequirement> acquiredResources) {
+        notEnoughResourceAvailableConsumer.accept(jobId, acquiredResources);
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceEventListenerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceEventListenerBuilder.java
new file mode 100644
index 00000000000..5e2e8d884ec
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceEventListenerBuilder.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+
+import java.util.Collection;
+import java.util.function.BiConsumer;
+
+/** Builder for the {@link TestingResourceEventListener}. */
+public class TestingResourceEventListenerBuilder {
+    private BiConsumer<JobID, Collection<ResourceRequirement>> notEnoughResourceAvailableConsumer =
+            (ignoredA, ignoredB) -> {};
+
+    public TestingResourceEventListenerBuilder setNotEnoughResourceAvailableConsumer(
+            BiConsumer<JobID, Collection<ResourceRequirement>> notEnoughResourceAvailableConsumer) {
+        this.notEnoughResourceAvailableConsumer = notEnoughResourceAvailableConsumer;
+        return this;
+    }
+
+    public TestingResourceEventListener build() {
+        return new TestingResourceEventListener(notEnoughResourceAvailableConsumer);
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingSlotManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingSlotManager.java
index 4494320d488..bb740f8fb59 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingSlotManager.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingSlotManager.java
@@ -114,7 +114,8 @@ public class TestingSlotManager implements SlotManager {
     public void start(
             ResourceManagerId newResourceManagerId,
             Executor newMainThreadExecutor,
-            ResourceActions newResourceActions,
+            ResourceAllocator newResourceAllocator,
+            ResourceEventListener resourceEventListener,
             BlockedTaskManagerChecker newBlockedTaskManagerChecker) {}
 
     @Override