You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/12/06 11:23:46 UTC

[flink] 02/02: [FLINK-29870][runtime] move ResourceAllocator to ActiveResourceManager.

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 209df810f13e2fcbe5a4ca8bb015a8a5f662adc4
Author: Weihua Hu <hu...@gmail.com>
AuthorDate: Thu Nov 3 21:13:36 2022 +0800

    [FLINK-29870][runtime] move ResourceAllocator to ActiveResourceManager.
    
    This closes #21233
---
 .../runtime/resourcemanager/ResourceManager.java   | 75 ++++++++--------------
 .../resourcemanager/StandaloneResourceManager.java | 23 +++----
 .../active/ActiveResourceManager.java              | 70 +++++++++++++++-----
 .../slotmanager/FineGrainedSlotManager.java        | 25 +++++---
 .../NonSupportedResourceAllocatorImpl.java}        | 34 +++++-----
 .../slotmanager/ResourceAllocator.java             | 14 +++-
 .../slotmanager/TaskExecutorManager.java           | 27 ++++----
 .../resourcemanager/ResourceManagerTest.java       | 31 +++------
 .../resourcemanager/TestingResourceManager.java    | 27 ++++----
 .../TestingResourceManagerFactory.java             | 20 +++---
 .../active/ActiveResourceManagerTest.java          | 71 ++++++++++----------
 .../slotmanager/DeclarativeSlotManagerTest.java    | 18 ++----
 ...gerDefaultResourceAllocationStrategyITCase.java |  7 +-
 .../slotmanager/TaskExecutorManagerTest.java       | 14 +---
 .../slotmanager/TestingResourceAllocator.java      | 23 +++++--
 .../TestingResourceAllocatorBuilder.java           | 17 +----
 16 files changed, 246 insertions(+), 250 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 3abd6b6b68d..5d38143974d 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -169,6 +169,8 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 
     private final AtomicReference<byte[]> latestTokens = new AtomicReference<>();
 
+    private final ResourceAllocator resourceAllocator;
+
     public ResourceManager(
             RpcService rpcService,
             UUID leaderSessionId,
@@ -234,6 +236,8 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
         this.startedFuture = new CompletableFuture<>();
 
         this.delegationTokenManager = delegationTokenManager;
+
+        this.resourceAllocator = getResourceAllocator();
     }
 
     // ------------------------------------------------------------------------
@@ -267,7 +271,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
             slotManager.start(
                     getFencingToken(),
                     getMainThreadExecutor(),
-                    new ResourceAllocatorImpl(),
+                    resourceAllocator,
                     new ResourceEventListenerImpl(),
                     blocklistHandler::isBlockedTaskManager);
 
@@ -533,7 +537,8 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 
     @Override
     public void disconnectTaskManager(final ResourceID resourceId, final Exception cause) {
-        closeTaskManagerConnection(resourceId, cause).ifPresent(ResourceManager.this::stopWorker);
+        closeTaskManagerConnection(resourceId, cause)
+                .ifPresent(ResourceManager.this::stopWorkerIfSupported);
     }
 
     @Override
@@ -984,10 +989,11 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
                                     taskExecutorResourceId.getStringWithMetadata())));
         }
 
-        final WorkerType newWorker = workerStarted(taskExecutorResourceId);
+        final Optional<WorkerType> newWorkerOptional =
+                getWorkerNodeIfAcceptRegistration(taskExecutorResourceId);
 
         String taskExecutorAddress = taskExecutorRegistration.getTaskExecutorAddress();
