You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/12/06 11:23:46 UTC
[flink] 02/02: [FLINK-29870][runtime] move ResourceAllocator to ActiveResourceManager.
This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 209df810f13e2fcbe5a4ca8bb015a8a5f662adc4
Author: Weihua Hu <hu...@gmail.com>
AuthorDate: Thu Nov 3 21:13:36 2022 +0800
[FLINK-29870][runtime] move ResourceAllocator to ActiveResourceManager.
This closes #21233
---
.../runtime/resourcemanager/ResourceManager.java | 75 ++++++++--------------
.../resourcemanager/StandaloneResourceManager.java | 23 +++----
.../active/ActiveResourceManager.java | 70 +++++++++++++++-----
.../slotmanager/FineGrainedSlotManager.java | 25 +++++---
.../NonSupportedResourceAllocatorImpl.java} | 34 +++++-----
.../slotmanager/ResourceAllocator.java | 14 +++-
.../slotmanager/TaskExecutorManager.java | 27 ++++----
.../resourcemanager/ResourceManagerTest.java | 31 +++------
.../resourcemanager/TestingResourceManager.java | 27 ++++----
.../TestingResourceManagerFactory.java | 20 +++---
.../active/ActiveResourceManagerTest.java | 71 ++++++++++----------
.../slotmanager/DeclarativeSlotManagerTest.java | 18 ++----
...gerDefaultResourceAllocationStrategyITCase.java | 7 +-
.../slotmanager/TaskExecutorManagerTest.java | 14 +---
.../slotmanager/TestingResourceAllocator.java | 23 +++++--
.../TestingResourceAllocatorBuilder.java | 17 +----
16 files changed, 246 insertions(+), 250 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 3abd6b6b68d..5d38143974d 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -169,6 +169,8 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
private final AtomicReference<byte[]> latestTokens = new AtomicReference<>();
+ private final ResourceAllocator resourceAllocator;
+
public ResourceManager(
RpcService rpcService,
UUID leaderSessionId,
@@ -234,6 +236,8 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
this.startedFuture = new CompletableFuture<>();
this.delegationTokenManager = delegationTokenManager;
+
+ this.resourceAllocator = getResourceAllocator();
}
// ------------------------------------------------------------------------
@@ -267,7 +271,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
slotManager.start(
getFencingToken(),
getMainThreadExecutor(),
- new ResourceAllocatorImpl(),
+ resourceAllocator,
new ResourceEventListenerImpl(),
blocklistHandler::isBlockedTaskManager);
@@ -533,7 +537,8 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
@Override
public void disconnectTaskManager(final ResourceID resourceId, final Exception cause) {
- closeTaskManagerConnection(resourceId, cause).ifPresent(ResourceManager.this::stopWorker);
+ closeTaskManagerConnection(resourceId, cause)
+ .ifPresent(ResourceManager.this::stopWorkerIfSupported);
}
@Override
@@ -984,10 +989,11 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
taskExecutorResourceId.getStringWithMetadata())));
}
- final WorkerType newWorker = workerStarted(taskExecutorResourceId);
+ final Optional<WorkerType> newWorkerOptional =
+ getWorkerNodeIfAcceptRegistration(taskExecutorResourceId);
String taskExecutorAddress = taskExecutorRegistration.getTaskExecutorAddress();
- if (newWorker == null) {
+ if (!newWorkerOptional.isPresent()) {
log.warn(
"Discard registration from TaskExecutor {} at ({}) because the framework did "
+ "not recognize it",
@@ -996,6 +1002,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
return new TaskExecutorRegistrationRejection(
"The ResourceManager does not recognize this TaskExecutor.");
} else {
+ WorkerType newWorker = newWorkerOptional.get();
WorkerRegistration<WorkerType> registration =
new WorkerRegistration<>(
taskExecutorGateway,
@@ -1159,9 +1166,8 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
}
}
- protected void releaseResource(InstanceID instanceId, Exception cause) {
+ protected WorkerType getWorkerByInstanceId(InstanceID instanceId) {
WorkerType worker = null;
-
// TODO: Improve performance by having an index on the instanceId
for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> entry :
taskExecutors.entrySet()) {
@@ -1171,18 +1177,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
}
}
- if (worker != null) {
- if (stopWorker(worker)) {
- closeTaskManagerConnection(worker.getResourceID(), cause);
- } else {
- log.debug(
- "Worker {} could not be stopped.",
- worker.getResourceID().getStringWithMetadata());
- }
- } else {
- // unregister in order to clean up potential left over state
- slotManager.unregisterTaskManager(instanceId, cause);
- }
+ return worker;
}
private enum ResourceRequirementHandling {
@@ -1265,29 +1260,23 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
throws ResourceManagerException;
/**
- * Allocates a resource using the worker resource specification.
- *
- * @param workerResourceSpec workerResourceSpec specifies the size of the to be allocated
- * resource
- * @return whether the resource can be allocated
- */
- @VisibleForTesting
- public abstract boolean startNewWorker(WorkerResourceSpec workerResourceSpec);
-
- /**
- * Callback when a worker was started.
+ * Get worker node if the worker resource is accepted.
*
* @param resourceID The worker resource id
*/
- protected abstract WorkerType workerStarted(ResourceID resourceID);
+ protected abstract Optional<WorkerType> getWorkerNodeIfAcceptRegistration(
+ ResourceID resourceID);
/**
- * Stops the given worker.
+ * Stops the given worker if supported.
*
* @param worker The worker.
- * @return True if the worker was stopped, otherwise false
*/
- public abstract boolean stopWorker(WorkerType worker);
+ public void stopWorkerIfSupported(WorkerType worker) {
+ if (resourceAllocator.isSupported()) {
+ resourceAllocator.releaseResource(worker.getResourceID());
+ }
+ }
/**
* Get the ready to serve future of the resource manager.
@@ -1297,6 +1286,8 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
*/
protected abstract CompletableFuture<Void> getReadyToServeFuture();
+ protected abstract ResourceAllocator getResourceAllocator();
+
/**
* Set {@link SlotManager} whether to fail unfulfillable slot requests.
*
@@ -1336,22 +1327,6 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
}
}
- private class ResourceAllocatorImpl implements ResourceAllocator {
-
- @Override
- public void releaseResource(InstanceID instanceId, Exception cause) {
- validateRunsInMainThread();
-
- ResourceManager.this.releaseResource(instanceId, cause);
- }
-
- @Override
- public boolean allocateResource(WorkerResourceSpec workerResourceSpec) {
- validateRunsInMainThread();
- return startNewWorker(workerResourceSpec);
- }
- }
-
private class ResourceEventListenerImpl implements ResourceEventListener {
@Override
public void notEnoughResourceAvailable(
@@ -1419,7 +1394,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
private void handleTaskManagerConnectionLoss(ResourceID resourceID, Exception cause) {
validateRunsInMainThread();
closeTaskManagerConnection(resourceID, cause)
- .ifPresent(ResourceManager.this::stopWorker);
+ .ifPresent(ResourceManager.this::stopWorkerIfSupported);
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index f41f5bb4d79..e7d431702dd 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -27,6 +27,8 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTrackerFactory;
import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.NonSupportedResourceAllocatorImpl;
+import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceAllocator;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
@@ -35,6 +37,7 @@ import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
+import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
@@ -100,19 +103,8 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> {
ApplicationStatus finalStatus, @Nullable String diagnostics) {}
@Override
- public boolean startNewWorker(WorkerResourceSpec workerResourceSpec) {
- return false;
- }
-
- @Override
- public boolean stopWorker(ResourceID resourceID) {
- // standalone resource manager cannot stop workers
- return false;
- }
-
- @Override
- protected ResourceID workerStarted(ResourceID resourceID) {
- return resourceID;
+ protected Optional<ResourceID> getWorkerNodeIfAcceptRegistration(ResourceID resourceID) {
+ return Optional.of(resourceID);
}
private void startStartupPeriod() {
@@ -132,4 +124,9 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> {
public CompletableFuture<Void> getReadyToServeFuture() {
return CompletableFuture.completedFuture(null);
}
+
+ @Override
+ protected ResourceAllocator getResourceAllocator() {
+ return NonSupportedResourceAllocatorImpl.INSTANCE;
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java
index e5cb5315d63..61e2ea234ad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTrackerFactory;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.ThresholdMeter;
@@ -38,6 +39,7 @@ import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceAllocator;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
@@ -54,6 +56,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
@@ -199,20 +202,8 @@ public class ActiveResourceManager<WorkerType extends ResourceIDRetrievable>
}
@Override
- public boolean startNewWorker(WorkerResourceSpec workerResourceSpec) {
- requestNewWorker(workerResourceSpec);
- return true;
- }
-
- @Override
- protected WorkerType workerStarted(ResourceID resourceID) {
- return workerNodeMap.get(resourceID);
- }
-
- @Override
- public boolean stopWorker(WorkerType worker) {
- internalStopWorker(worker.getResourceID());
- return true;
+ protected Optional<WorkerType> getWorkerNodeIfAcceptRegistration(ResourceID resourceID) {
+ return Optional.ofNullable(workerNodeMap.get(resourceID));
}
@Override
@@ -299,7 +290,24 @@ public class ActiveResourceManager<WorkerType extends ResourceIDRetrievable>
// Internal
// ------------------------------------------------------------------------
- private void requestNewWorker(WorkerResourceSpec workerResourceSpec) {
+ private void releaseResource(InstanceID instanceId, Exception cause) {
+ WorkerType worker = getWorkerByInstanceId(instanceId);
+ if (worker != null) {
+ internalStopWorker(worker.getResourceID());
+ closeTaskManagerConnection(worker.getResourceID(), cause);
+ } else {
+ log.debug("Instance {} not found in ResourceManager.", instanceId);
+ }
+ }
+
+ /**
+ * Allocates a resource using the worker resource specification.
+ *
+ * @param workerResourceSpec workerResourceSpec specifies the size of the to be allocated
+ * resource
+ */
+ @VisibleForTesting
+ public void requestNewWorker(WorkerResourceSpec workerResourceSpec) {
final TaskExecutorProcessSpec taskExecutorProcessSpec =
TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(
flinkConfig, workerResourceSpec);
@@ -451,6 +459,11 @@ public class ActiveResourceManager<WorkerType extends ResourceIDRetrievable>
return readyToServeFuture;
}
+ @Override
+ protected ResourceAllocator getResourceAllocator() {
+ return new ResourceAllocatorImpl();
+ }
+
private void tryRemovePreviousPendingRecoveryTaskManager(ResourceID resourceID) {
long sizeBeforeRemove = previousAttemptUnregisteredWorkers.size();
if (previousAttemptUnregisteredWorkers.remove(resourceID)) {
@@ -499,6 +512,33 @@ public class ActiveResourceManager<WorkerType extends ResourceIDRetrievable>
}
}
+ private class ResourceAllocatorImpl implements ResourceAllocator {
+
+ @Override
+ public boolean isSupported() {
+ return true;
+ }
+
+ @Override
+ public void releaseResource(InstanceID instanceId, Exception cause) {
+ validateRunsInMainThread();
+
+ ActiveResourceManager.this.releaseResource(instanceId, cause);
+ }
+
+ @Override
+ public void releaseResource(ResourceID resourceID) {
+ validateRunsInMainThread();
+ internalStopWorker(resourceID);
+ }
+
+ @Override
+ public void allocateResource(WorkerResourceSpec workerResourceSpec) {
+ validateRunsInMainThread();
+ requestNewWorker(workerResourceSpec);
+ }
+ }
+
// ------------------------------------------------------------------------
// Testing
// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
index c999d9883de..125679cedef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
@@ -346,8 +346,10 @@ public class FineGrainedSlotManager implements SlotManager {
: findMatchingPendingTaskManager(
totalResourceProfile, defaultSlotResourceProfile);
- if (!matchedPendingTaskManagerOptional.isPresent()
+ if (resourceAllocator.isSupported()
+ && !matchedPendingTaskManagerOptional.isPresent()
&& isMaxTotalResourceExceededAfterAdding(totalResourceProfile)) {
+
LOG.info(
"Releasing task manager {}. The max total resource limitation <{}, {}> is reached.",
taskExecutorConnection.getResourceID(),
@@ -357,6 +359,7 @@ public class FineGrainedSlotManager implements SlotManager {
taskExecutorConnection.getInstanceID(),
new FlinkExpectedException(
"The max total resource limitation is reached."));
+
return false;
}
@@ -735,12 +738,19 @@ public class FineGrainedSlotManager implements SlotManager {
}
private void releaseIdleTaskExecutor(InstanceID timedOutTaskManagerId) {
- final FlinkExpectedException cause =
- new FlinkExpectedException("TaskManager exceeded the idle timeout.");
- resourceAllocator.releaseResource(timedOutTaskManagerId, cause);
+ if (resourceAllocator.isSupported()) {
+ final FlinkExpectedException cause =
+ new FlinkExpectedException("TaskManager exceeded the idle timeout.");
+ resourceAllocator.releaseResource(timedOutTaskManagerId, cause);
+ }
}
private boolean allocateResource(PendingTaskManager pendingTaskManager) {
+ if (!resourceAllocator.isSupported()) {
+ // resource cannot be allocated
+ return false;
+ }
+
if (isMaxTotalResourceExceededAfterAdding(pendingTaskManager.getTotalResourceProfile())) {
LOG.info(
"Could not allocate {}. Max total resource limitation <{}, {}> is reached.",
@@ -750,13 +760,10 @@ public class FineGrainedSlotManager implements SlotManager {
return false;
}
- if (!resourceAllocator.allocateResource(
+ resourceAllocator.allocateResource(
WorkerResourceSpec.fromTotalResourceProfile(
pendingTaskManager.getTotalResourceProfile(),
- pendingTaskManager.getNumSlots()))) {
- // resource cannot be allocated
- return false;
- }
+ pendingTaskManager.getNumSlots()));
taskManagerTracker.addPendingTaskManager(pendingTaskManager);
return true;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/NonSupportedResourceAllocatorImpl.java
similarity index 53%
copy from flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocator.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/NonSupportedResourceAllocatorImpl.java
index 83207ce41cd..52c22ec4ad8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/NonSupportedResourceAllocatorImpl.java
@@ -18,35 +18,33 @@
package org.apache.flink.runtime.resourcemanager.slotmanager;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
-import javax.annotation.Nonnull;
+/** ResourceAllocator that does not support allocating/releasing resources. */
+public class NonSupportedResourceAllocatorImpl implements ResourceAllocator {
+ public static final NonSupportedResourceAllocatorImpl INSTANCE =
+ new NonSupportedResourceAllocatorImpl();
-import java.util.function.BiConsumer;
-import java.util.function.Function;
+ private NonSupportedResourceAllocatorImpl() {}
-/** Testing implementation of the {@link ResourceAllocator}. */
-public class TestingResourceAllocator implements ResourceAllocator {
-
- @Nonnull private final BiConsumer<InstanceID, Exception> releaseResourceConsumer;
-
- @Nonnull private final Function<WorkerResourceSpec, Boolean> allocateResourceFunction;
-
- public TestingResourceAllocator(
- @Nonnull BiConsumer<InstanceID, Exception> releaseResourceConsumer,
- @Nonnull Function<WorkerResourceSpec, Boolean> allocateResourceFunction) {
- this.releaseResourceConsumer = releaseResourceConsumer;
- this.allocateResourceFunction = allocateResourceFunction;
+ @Override
+ public boolean isSupported() {
+ return false;
}
@Override
public void releaseResource(InstanceID instanceId, Exception cause) {
- releaseResourceConsumer.accept(instanceId, cause);
+ throw new UnsupportedOperationException();
}
@Override
- public boolean allocateResource(WorkerResourceSpec workerResourceSpec) {
- return allocateResourceFunction.apply(workerResourceSpec);
+ public void releaseResource(ResourceID resourceID) {
+ throw new UnsupportedOperationException();
+ }
+
+ public void allocateResource(WorkerResourceSpec workerResourceSpec) {
+ throw new UnsupportedOperationException();
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocator.java
index 179a2ad5ae1..071242f419f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocator.java
@@ -18,12 +18,16 @@
package org.apache.flink.runtime.resourcemanager.slotmanager;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
/** Resource related actions which the {@link SlotManager} can perform. */
public interface ResourceAllocator {
+ /** Whether allocating/releasing resources is supported. */
+ boolean isSupported();
+
/**
* Releases the resource with the given instance id.
*
@@ -32,11 +36,17 @@ public interface ResourceAllocator {
*/
void releaseResource(InstanceID instanceId, Exception cause);
+ /**
+ * Releases the resource with the given resource id.
+ *
+ * @param resourceID identifying which resource to release
+ */
+ void releaseResource(ResourceID resourceID);
+
/**
* Requests to allocate a resource with the given {@link WorkerResourceSpec}.
*
* @param workerResourceSpec for the to be allocated worker
- * @return whether the resource can be allocated
*/
- boolean allocateResource(WorkerResourceSpec workerResourceSpec);
+ void allocateResource(WorkerResourceSpec workerResourceSpec);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
index d1515a45eea..6755342938b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
@@ -153,7 +153,8 @@ class TaskExecutorManager implements AutoCloseable {
SlotReport initialSlotReport,
ResourceProfile totalResourceProfile,
ResourceProfile defaultSlotResourceProfile) {
- if (isMaxSlotNumExceededAfterRegistration(initialSlotReport)) {
+ if (resourceAllocator.isSupported()
+ && isMaxSlotNumExceededAfterRegistration(initialSlotReport)) {
LOG.info(
"The total number of slots exceeds the max limitation {}, releasing the excess task executor.",
maxSlotNum);
@@ -254,6 +255,11 @@ class TaskExecutorManager implements AutoCloseable {
*/
public Optional<ResourceRequirement> allocateWorker(
ResourceProfile requestedSlotResourceProfile) {
+ if (!resourceAllocator.isSupported()) {
+ // resource cannot be allocated
+ return Optional.empty();
+ }
+
final int numRegisteredSlots = getNumberRegisteredSlots();
final int numPendingSlots = getNumberPendingTaskManagerSlots();
if (isMaxSlotNumExceededAfterAdding(numSlotsPerWorker)) {
@@ -270,10 +276,7 @@ class TaskExecutorManager implements AutoCloseable {
return Optional.empty();
}
- if (!resourceAllocator.allocateResource(defaultWorkerResourceSpec)) {
- // resource cannot be allocated
- return Optional.empty();
- }
+ resourceAllocator.allocateResource(defaultWorkerResourceSpec);
for (int i = 0; i < numSlotsPerWorker; ++i) {
PendingTaskManagerSlot pendingTaskManagerSlot =
@@ -401,12 +404,14 @@ class TaskExecutorManager implements AutoCloseable {
}
private void releaseIdleTaskExecutor(InstanceID timedOutTaskManagerId) {
- final FlinkExpectedException cause =
- new FlinkExpectedException("TaskExecutor exceeded the idle timeout.");
- LOG.debug(
- "Release TaskExecutor {} because it exceeded the idle timeout.",
- timedOutTaskManagerId);
- resourceAllocator.releaseResource(timedOutTaskManagerId, cause);
+ if (resourceAllocator.isSupported()) {
+ final FlinkExpectedException cause =
+ new FlinkExpectedException("TaskExecutor exceeded the idle timeout.");
+ LOG.debug(
+ "Release TaskExecutor {} because it exceeded the idle timeout.",
+ timedOutTaskManagerId);
+ resourceAllocator.releaseResource(timedOutTaskManagerId, cause);
+ }
}
// ---------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
index 05087e8b148..a4b1131ff15 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
@@ -84,7 +84,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
-import java.util.function.Function;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -491,12 +490,7 @@ class ResourceManagerTest {
rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);
runHeartbeatTimeoutTest(
- builder ->
- builder.withStopWorkerFunction(
- (worker) -> {
- stopWorkerFuture.complete(worker);
- return true;
- }),
+ builder -> builder.withStopWorkerConsumer(stopWorkerFuture::complete),
resourceManagerGateway ->
registerTaskExecutor(
resourceManagerGateway,
@@ -538,12 +532,7 @@ class ResourceManagerTest {
rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);
runHeartbeatTargetBecomesUnreachableTest(
- builder ->
- builder.withStopWorkerFunction(
- (worker) -> {
- stopWorkerFuture.complete(worker);
- return true;
- }),
+ builder -> builder.withStopWorkerConsumer(stopWorkerFuture::complete),
resourceManagerGateway ->
registerTaskExecutor(
resourceManagerGateway,
@@ -579,7 +568,7 @@ class ResourceManagerTest {
resourceManager =
new ResourceManagerBuilder()
- .withStopWorkerFunction(stopWorkerFuture::complete)
+ .withStopWorkerConsumer(stopWorkerFuture::complete)
.buildAndStart();
registerTaskExecutor(resourceManager, taskExecutorId, taskExecutorGateway.getAddress());
@@ -884,7 +873,7 @@ class ResourceManagerTest {
private SlotManager slotManager = null;
private BlocklistHandler.Factory blocklistHandlerFactory =
new NoOpBlocklistHandler.Factory();
- private Function<ResourceID, Boolean> stopWorkerFunction = null;
+ private Consumer<ResourceID> stopWorkerConsumer = null;
private CompletableFuture<Void> readyToServeFuture =
CompletableFuture.completedFuture(null);
@@ -910,9 +899,9 @@ class ResourceManagerTest {
return this;
}
- private ResourceManagerBuilder withStopWorkerFunction(
- Function<ResourceID, Boolean> stopWorkerFunction) {
- this.stopWorkerFunction = stopWorkerFunction;
+ private ResourceManagerBuilder withStopWorkerConsumer(
+ Consumer<ResourceID> stopWorkerConsumer) {
+ this.stopWorkerConsumer = stopWorkerConsumer;
return this;
}
@@ -941,8 +930,8 @@ class ResourceManagerTest {
.build();
}
- if (stopWorkerFunction == null) {
- stopWorkerFunction = (ignore) -> false;
+ if (stopWorkerConsumer == null) {
+ stopWorkerConsumer = (ignore) -> {};
}
resourceManagerId = ResourceManagerId.generate();
@@ -959,7 +948,7 @@ class ResourceManagerTest {
jobLeaderIdService,
testingFatalErrorHandler,
UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup(),
- stopWorkerFunction,
+ stopWorkerConsumer,
readyToServeFuture);
resourceManager.start();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
index f73d4df3c0a..4289de57636 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
@@ -27,6 +27,8 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTrackerFactory;
import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.NonSupportedResourceAllocatorImpl;
+import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceAllocator;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
@@ -36,16 +38,17 @@ import org.apache.flink.util.TimeUtils;
import javax.annotation.Nullable;
+import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
-import java.util.function.Function;
+import java.util.function.Consumer;
/** Simple {@link ResourceManager} implementation for testing purposes. */
public class TestingResourceManager extends ResourceManager<ResourceID> {
- private final Function<ResourceID, Boolean> stopWorkerFunction;
+ private final Consumer<ResourceID> stopWorkerConsumer;
private final CompletableFuture<Void> readyToServeFuture;
public TestingResourceManager(
@@ -60,7 +63,7 @@ public class TestingResourceManager extends ResourceManager<ResourceID> {
JobLeaderIdService jobLeaderIdService,
FatalErrorHandler fatalErrorHandler,
ResourceManagerMetricGroup resourceManagerMetricGroup,
- Function<ResourceID, Boolean> stopWorkerFunction,
+ Consumer<ResourceID> stopWorkerConsumer,
CompletableFuture<Void> readyToServeFuture) {
super(
rpcService,
@@ -78,7 +81,7 @@ public class TestingResourceManager extends ResourceManager<ResourceID> {
RpcUtils.INF_TIMEOUT,
ForkJoinPool.commonPool());
- this.stopWorkerFunction = stopWorkerFunction;
+ this.stopWorkerConsumer = stopWorkerConsumer;
this.readyToServeFuture = readyToServeFuture;
}
@@ -100,23 +103,23 @@ public class TestingResourceManager extends ResourceManager<ResourceID> {
}
@Override
- public boolean startNewWorker(WorkerResourceSpec workerResourceSpec) {
- return false;
+ protected Optional<ResourceID> getWorkerNodeIfAcceptRegistration(ResourceID resourceID) {
+ return Optional.of(resourceID);
}
@Override
- protected ResourceID workerStarted(ResourceID resourceID) {
- return resourceID;
+ public void stopWorkerIfSupported(ResourceID worker) {
+ stopWorkerConsumer.accept(worker);
}
@Override
- public boolean stopWorker(ResourceID worker) {
- return stopWorkerFunction.apply(worker);
+ public CompletableFuture<Void> getReadyToServeFuture() {
+ return readyToServeFuture;
}
@Override
- public CompletableFuture<Void> getReadyToServeFuture() {
- return readyToServeFuture;
+ protected ResourceAllocator getResourceAllocator() {
+ return NonSupportedResourceAllocatorImpl.INSTANCE;
}
public <T> CompletableFuture<T> runInMainThread(Callable<T> callable, Time timeout) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java
index 181bc8c9618..67bc11a4266 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java
@@ -30,6 +30,8 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTrackerFactory;
import org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTrackerImpl;
import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.slotmanager.NonSupportedResourceAllocatorImpl;
+import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceAllocator;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
@@ -39,6 +41,7 @@ import org.apache.flink.util.function.TriConsumer;
import javax.annotation.Nullable;
+import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
@@ -219,17 +222,7 @@ public class TestingResourceManagerFactory extends ResourceManagerFactory<Resour
}
@Override
- public boolean startNewWorker(WorkerResourceSpec workerResourceSpec) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected ResourceID workerStarted(ResourceID resourceID) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean stopWorker(ResourceID worker) {
+ protected Optional<ResourceID> getWorkerNodeIfAcceptRegistration(ResourceID resourceID) {
throw new UnsupportedOperationException();
}
@@ -243,5 +236,10 @@ public class TestingResourceManagerFactory extends ResourceManagerFactory<Resour
public CompletableFuture<Void> getReadyToServeFuture() {
return CompletableFuture.completedFuture(null);
}
+
+ @Override
+ protected ResourceAllocator getResourceAllocator() {
+ return NonSupportedResourceAllocatorImpl.INSTANCE;
+ }
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
index 764147d405b..b685d57f7c4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
@@ -107,18 +107,17 @@ public class ActiveResourceManagerTest extends TestLogger {
runTest(
() -> {
// received worker request, verify requesting from driver
- CompletableFuture<Boolean> startNewWorkerFuture =
+ CompletableFuture<Void> startNewWorkerFuture =
runInMainThread(
() ->
getResourceManager()
- .startNewWorker(WORKER_RESOURCE_SPEC));
+ .requestNewWorker(
+ WORKER_RESOURCE_SPEC));
TaskExecutorProcessSpec taskExecutorProcessSpec =
requestWorkerFromDriverFuture.get(
TIMEOUT_SEC, TimeUnit.SECONDS);
- assertThat(
- startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
- is(true));
+ startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
assertThat(
taskExecutorProcessSpec,
is(
@@ -171,19 +170,18 @@ public class ActiveResourceManagerTest extends TestLogger {
runTest(
() -> {
// received worker request, verify requesting from driver
- CompletableFuture<Boolean> startNewWorkerFuture =
+ CompletableFuture<Void> startNewWorkerFuture =
runInMainThread(
() ->
getResourceManager()
- .startNewWorker(WORKER_RESOURCE_SPEC));
+ .requestNewWorker(
+ WORKER_RESOURCE_SPEC));
TaskExecutorProcessSpec taskExecutorProcessSpec1 =
requestWorkerFromDriverFutures
.get(0)
.get(TIMEOUT_SEC, TimeUnit.SECONDS);
- assertThat(
- startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
- is(true));
+ startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
assertThat(
taskExecutorProcessSpec1,
is(
@@ -250,19 +248,18 @@ public class ActiveResourceManagerTest extends TestLogger {
runTest(
() -> {
// received worker request, verify requesting from driver
- CompletableFuture<Boolean> startNewWorkerFuture =
+ CompletableFuture<Void> startNewWorkerFuture =
runInMainThread(
() ->
getResourceManager()
- .startNewWorker(WORKER_RESOURCE_SPEC));
+ .requestNewWorker(
+ WORKER_RESOURCE_SPEC));
TaskExecutorProcessSpec taskExecutorProcessSpec1 =
requestWorkerFromDriverFutures
.get(0)
.get(TIMEOUT_SEC, TimeUnit.SECONDS);
- assertThat(
- startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
- is(true));
+ startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
assertThat(
taskExecutorProcessSpec1,
is(
@@ -329,19 +326,18 @@ public class ActiveResourceManagerTest extends TestLogger {
runTest(
() -> {
// received worker request, verify requesting from driver
- CompletableFuture<Boolean> startNewWorkerFuture =
+ CompletableFuture<Void> startNewWorkerFuture =
runInMainThread(
() ->
getResourceManager()
- .startNewWorker(WORKER_RESOURCE_SPEC));
+ .requestNewWorker(
+ WORKER_RESOURCE_SPEC));
TaskExecutorProcessSpec taskExecutorProcessSpec1 =
requestWorkerFromDriverFutures
.get(0)
.get(TIMEOUT_SEC, TimeUnit.SECONDS);
- assertThat(
- startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
- is(true));
+ startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
assertThat(
taskExecutorProcessSpec1,
is(
@@ -408,19 +404,18 @@ public class ActiveResourceManagerTest extends TestLogger {
runTest(
() -> {
// received worker request, verify requesting from driver
- CompletableFuture<Boolean> startNewWorkerFuture =
+ CompletableFuture<Void> startNewWorkerFuture =
runInMainThread(
() ->
getResourceManager()
- .startNewWorker(WORKER_RESOURCE_SPEC));
+ .requestNewWorker(
+ WORKER_RESOURCE_SPEC));
TaskExecutorProcessSpec taskExecutorProcessSpec =
requestWorkerFromDriverFutures
.get(0)
.get(TIMEOUT_SEC, TimeUnit.SECONDS);
- assertThat(
- startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
- is(true));
+ startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
assertThat(
taskExecutorProcessSpec,
is(
@@ -482,7 +477,7 @@ public class ActiveResourceManagerTest extends TestLogger {
runInMainThread(
() ->
getResourceManager()
- .startNewWorker(WORKER_RESOURCE_SPEC))
+ .requestNewWorker(WORKER_RESOURCE_SPEC))
.thenCompose(
(ignore) ->
registerTaskExecutor(
@@ -539,18 +534,18 @@ public class ActiveResourceManagerTest extends TestLogger {
runTest(
() -> {
// received worker request, verify requesting from driver
- CompletableFuture<Boolean> startNewWorkerFuture =
+ CompletableFuture<Void> startNewWorkerFuture =
runInMainThread(
() ->
getResourceManager()
- .startNewWorker(WORKER_RESOURCE_SPEC));
+ .requestNewWorker(
+ WORKER_RESOURCE_SPEC));
long t1 =
requestWorkerFromDriverFutures
.get(0)
.get(TIMEOUT_SEC, TimeUnit.SECONDS);
- assertThat(
- startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
- is(true));
+
+ startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
// first worker failed before register, verify requesting another worker
// from driver
@@ -619,14 +614,14 @@ public class ActiveResourceManagerTest extends TestLogger {
runTest(
() -> {
// received worker request, verify requesting from driver
- CompletableFuture<Boolean> startNewWorkerFuture =
+ CompletableFuture<Void> startNewWorkerFuture =
runInMainThread(
() ->
getResourceManager()
- .startNewWorker(WORKER_RESOURCE_SPEC));
- assertThat(
- startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
- is(true));
+ .requestNewWorker(
+ WORKER_RESOURCE_SPEC));
+
+ startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
long t1 =
requestWorkerFromDriverFutures
@@ -745,7 +740,7 @@ public class ActiveResourceManagerTest extends TestLogger {
runInMainThread(
() ->
getResourceManager()
- .startNewWorker(WORKER_RESOURCE_SPEC));
+ .requestNewWorker(WORKER_RESOURCE_SPEC));
// verify worker is released due to not registered in time
assertThat(
@@ -781,7 +776,7 @@ public class ActiveResourceManagerTest extends TestLogger {
runInMainThread(
() ->
getResourceManager()
- .startNewWorker(WORKER_RESOURCE_SPEC));
+ .requestNewWorker(WORKER_RESOURCE_SPEC));
// resource allocation takes longer than worker registration timeout
try {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
index 684c4145cea..83a55c69857 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
@@ -258,12 +258,8 @@ class DeclarativeSlotManagerTest {
void testRequirementDeclarationWithResourceAllocationFailure() throws Exception {
final ResourceRequirements resourceRequirements = createResourceRequirementsForSingleSlot();
- ResourceAllocator resourceAllocator =
- new TestingResourceAllocatorBuilder()
- .setAllocateResourceFunction(value -> false)
- .build();
-
final ResourceTracker resourceTracker = new DefaultResourceTracker();
+ final ResourceAllocator resourceAllocator = NonSupportedResourceAllocatorImpl.INSTANCE;
try (DeclarativeSlotManager slotManager =
createDeclarativeSlotManagerBuilder()
@@ -1076,11 +1072,7 @@ class DeclarativeSlotManagerTest {
final AtomicInteger resourceRequests = new AtomicInteger(0);
final TestingResourceAllocator testingResourceAllocator =
new TestingResourceAllocatorBuilder()
- .setAllocateResourceFunction(
- ignored -> {
- resourceRequests.incrementAndGet();
- return true;
- })
+ .setAllocateResourceConsumer(ignored -> resourceRequests.incrementAndGet())
.build();
try (final DeclarativeSlotManager slotManager =
@@ -1191,10 +1183,6 @@ class DeclarativeSlotManagerTest {
List<Tuple2<JobID, Collection<ResourceRequirement>>> notEnoughResourceNotifications =
new ArrayList<>();
- ResourceAllocator resourceAllocator =
- new TestingResourceAllocatorBuilder()
- .setAllocateResourceFunction(ignored -> false)
- .build();
ResourceEventListener resourceEventListener =
new TestingResourceEventListenerBuilder()
@@ -1204,6 +1192,8 @@ class DeclarativeSlotManagerTest {
Tuple2.of(jobId1, acquiredResources)))
.build();
+ ResourceAllocator resourceAllocator = NonSupportedResourceAllocatorImpl.INSTANCE;
+
try (DeclarativeSlotManager slotManager =
createDeclarativeSlotManagerBuilder()
.buildAndStart(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java
index 73e0355bd31..4d208f5290a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java
@@ -52,11 +52,8 @@ class FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase
new Context() {
{
- resourceAllocatorBuilder.setAllocateResourceFunction(
- ignored -> {
- resourceRequests.incrementAndGet();
- return true;
- });
+ resourceAllocatorBuilder.setAllocateResourceConsumer(
+ ignored -> resourceRequests.incrementAndGet());
runTest(
() -> {
runInMainThread(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java
index ed01b63b4c7..848647b12dc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java
@@ -250,11 +250,7 @@ public class TaskExecutorManagerTest extends TestLogger {
final AtomicInteger resourceRequests = new AtomicInteger(0);
ResourceAllocator resourceAllocator =
createResourceAllocatorBuilder()
- .setAllocateResourceFunction(
- ignored -> {
- resourceRequests.incrementAndGet();
- return true;
- })
+ .setAllocateResourceConsumer(ignored -> resourceRequests.incrementAndGet())
.build();
try (final TaskExecutorManager taskExecutorManager =
@@ -283,11 +279,7 @@ public class TaskExecutorManagerTest extends TestLogger {
final AtomicInteger resourceRequests = new AtomicInteger(0);
ResourceAllocator resourceAllocator =
createResourceAllocatorBuilder()
- .setAllocateResourceFunction(
- ignored -> {
- resourceRequests.incrementAndGet();
- return true;
- })
+ .setAllocateResourceConsumer(ignored -> resourceRequests.incrementAndGet())
.build();
try (final TaskExecutorManager taskExecutorManager =
@@ -382,7 +374,7 @@ public class TaskExecutorManagerTest extends TestLogger {
private static TestingResourceAllocatorBuilder createResourceAllocatorBuilder() {
return new TestingResourceAllocatorBuilder()
// ensures we do something when excess resource are requested
- .setAllocateResourceFunction(ignored -> true);
+ .setAllocateResourceConsumer(ignored -> {});
}
private static InstanceID createAndRegisterTaskExecutor(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocator.java
index 83207ce41cd..c375fba4d83 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocator.java
@@ -18,26 +18,32 @@
package org.apache.flink.runtime.resourcemanager.slotmanager;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
import javax.annotation.Nonnull;
import java.util.function.BiConsumer;
-import java.util.function.Function;
+import java.util.function.Consumer;
/** Testing implementation of the {@link ResourceAllocator}. */
public class TestingResourceAllocator implements ResourceAllocator {
@Nonnull private final BiConsumer<InstanceID, Exception> releaseResourceConsumer;
- @Nonnull private final Function<WorkerResourceSpec, Boolean> allocateResourceFunction;
+ @Nonnull private final Consumer<WorkerResourceSpec> allocateResourceConsumer;
public TestingResourceAllocator(
@Nonnull BiConsumer<InstanceID, Exception> releaseResourceConsumer,
- @Nonnull Function<WorkerResourceSpec, Boolean> allocateResourceFunction) {
+ @Nonnull Consumer<WorkerResourceSpec> allocateResourceConsumer) {
this.releaseResourceConsumer = releaseResourceConsumer;
- this.allocateResourceFunction = allocateResourceFunction;
+ this.allocateResourceConsumer = allocateResourceConsumer;
+ }
+
+ @Override
+ public boolean isSupported() {
+ return true;
}
@Override
@@ -46,7 +52,12 @@ public class TestingResourceAllocator implements ResourceAllocator {
}
@Override
- public boolean allocateResource(WorkerResourceSpec workerResourceSpec) {
- return allocateResourceFunction.apply(workerResourceSpec);
+ public void releaseResource(ResourceID resourceID) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void allocateResource(WorkerResourceSpec workerResourceSpec) {
+ allocateResourceConsumer.accept(workerResourceSpec);
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocatorBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocatorBuilder.java
index 8dc5abd2a81..795da0c2d69 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocatorBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocatorBuilder.java
@@ -23,12 +23,11 @@ import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
-import java.util.function.Function;
/** Builder for the {@link TestingResourceAllocator}. */
public class TestingResourceAllocatorBuilder {
private BiConsumer<InstanceID, Exception> releaseResourceConsumer = (ignoredA, ignoredB) -> {};
- private Function<WorkerResourceSpec, Boolean> allocateResourceFunction = (ignored) -> true;
+ private Consumer<WorkerResourceSpec> allocateResourceConsumer = (ignored) -> {};
public TestingResourceAllocatorBuilder setReleaseResourceConsumer(
BiConsumer<InstanceID, Exception> releaseResourceConsumer) {
@@ -36,23 +35,13 @@ public class TestingResourceAllocatorBuilder {
return this;
}
- public TestingResourceAllocatorBuilder setAllocateResourceFunction(
- Function<WorkerResourceSpec, Boolean> allocateResourceFunction) {
- this.allocateResourceFunction = allocateResourceFunction;
- return this;
- }
-
public TestingResourceAllocatorBuilder setAllocateResourceConsumer(
Consumer<WorkerResourceSpec> allocateResourceConsumer) {
- this.allocateResourceFunction =
- workerRequest -> {
- allocateResourceConsumer.accept(workerRequest);
- return true;
- };
+ this.allocateResourceConsumer = allocateResourceConsumer;
return this;
}
public TestingResourceAllocator build() {
- return new TestingResourceAllocator(releaseResourceConsumer, allocateResourceFunction);
+ return new TestingResourceAllocator(releaseResourceConsumer, allocateResourceConsumer);
}
}