-        if (newWorker == null) {
+        if (!newWorkerOptional.isPresent()) {
             log.warn(
                     "Discard registration from TaskExecutor {} at ({}) because the framework did "
                             + "not recognize it",
@@ -996,6 +1002,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
             return new TaskExecutorRegistrationRejection(
                     "The ResourceManager does not recognize this TaskExecutor.");
         } else {
+            WorkerType newWorker = newWorkerOptional.get();
             WorkerRegistration<WorkerType> registration =
                     new WorkerRegistration<>(
                             taskExecutorGateway,
@@ -1159,9 +1166,8 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
         }
     }
 
-    protected void releaseResource(InstanceID instanceId, Exception cause) {
+    protected WorkerType getWorkerByInstanceId(InstanceID instanceId) {
         WorkerType worker = null;
-
         // TODO: Improve performance by having an index on the instanceId
         for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> entry :
                 taskExecutors.entrySet()) {
@@ -1171,18 +1177,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
             }
         }
 
-        if (worker != null) {
-            if (stopWorker(worker)) {
-                closeTaskManagerConnection(worker.getResourceID(), cause);
-            } else {
-                log.debug(
-                        "Worker {} could not be stopped.",
-                        worker.getResourceID().getStringWithMetadata());
-            }
-        } else {
-            // unregister in order to clean up potential left over state
-            slotManager.unregisterTaskManager(instanceId, cause);
-        }
+        return worker;
     }
 
     private enum ResourceRequirementHandling {
@@ -1265,29 +1260,23 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
             throws ResourceManagerException;
 
     /**
-     * Allocates a resource using the worker resource specification.
-     *
-     * @param workerResourceSpec workerResourceSpec specifies the size of the to be allocated
-     *     resource
-     * @return whether the resource can be allocated
-     */
-    @VisibleForTesting
-    public abstract boolean startNewWorker(WorkerResourceSpec workerResourceSpec);
-
-    /**
-     * Callback when a worker was started.
+     * Get worker node if the worker resource is accepted.
      *
      * @param resourceID The worker resource id
      */
-    protected abstract WorkerType workerStarted(ResourceID resourceID);
+    protected abstract Optional<WorkerType> getWorkerNodeIfAcceptRegistration(
+            ResourceID resourceID);
 
     /**
-     * Stops the given worker.
+     * Stops the given worker if supported.
      *
      * @param worker The worker.
-     * @return True if the worker was stopped, otherwise false
      */
-    public abstract boolean stopWorker(WorkerType worker);
+    public void stopWorkerIfSupported(WorkerType worker) {
+        if (resourceAllocator.isSupported()) {
+            resourceAllocator.releaseResource(worker.getResourceID());
+        }
+    }
 
     /**
      * Get the ready to serve future of the resource manager.
@@ -1297,6 +1286,8 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
      */
     protected abstract CompletableFuture<Void> getReadyToServeFuture();
 
+    protected abstract ResourceAllocator getResourceAllocator();
+
     /**
      * Set {@link SlotManager} whether to fail unfulfillable slot requests.
      *
@@ -1336,22 +1327,6 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
         }
     }
 
-    private class ResourceAllocatorImpl implements ResourceAllocator {
-
-        @Override
-        public void releaseResource(InstanceID instanceId, Exception cause) {
-            validateRunsInMainThread();
-
-            ResourceManager.this.releaseResource(instanceId, cause);
-        }
-
-        @Override
-        public boolean allocateResource(WorkerResourceSpec workerResourceSpec) {
-            validateRunsInMainThread();
-            return startNewWorker(workerResourceSpec);
-        }
-    }
-
     private class ResourceEventListenerImpl implements ResourceEventListener {
         @Override
         public void notEnoughResourceAvailable(
@@ -1419,7 +1394,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
         private void handleTaskManagerConnectionLoss(ResourceID resourceID, Exception cause) {
             validateRunsInMainThread();
             closeTaskManagerConnection(resourceID, cause)
-                    .ifPresent(ResourceManager.this::stopWorker);
+                    .ifPresent(ResourceManager.this::stopWorkerIfSupported);
         }
 
         @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index f41f5bb4d79..e7d431702dd 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -27,6 +27,8 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTrackerFactory;
 import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
 import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.NonSupportedResourceAllocatorImpl;
+import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceAllocator;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -35,6 +37,7 @@ import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
 
+import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -100,19 +103,8 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> {
             ApplicationStatus finalStatus, @Nullable String diagnostics) {}
 
     @Override
-    public boolean startNewWorker(WorkerResourceSpec workerResourceSpec) {
-        return false;
-    }
-
-    @Override
-    public boolean stopWorker(ResourceID resourceID) {
-        // standalone resource manager cannot stop workers
-        return false;
-    }
-
-    @Override
-    protected ResourceID workerStarted(ResourceID resourceID) {
-        return resourceID;
+    protected Optional<ResourceID> getWorkerNodeIfAcceptRegistration(ResourceID resourceID) {
+        return Optional.of(resourceID);
     }
 
     private void startStartupPeriod() {
@@ -132,4 +124,9 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> {
     public CompletableFuture<Void> getReadyToServeFuture() {
         return CompletableFuture.completedFuture(null);
     }
+
+    @Override
+    protected ResourceAllocator getResourceAllocator() {
+        return NonSupportedResourceAllocatorImpl.INSTANCE;
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java
index e5cb5315d63..61e2ea234ad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTrackerFactory;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.metrics.ThresholdMeter;
@@ -38,6 +39,7 @@ import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
 import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceAllocator;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -54,6 +56,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
@@ -199,20 +202,8 @@ public class ActiveResourceManager<WorkerType extends ResourceIDRetrievable>
     }
 
     @Override
-    public boolean startNewWorker(WorkerResourceSpec workerResourceSpec) {
-        requestNewWorker(workerResourceSpec);
-        return true;
-    }
-
-    @Override
-    protected WorkerType workerStarted(ResourceID resourceID) {
-        return workerNodeMap.get(resourceID);
-    }
-
-    @Override
-    public boolean stopWorker(WorkerType worker) {
-        internalStopWorker(worker.getResourceID());
-        return true;
+    protected Optional<WorkerType> getWorkerNodeIfAcceptRegistration(ResourceID resourceID) {
+        return Optional.ofNullable(workerNodeMap.get(resourceID));
     }
 
     @Override
@@ -299,7 +290,24 @@ public class ActiveResourceManager<WorkerType extends ResourceIDRetrievable>
     //  Internal
     // ------------------------------------------------------------------------
 
-    private void requestNewWorker(WorkerResourceSpec workerResourceSpec) {
+    private void releaseResource(InstanceID instanceId, Exception cause) {
+        WorkerType worker = getWorkerByInstanceId(instanceId);
+        if (worker != null) {
+            internalStopWorker(worker.getResourceID());
+            closeTaskManagerConnection(worker.getResourceID(), cause);
+        } else {
+            log.debug("Instance {} not found in ResourceManager.", instanceId);
+        }
+    }
+
+    /**
+     * Allocates a resource using the worker resource specification.
+     *
+     * @param workerResourceSpec workerResourceSpec specifies the size of the to be allocated
+     *     resource
+     */
+    @VisibleForTesting
+    public void requestNewWorker(WorkerResourceSpec workerResourceSpec) {
         final TaskExecutorProcessSpec taskExecutorProcessSpec =
                 TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(
                         flinkConfig, workerResourceSpec);
@@ -451,6 +459,11 @@ public class ActiveResourceManager<WorkerType extends ResourceIDRetrievable>
         return readyToServeFuture;
     }
 
+    @Override
+    protected ResourceAllocator getResourceAllocator() {
+        return new ResourceAllocatorImpl();
+    }
+
     private void tryRemovePreviousPendingRecoveryTaskManager(ResourceID resourceID) {
         long sizeBeforeRemove = previousAttemptUnregisteredWorkers.size();
         if (previousAttemptUnregisteredWorkers.remove(resourceID)) {
@@ -499,6 +512,33 @@ public class ActiveResourceManager<WorkerType extends ResourceIDRetrievable>
         }
     }
 
+    private class ResourceAllocatorImpl implements ResourceAllocator {
+
+        @Override
+        public boolean isSupported() {
+            return true;
+        }
+
+        @Override
+        public void releaseResource(InstanceID instanceId, Exception cause) {
+            validateRunsInMainThread();
+
+            ActiveResourceManager.this.releaseResource(instanceId, cause);
+        }
+
+        @Override
+        public void releaseResource(ResourceID resourceID) {
+            validateRunsInMainThread();
+            internalStopWorker(resourceID);
+        }
+
+        @Override
+        public void allocateResource(WorkerResourceSpec workerResourceSpec) {
+            validateRunsInMainThread();
+            requestNewWorker(workerResourceSpec);
+        }
+    }
+
     // ------------------------------------------------------------------------
     //  Testing
     // ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
index c999d9883de..125679cedef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
@@ -346,8 +346,10 @@ public class FineGrainedSlotManager implements SlotManager {
                             : findMatchingPendingTaskManager(
                                     totalResourceProfile, defaultSlotResourceProfile);
 
-            if (!matchedPendingTaskManagerOptional.isPresent()
+            if (resourceAllocator.isSupported()
+                    && !matchedPendingTaskManagerOptional.isPresent()
                     && isMaxTotalResourceExceededAfterAdding(totalResourceProfile)) {
+
                 LOG.info(
                         "Releasing task manager {}. The max total resource limitation <{}, {}> is reached.",
                         taskExecutorConnection.getResourceID(),
@@ -357,6 +359,7 @@ public class FineGrainedSlotManager implements SlotManager {
                         taskExecutorConnection.getInstanceID(),
                         new FlinkExpectedException(
                                 "The max total resource limitation is reached."));
+
                 return false;
             }
 
@@ -735,12 +738,19 @@ public class FineGrainedSlotManager implements SlotManager {
     }
 
     private void releaseIdleTaskExecutor(InstanceID timedOutTaskManagerId) {
-        final FlinkExpectedException cause =
-                new FlinkExpectedException("TaskManager exceeded the idle timeout.");
-        resourceAllocator.releaseResource(timedOutTaskManagerId, cause);
+        if (resourceAllocator.isSupported()) {
+            final FlinkExpectedException cause =
+                    new FlinkExpectedException("TaskManager exceeded the idle timeout.");
+            resourceAllocator.releaseResource(timedOutTaskManagerId, cause);
+        }
     }
 
     private boolean allocateResource(PendingTaskManager pendingTaskManager) {
+        if (!resourceAllocator.isSupported()) {
+            // resource cannot be allocated
+            return false;
+        }
+
         if (isMaxTotalResourceExceededAfterAdding(pendingTaskManager.getTotalResourceProfile())) {
             LOG.info(
                     "Could not allocate {}. Max total resource limitation <{}, {}> is reached.",
@@ -750,13 +760,10 @@ public class FineGrainedSlotManager implements SlotManager {
             return false;
         }
 
-        if (!resourceAllocator.allocateResource(
+        resourceAllocator.allocateResource(
                 WorkerResourceSpec.fromTotalResourceProfile(
                         pendingTaskManager.getTotalResourceProfile(),
-                        pendingTaskManager.getNumSlots()))) {
-            // resource cannot be allocated
-            return false;
-        }
+                        pendingTaskManager.getNumSlots()));
 
         taskManagerTracker.addPendingTaskManager(pendingTaskManager);
         return true;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/NonSupportedResourceAllocatorImpl.java
similarity index 53%
copy from flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocator.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/NonSupportedResourceAllocatorImpl.java
index 83207ce41cd..52c22ec4ad8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/NonSupportedResourceAllocatorImpl.java
@@ -18,35 +18,33 @@
 
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
 
-import javax.annotation.Nonnull;
+/** ResourceAllocator that does not support allocating or releasing resources. */
+public class NonSupportedResourceAllocatorImpl implements ResourceAllocator {
+    public static final NonSupportedResourceAllocatorImpl INSTANCE =
+            new NonSupportedResourceAllocatorImpl();
 
-import java.util.function.BiConsumer;
-import java.util.function.Function;
+    private NonSupportedResourceAllocatorImpl() {}
 
-/** Testing implementation of the {@link ResourceAllocator}. */
-public class TestingResourceAllocator implements ResourceAllocator {
-
-    @Nonnull private final BiConsumer<InstanceID, Exception> releaseResourceConsumer;
-
-    @Nonnull private final Function<WorkerResourceSpec, Boolean> allocateResourceFunction;
-
-    public TestingResourceAllocator(
-            @Nonnull BiConsumer<InstanceID, Exception> releaseResourceConsumer,
-            @Nonnull Function<WorkerResourceSpec, Boolean> allocateResourceFunction) {
-        this.releaseResourceConsumer = releaseResourceConsumer;
-        this.allocateResourceFunction = allocateResourceFunction;
+    @Override
+    public boolean isSupported() {
+        return false;
     }
 
     @Override
     public void releaseResource(InstanceID instanceId, Exception cause) {
-        releaseResourceConsumer.accept(instanceId, cause);
+        throw new UnsupportedOperationException();
     }
 
     @Override
-    public boolean allocateResource(WorkerResourceSpec workerResourceSpec) {
-        return allocateResourceFunction.apply(workerResourceSpec);
+    public void releaseResource(ResourceID resourceID) {
+        throw new UnsupportedOperationException();
+    }
+
+    public void allocateResource(WorkerResourceSpec workerResourceSpec) {
+        throw new UnsupportedOperationException();
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocator.java
index 179a2ad5ae1..071242f419f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocator.java
@@ -18,12 +18,16 @@
 
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
 
 /** Resource related actions which the {@link SlotManager} can perform. */
 public interface ResourceAllocator {
 
+    /** Whether allocating and releasing resources is supported. */
+    boolean isSupported();
+
     /**
      * Releases the resource with the given instance id.
      *
@@ -32,11 +36,17 @@ public interface ResourceAllocator {
      */
     void releaseResource(InstanceID instanceId, Exception cause);
 
+    /**
+     * Releases the resource with the given resource id.
+     *
+     * @param resourceID identifying which resource to release
+     */
+    void releaseResource(ResourceID resourceID);
+
     /**
      * Requests to allocate a resource with the given {@link WorkerResourceSpec}.
      *
      * @param workerResourceSpec for the to be allocated worker
-     * @return whether the resource can be allocated
      */
-    boolean allocateResource(WorkerResourceSpec workerResourceSpec);
+    void allocateResource(WorkerResourceSpec workerResourceSpec);
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
index d1515a45eea..6755342938b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
@@ -153,7 +153,8 @@ class TaskExecutorManager implements AutoCloseable {
             SlotReport initialSlotReport,
             ResourceProfile totalResourceProfile,
             ResourceProfile defaultSlotResourceProfile) {
-        if (isMaxSlotNumExceededAfterRegistration(initialSlotReport)) {
+        if (resourceAllocator.isSupported()
+                && isMaxSlotNumExceededAfterRegistration(initialSlotReport)) {
             LOG.info(
                     "The total number of slots exceeds the max limitation {}, releasing the excess task executor.",
                     maxSlotNum);
@@ -254,6 +255,11 @@ class TaskExecutorManager implements AutoCloseable {
      */
     public Optional<ResourceRequirement> allocateWorker(
             ResourceProfile requestedSlotResourceProfile) {
+        if (!resourceAllocator.isSupported()) {
+            // resource cannot be allocated
+            return Optional.empty();
+        }
+
         final int numRegisteredSlots = getNumberRegisteredSlots();
         final int numPendingSlots = getNumberPendingTaskManagerSlots();
         if (isMaxSlotNumExceededAfterAdding(numSlotsPerWorker)) {
@@ -270,10 +276,7 @@ class TaskExecutorManager implements AutoCloseable {
             return Optional.empty();
         }
 
-        if (!resourceAllocator.allocateResource(defaultWorkerResourceSpec)) {
-            // resource cannot be allocated
-            return Optional.empty();
-        }
+        resourceAllocator.allocateResource(defaultWorkerResourceSpec);
 
         for (int i = 0; i < numSlotsPerWorker; ++i) {
             PendingTaskManagerSlot pendingTaskManagerSlot =
@@ -401,12 +404,14 @@ class TaskExecutorManager implements AutoCloseable {
     }
 
     private void releaseIdleTaskExecutor(InstanceID timedOutTaskManagerId) {
-        final FlinkExpectedException cause =
-                new FlinkExpectedException("TaskExecutor exceeded the idle timeout.");
-        LOG.debug(
-                "Release TaskExecutor {} because it exceeded the idle timeout.",
-                timedOutTaskManagerId);
-        resourceAllocator.releaseResource(timedOutTaskManagerId, cause);
+        if (resourceAllocator.isSupported()) {
+            final FlinkExpectedException cause =
+                    new FlinkExpectedException("TaskExecutor exceeded the idle timeout.");
+            LOG.debug(
+                    "Release TaskExecutor {} because it exceeded the idle timeout.",
+                    timedOutTaskManagerId);
+            resourceAllocator.releaseResource(timedOutTaskManagerId, cause);
+        }
     }
 
     // ---------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
index 05087e8b148..a4b1131ff15 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
@@ -84,7 +84,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
-import java.util.function.Function;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -491,12 +490,7 @@ class ResourceManagerTest {
         rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);
 
         runHeartbeatTimeoutTest(
-                builder ->
-                        builder.withStopWorkerFunction(
-                                (worker) -> {
-                                    stopWorkerFuture.complete(worker);
-                                    return true;
-                                }),
+                builder -> builder.withStopWorkerConsumer(stopWorkerFuture::complete),
                 resourceManagerGateway ->
                         registerTaskExecutor(
                                 resourceManagerGateway,
@@ -538,12 +532,7 @@ class ResourceManagerTest {
         rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);
 
         runHeartbeatTargetBecomesUnreachableTest(
-                builder ->
-                        builder.withStopWorkerFunction(
-                                (worker) -> {
-                                    stopWorkerFuture.complete(worker);
-                                    return true;
-                                }),
+                builder -> builder.withStopWorkerConsumer(stopWorkerFuture::complete),
                 resourceManagerGateway ->
                         registerTaskExecutor(
                                 resourceManagerGateway,
@@ -579,7 +568,7 @@ class ResourceManagerTest {
 
         resourceManager =
                 new ResourceManagerBuilder()
-                        .withStopWorkerFunction(stopWorkerFuture::complete)
+                        .withStopWorkerConsumer(stopWorkerFuture::complete)
                         .buildAndStart();
 
         registerTaskExecutor(resourceManager, taskExecutorId, taskExecutorGateway.getAddress());
@@ -884,7 +873,7 @@ class ResourceManagerTest {
         private SlotManager slotManager = null;
         private BlocklistHandler.Factory blocklistHandlerFactory =
                 new NoOpBlocklistHandler.Factory();
-        private Function<ResourceID, Boolean> stopWorkerFunction = null;
+        private Consumer<ResourceID> stopWorkerConsumer = null;
         private CompletableFuture<Void> readyToServeFuture =
                 CompletableFuture.completedFuture(null);
 
@@ -910,9 +899,9 @@ class ResourceManagerTest {
             return this;
         }
 
-        private ResourceManagerBuilder withStopWorkerFunction(
-                Function<ResourceID, Boolean> stopWorkerFunction) {
-            this.stopWorkerFunction = stopWorkerFunction;
+        private ResourceManagerBuilder withStopWorkerConsumer(
+                Consumer<ResourceID> stopWorkerConsumer) {
+            this.stopWorkerConsumer = stopWorkerConsumer;
             return this;
         }
 
@@ -941,8 +930,8 @@ class ResourceManagerTest {
                                 .build();
             }
 
-            if (stopWorkerFunction == null) {
-                stopWorkerFunction = (ignore) -> false;
+            if (stopWorkerConsumer == null) {
+                stopWorkerConsumer = (ignore) -> {};
             }
 
             resourceManagerId = ResourceManagerId.generate();
@@ -959,7 +948,7 @@ class ResourceManagerTest {
                             jobLeaderIdService,
                             testingFatalErrorHandler,
                             UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup(),
-                            stopWorkerFunction,
+                            stopWorkerConsumer,
                             readyToServeFuture);
 
             resourceManager.start();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
index f73d4df3c0a..4289de57636 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
@@ -27,6 +27,8 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTrackerFactory;
 import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
 import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.NonSupportedResourceAllocatorImpl;
+import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceAllocator;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -36,16 +38,17 @@ import org.apache.flink.util.TimeUtils;
 
 import javax.annotation.Nullable;
 
+import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ForkJoinPool;
-import java.util.function.Function;
+import java.util.function.Consumer;
 
 /** Simple {@link ResourceManager} implementation for testing purposes. */
 public class TestingResourceManager extends ResourceManager<ResourceID> {
 
-    private final Function<ResourceID, Boolean> stopWorkerFunction;
+    private final Consumer<ResourceID> stopWorkerConsumer;
     private final CompletableFuture<Void> readyToServeFuture;
 
     public TestingResourceManager(
@@ -60,7 +63,7 @@ public class TestingResourceManager extends ResourceManager<ResourceID> {
             JobLeaderIdService jobLeaderIdService,
             FatalErrorHandler fatalErrorHandler,
             ResourceManagerMetricGroup resourceManagerMetricGroup,
-            Function<ResourceID, Boolean> stopWorkerFunction,
+            Consumer<ResourceID> stopWorkerConsumer,
             CompletableFuture<Void> readyToServeFuture) {
         super(
                 rpcService,
@@ -78,7 +81,7 @@ public class TestingResourceManager extends ResourceManager<ResourceID> {
                 RpcUtils.INF_TIMEOUT,
                 ForkJoinPool.commonPool());
 
-        this.stopWorkerFunction = stopWorkerFunction;
+        this.stopWorkerConsumer = stopWorkerConsumer;
         this.readyToServeFuture = readyToServeFuture;
     }
 
@@ -100,23 +103,23 @@ public class TestingResourceManager extends ResourceManager<ResourceID> {
     }
 
     @Override
-    public boolean startNewWorker(WorkerResourceSpec workerResourceSpec) {
-        return false;
+    protected Optional<ResourceID> getWorkerNodeIfAcceptRegistration(ResourceID resourceID) {
+        return Optional.of(resourceID);
     }
 
     @Override
-    protected ResourceID workerStarted(ResourceID resourceID) {
-        return resourceID;
+    public void stopWorkerIfSupported(ResourceID worker) {
+        stopWorkerConsumer.accept(worker);
     }
 
     @Override
-    public boolean stopWorker(ResourceID worker) {
-        return stopWorkerFunction.apply(worker);
+    public CompletableFuture<Void> getReadyToServeFuture() {
+        return readyToServeFuture;
     }
 
     @Override
-    public CompletableFuture<Void> getReadyToServeFuture() {
-        return readyToServeFuture;
+    protected ResourceAllocator getResourceAllocator() {
+        return NonSupportedResourceAllocatorImpl.INSTANCE;
     }
 
     public <T> CompletableFuture<T> runInMainThread(Callable<T> callable, Time timeout) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java
index 181bc8c9618..67bc11a4266 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java
@@ -30,6 +30,8 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTrackerFactory;
 import org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTrackerImpl;
 import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.slotmanager.NonSupportedResourceAllocatorImpl;
+import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceAllocator;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -39,6 +41,7 @@ import org.apache.flink.util.function.TriConsumer;
 
 import javax.annotation.Nullable;
 
+import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -219,17 +222,7 @@ public class TestingResourceManagerFactory extends ResourceManagerFactory<Resour
         }
 
         @Override
-        public boolean startNewWorker(WorkerResourceSpec workerResourceSpec) {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        protected ResourceID workerStarted(ResourceID resourceID) {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public boolean stopWorker(ResourceID worker) {
+        protected Optional<ResourceID> getWorkerNodeIfAcceptRegistration(ResourceID resourceID) {
             throw new UnsupportedOperationException();
         }
 
@@ -243,5 +236,10 @@ public class TestingResourceManagerFactory extends ResourceManagerFactory<Resour
         public CompletableFuture<Void> getReadyToServeFuture() {
             return CompletableFuture.completedFuture(null);
         }
+
+        @Override
+        protected ResourceAllocator getResourceAllocator() {
+            return NonSupportedResourceAllocatorImpl.INSTANCE;
+        }
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
index 764147d405b..b685d57f7c4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
@@ -107,18 +107,17 @@ public class ActiveResourceManagerTest extends TestLogger {
                 runTest(
                         () -> {
                             // received worker request, verify requesting from driver
-                            CompletableFuture<Boolean> startNewWorkerFuture =
+                            CompletableFuture<Void> startNewWorkerFuture =
                                     runInMainThread(
                                             () ->
                                                     getResourceManager()
-                                                            .startNewWorker(WORKER_RESOURCE_SPEC));
+                                                            .requestNewWorker(
+                                                                    WORKER_RESOURCE_SPEC));
                             TaskExecutorProcessSpec taskExecutorProcessSpec =
                                     requestWorkerFromDriverFuture.get(
                                             TIMEOUT_SEC, TimeUnit.SECONDS);
 
-                            assertThat(
-                                    startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
-                                    is(true));
+                            startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
                             assertThat(
                                     taskExecutorProcessSpec,
                                     is(
@@ -171,19 +170,18 @@ public class ActiveResourceManagerTest extends TestLogger {
                 runTest(
                         () -> {
                             // received worker request, verify requesting from driver
-                            CompletableFuture<Boolean> startNewWorkerFuture =
+                            CompletableFuture<Void> startNewWorkerFuture =
                                     runInMainThread(
                                             () ->
                                                     getResourceManager()
-                                                            .startNewWorker(WORKER_RESOURCE_SPEC));
+                                                            .requestNewWorker(
+                                                                    WORKER_RESOURCE_SPEC));
                             TaskExecutorProcessSpec taskExecutorProcessSpec1 =
                                     requestWorkerFromDriverFutures
                                             .get(0)
                                             .get(TIMEOUT_SEC, TimeUnit.SECONDS);
 
-                            assertThat(
-                                    startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
-                                    is(true));
+                            startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
                             assertThat(
                                     taskExecutorProcessSpec1,
                                     is(
@@ -250,19 +248,18 @@ public class ActiveResourceManagerTest extends TestLogger {
                 runTest(
                         () -> {
                             // received worker request, verify requesting from driver
-                            CompletableFuture<Boolean> startNewWorkerFuture =
+                            CompletableFuture<Void> startNewWorkerFuture =
                                     runInMainThread(
                                             () ->
                                                     getResourceManager()
-                                                            .startNewWorker(WORKER_RESOURCE_SPEC));
+                                                            .requestNewWorker(
+                                                                    WORKER_RESOURCE_SPEC));
                             TaskExecutorProcessSpec taskExecutorProcessSpec1 =
                                     requestWorkerFromDriverFutures
                                             .get(0)
                                             .get(TIMEOUT_SEC, TimeUnit.SECONDS);
 
-                            assertThat(
-                                    startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
-                                    is(true));
+                            startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
                             assertThat(
                                     taskExecutorProcessSpec1,
                                     is(
@@ -329,19 +326,18 @@ public class ActiveResourceManagerTest extends TestLogger {
                 runTest(
                         () -> {
                             // received worker request, verify requesting from driver
-                            CompletableFuture<Boolean> startNewWorkerFuture =
+                            CompletableFuture<Void> startNewWorkerFuture =
                                     runInMainThread(
                                             () ->
                                                     getResourceManager()
-                                                            .startNewWorker(WORKER_RESOURCE_SPEC));
+                                                            .requestNewWorker(
+                                                                    WORKER_RESOURCE_SPEC));
                             TaskExecutorProcessSpec taskExecutorProcessSpec1 =
                                     requestWorkerFromDriverFutures
                                             .get(0)
                                             .get(TIMEOUT_SEC, TimeUnit.SECONDS);
 
-                            assertThat(
-                                    startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
-                                    is(true));
+                            startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
                             assertThat(
                                     taskExecutorProcessSpec1,
                                     is(
@@ -408,19 +404,18 @@ public class ActiveResourceManagerTest extends TestLogger {
                 runTest(
                         () -> {
                             // received worker request, verify requesting from driver
-                            CompletableFuture<Boolean> startNewWorkerFuture =
+                            CompletableFuture<Void> startNewWorkerFuture =
                                     runInMainThread(
                                             () ->
                                                     getResourceManager()
-                                                            .startNewWorker(WORKER_RESOURCE_SPEC));
+                                                            .requestNewWorker(
+                                                                    WORKER_RESOURCE_SPEC));
                             TaskExecutorProcessSpec taskExecutorProcessSpec =
                                     requestWorkerFromDriverFutures
                                             .get(0)
                                             .get(TIMEOUT_SEC, TimeUnit.SECONDS);
 
-                            assertThat(
-                                    startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
-                                    is(true));
+                            startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
                             assertThat(
                                     taskExecutorProcessSpec,
                                     is(
@@ -482,7 +477,7 @@ public class ActiveResourceManagerTest extends TestLogger {
                             runInMainThread(
                                             () ->
                                                     getResourceManager()
-                                                            .startNewWorker(WORKER_RESOURCE_SPEC))
+                                                            .requestNewWorker(WORKER_RESOURCE_SPEC))
                                     .thenCompose(
                                             (ignore) ->
                                                     registerTaskExecutor(
@@ -539,18 +534,18 @@ public class ActiveResourceManagerTest extends TestLogger {
                 runTest(
                         () -> {
                             // received worker request, verify requesting from driver
-                            CompletableFuture<Boolean> startNewWorkerFuture =
+                            CompletableFuture<Void> startNewWorkerFuture =
                                     runInMainThread(
                                             () ->
                                                     getResourceManager()
-                                                            .startNewWorker(WORKER_RESOURCE_SPEC));
+                                                            .requestNewWorker(
+                                                                    WORKER_RESOURCE_SPEC));
                             long t1 =
                                     requestWorkerFromDriverFutures
                                             .get(0)
                                             .get(TIMEOUT_SEC, TimeUnit.SECONDS);
-                            assertThat(
-                                    startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
-                                    is(true));
+
+                            startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
 
                             // first worker failed before register, verify requesting another worker
                             // from driver
@@ -619,14 +614,14 @@ public class ActiveResourceManagerTest extends TestLogger {
                 runTest(
                         () -> {
                             // received worker request, verify requesting from driver
-                            CompletableFuture<Boolean> startNewWorkerFuture =
+                            CompletableFuture<Void> startNewWorkerFuture =
                                     runInMainThread(
                                             () ->
                                                     getResourceManager()
-                                                            .startNewWorker(WORKER_RESOURCE_SPEC));
-                            assertThat(
-                                    startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
-                                    is(true));
+                                                            .requestNewWorker(
+                                                                    WORKER_RESOURCE_SPEC));
+
+                            startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
 
                             long t1 =
                                     requestWorkerFromDriverFutures
@@ -745,7 +740,7 @@ public class ActiveResourceManagerTest extends TestLogger {
                             runInMainThread(
                                     () ->
                                             getResourceManager()
-                                                    .startNewWorker(WORKER_RESOURCE_SPEC));
+                                                    .requestNewWorker(WORKER_RESOURCE_SPEC));
 
                             // verify worker is released due to not registered in time
                             assertThat(
@@ -781,7 +776,7 @@ public class ActiveResourceManagerTest extends TestLogger {
                             runInMainThread(
                                     () ->
                                             getResourceManager()
-                                                    .startNewWorker(WORKER_RESOURCE_SPEC));
+                                                    .requestNewWorker(WORKER_RESOURCE_SPEC));
 
                             // resource allocation takes longer than worker registration timeout
                             try {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
index 684c4145cea..83a55c69857 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
@@ -258,12 +258,8 @@ class DeclarativeSlotManagerTest {
     void testRequirementDeclarationWithResourceAllocationFailure() throws Exception {
         final ResourceRequirements resourceRequirements = createResourceRequirementsForSingleSlot();
 
-        ResourceAllocator resourceAllocator =
-                new TestingResourceAllocatorBuilder()
-                        .setAllocateResourceFunction(value -> false)
-                        .build();
-
         final ResourceTracker resourceTracker = new DefaultResourceTracker();
+        final ResourceAllocator resourceAllocator = NonSupportedResourceAllocatorImpl.INSTANCE;
 
         try (DeclarativeSlotManager slotManager =
                 createDeclarativeSlotManagerBuilder()
@@ -1076,11 +1072,7 @@ class DeclarativeSlotManagerTest {
         final AtomicInteger resourceRequests = new AtomicInteger(0);
         final TestingResourceAllocator testingResourceAllocator =
                 new TestingResourceAllocatorBuilder()
-                        .setAllocateResourceFunction(
-                                ignored -> {
-                                    resourceRequests.incrementAndGet();
-                                    return true;
-                                })
+                        .setAllocateResourceConsumer(ignored -> resourceRequests.incrementAndGet())
                         .build();
 
         try (final DeclarativeSlotManager slotManager =
@@ -1191,10 +1183,6 @@ class DeclarativeSlotManagerTest {
 
         List<Tuple2<JobID, Collection<ResourceRequirement>>> notEnoughResourceNotifications =
                 new ArrayList<>();
-        ResourceAllocator resourceAllocator =
-                new TestingResourceAllocatorBuilder()
-                        .setAllocateResourceFunction(ignored -> false)
-                        .build();
 
         ResourceEventListener resourceEventListener =
                 new TestingResourceEventListenerBuilder()
@@ -1204,6 +1192,8 @@ class DeclarativeSlotManagerTest {
                                                 Tuple2.of(jobId1, acquiredResources)))
                         .build();
 
+        ResourceAllocator resourceAllocator = NonSupportedResourceAllocatorImpl.INSTANCE;
+
         try (DeclarativeSlotManager slotManager =
                 createDeclarativeSlotManagerBuilder()
                         .buildAndStart(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java
index 73e0355bd31..4d208f5290a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java
@@ -52,11 +52,8 @@ class FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase
 
         new Context() {
             {
-                resourceAllocatorBuilder.setAllocateResourceFunction(
-                        ignored -> {
-                            resourceRequests.incrementAndGet();
-                            return true;
-                        });
+                resourceAllocatorBuilder.setAllocateResourceConsumer(
+                        ignored -> resourceRequests.incrementAndGet());
                 runTest(
                         () -> {
                             runInMainThread(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java
index ed01b63b4c7..848647b12dc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java
@@ -250,11 +250,7 @@ public class TaskExecutorManagerTest extends TestLogger {
         final AtomicInteger resourceRequests = new AtomicInteger(0);
         ResourceAllocator resourceAllocator =
                 createResourceAllocatorBuilder()
-                        .setAllocateResourceFunction(
-                                ignored -> {
-                                    resourceRequests.incrementAndGet();
-                                    return true;
-                                })
+                        .setAllocateResourceConsumer(ignored -> resourceRequests.incrementAndGet())
                         .build();
 
         try (final TaskExecutorManager taskExecutorManager =
@@ -283,11 +279,7 @@ public class TaskExecutorManagerTest extends TestLogger {
         final AtomicInteger resourceRequests = new AtomicInteger(0);
         ResourceAllocator resourceAllocator =
                 createResourceAllocatorBuilder()
-                        .setAllocateResourceFunction(
-                                ignored -> {
-                                    resourceRequests.incrementAndGet();
-                                    return true;
-                                })
+                        .setAllocateResourceConsumer(ignored -> resourceRequests.incrementAndGet())
                         .build();
 
         try (final TaskExecutorManager taskExecutorManager =
@@ -382,7 +374,7 @@ public class TaskExecutorManagerTest extends TestLogger {
     private static TestingResourceAllocatorBuilder createResourceAllocatorBuilder() {
         return new TestingResourceAllocatorBuilder()
                 // ensures we do something when excess resource are requested
-                .setAllocateResourceFunction(ignored -> true);
+                .setAllocateResourceConsumer(ignored -> {});
     }
 
     private static InstanceID createAndRegisterTaskExecutor(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocator.java
index 83207ce41cd..c375fba4d83 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocator.java
@@ -18,26 +18,32 @@
 
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
 
 import javax.annotation.Nonnull;
 
 import java.util.function.BiConsumer;
-import java.util.function.Function;
+import java.util.function.Consumer;
 
 /** Testing implementation of the {@link ResourceAllocator}. */
 public class TestingResourceAllocator implements ResourceAllocator {
 
     @Nonnull private final BiConsumer<InstanceID, Exception> releaseResourceConsumer;
 
-    @Nonnull private final Function<WorkerResourceSpec, Boolean> allocateResourceFunction;
+    @Nonnull private final Consumer<WorkerResourceSpec> allocateResourceConsumer;
 
     public TestingResourceAllocator(
             @Nonnull BiConsumer<InstanceID, Exception> releaseResourceConsumer,
-            @Nonnull Function<WorkerResourceSpec, Boolean> allocateResourceFunction) {
+            @Nonnull Consumer<WorkerResourceSpec> allocateResourceConsumer) {
         this.releaseResourceConsumer = releaseResourceConsumer;
-        this.allocateResourceFunction = allocateResourceFunction;
+        this.allocateResourceConsumer = allocateResourceConsumer;
+    }
+
+    @Override
+    public boolean isSupported() {
+        return true;
     }
 
     @Override
@@ -46,7 +52,12 @@ public class TestingResourceAllocator implements ResourceAllocator {
     }
 
     @Override
-    public boolean allocateResource(WorkerResourceSpec workerResourceSpec) {
-        return allocateResourceFunction.apply(workerResourceSpec);
+    public void releaseResource(ResourceID resourceID) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void allocateResource(WorkerResourceSpec workerResourceSpec) {
+        allocateResourceConsumer.accept(workerResourceSpec);
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocatorBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocatorBuilder.java
index 8dc5abd2a81..795da0c2d69 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocatorBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocatorBuilder.java
@@ -23,12 +23,11 @@ import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
 
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
-import java.util.function.Function;
 
 /** Builder for the {@link TestingResourceAllocator}. */
 public class TestingResourceAllocatorBuilder {
     private BiConsumer<InstanceID, Exception> releaseResourceConsumer = (ignoredA, ignoredB) -> {};
-    private Function<WorkerResourceSpec, Boolean> allocateResourceFunction = (ignored) -> true;
+    private Consumer<WorkerResourceSpec> allocateResourceConsumer = (ignored) -> {};
 
     public TestingResourceAllocatorBuilder setReleaseResourceConsumer(
             BiConsumer<InstanceID, Exception> releaseResourceConsumer) {
@@ -36,23 +35,13 @@ public class TestingResourceAllocatorBuilder {
         return this;
     }
 
-    public TestingResourceAllocatorBuilder setAllocateResourceFunction(
-            Function<WorkerResourceSpec, Boolean> allocateResourceFunction) {
-        this.allocateResourceFunction = allocateResourceFunction;
-        return this;
-    }
-
     public TestingResourceAllocatorBuilder setAllocateResourceConsumer(
             Consumer<WorkerResourceSpec> allocateResourceConsumer) {
-        this.allocateResourceFunction =
-                workerRequest -> {
-                    allocateResourceConsumer.accept(workerRequest);
-                    return true;
-                };
+        this.allocateResourceConsumer = allocateResourceConsumer;
         return this;
     }
 
     public TestingResourceAllocator build() {
-        return new TestingResourceAllocator(releaseResourceConsumer, allocateResourceFunction);
+        return new TestingResourceAllocator(releaseResourceConsumer, allocateResourceConsumer);
     }
 }