You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/12/06 11:23:45 UTC
[flink] 01/02: [FLINK-29870][runtime] Split ResourceActions to ResourceAllocator and ResourceEventListener.
This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit a0aa4813b9fe3db37cabe38fa47f1d1096c879dc
Author: Weihua Hu <hu...@gmail.com>
AuthorDate: Thu Nov 3 16:10:49 2022 +0800
[FLINK-29870][runtime] Split ResourceActions to ResourceAllocator and ResourceEventListener.
---
.../runtime/resourcemanager/ResourceManager.java | 12 ++--
.../slotmanager/DeclarativeSlotManager.java | 25 +++----
.../slotmanager/FineGrainedSlotManager.java | 32 +++++----
...ResourceActions.java => ResourceAllocator.java} | 16 +----
...urceActions.java => ResourceEventListener.java} | 30 ++------
.../resourcemanager/slotmanager/SlotManager.java | 10 +--
.../slotmanager/TaskExecutorManager.java | 14 ++--
.../AbstractFineGrainedSlotManagerITCase.java | 12 ++--
.../slotmanager/DeclarativeSlotManagerBuilder.java | 40 +++++++++--
.../slotmanager/DeclarativeSlotManagerTest.java | 83 ++++++++++++----------
...gerDefaultResourceAllocationStrategyITCase.java | 2 +-
.../slotmanager/FineGrainedSlotManagerTest.java | 12 ++--
.../FineGrainedSlotManagerTestBase.java | 10 ++-
.../slotmanager/TaskExecutorManagerBuilder.java | 8 +--
.../slotmanager/TaskExecutorManagerTest.java | 38 +++++-----
...eActions.java => TestingResourceAllocator.java} | 25 ++-----
...r.java => TestingResourceAllocatorBuilder.java} | 28 ++------
.../slotmanager/TestingResourceEventListener.java | 48 +++++++++++++
.../TestingResourceEventListenerBuilder.java | 41 +++++++++++
.../slotmanager/TestingSlotManager.java | 3 +-
20 files changed, 284 insertions(+), 205 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 12684c25247..3abd6b6b68d 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -56,7 +56,8 @@ import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerExcept
import org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException;
import org.apache.flink.runtime.resourcemanager.registration.JobManagerRegistration;
import org.apache.flink.runtime.resourcemanager.registration.WorkerRegistration;
-import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceActions;
+import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceAllocator;
+import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceEventListener;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rest.messages.LogInfo;
import org.apache.flink.runtime.rest.messages.ThreadDumpInfo;
@@ -266,7 +267,8 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
slotManager.start(
getFencingToken(),
getMainThreadExecutor(),
- new ResourceActionsImpl(),
+ new ResourceAllocatorImpl(),
+ new ResourceEventListenerImpl(),
blocklistHandler::isBlockedTaskManager);
delegationTokenManager.start(this);
@@ -1334,7 +1336,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
}
}
- private class ResourceActionsImpl implements ResourceActions {
+ private class ResourceAllocatorImpl implements ResourceAllocator {
@Override
public void releaseResource(InstanceID instanceId, Exception cause) {
@@ -1348,9 +1350,11 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
validateRunsInMainThread();
return startNewWorker(workerResourceSpec);
}
+ }
+ private class ResourceEventListenerImpl implements ResourceEventListener {
@Override
- public void notifyNotEnoughResourcesAvailable(
+ public void notEnoughResourceAvailable(
JobID jobId, Collection<ResourceRequirement> acquiredResources) {
validateRunsInMainThread();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
index 71898ae747d..f5e210de073 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
@@ -69,7 +69,7 @@ public class DeclarativeSlotManager implements SlotManager {
private final SlotTracker slotTracker;
private final ResourceTracker resourceTracker;
- private final BiFunction<Executor, ResourceActions, TaskExecutorManager>
+ private final BiFunction<Executor, ResourceAllocator, TaskExecutorManager>
taskExecutorManagerFactory;
@Nullable private TaskExecutorManager taskExecutorManager;
@@ -97,8 +97,8 @@ public class DeclarativeSlotManager implements SlotManager {
/** Executor for future callbacks which have to be "synchronized". */
@Nullable private Executor mainThreadExecutor;
- /** Callbacks for resource (de-)allocations. */
- @Nullable private ResourceActions resourceActions;
+ /** Callbacks for notifications that not enough resources are available. */
+ @Nullable private ResourceEventListener resourceEventListener;
/** The future of the requirements delay check. */
@Nullable private CompletableFuture<Void> requirementsCheckFuture;
@@ -131,7 +131,7 @@ public class DeclarativeSlotManager implements SlotManager {
slotMatchingStrategy = slotManagerConfiguration.getSlotMatchingStrategy();
taskExecutorManagerFactory =
- (executor, resourceActions) ->
+ (executor, resourceAllocator) ->
new TaskExecutorManager(
slotManagerConfiguration.getDefaultWorkerResourceSpec(),
slotManagerConfiguration.getNumSlotsPerWorker(),
@@ -141,10 +141,10 @@ public class DeclarativeSlotManager implements SlotManager {
slotManagerConfiguration.getTaskManagerTimeout(),
scheduledExecutor,
executor,
- resourceActions);
+ resourceAllocator);
resourceManagerId = null;
- resourceActions = null;
+ resourceEventListener = null;
mainThreadExecutor = null;
taskExecutorManager = null;
blockedTaskManagerChecker = null;
@@ -199,22 +199,23 @@ public class DeclarativeSlotManager implements SlotManager {
*
* @param newResourceManagerId to use for communication with the task managers
* @param newMainThreadExecutor to use to run code in the ResourceManager's main thread
- * @param newResourceActions to use for resource (de-)allocations
+ * @param newResourceAllocator to use for resource (de-)allocations
* @param newBlockedTaskManagerChecker to query whether a task manager is blocked
*/
@Override
public void start(
ResourceManagerId newResourceManagerId,
Executor newMainThreadExecutor,
- ResourceActions newResourceActions,
+ ResourceAllocator newResourceAllocator,
+ ResourceEventListener newResourceEventListener,
BlockedTaskManagerChecker newBlockedTaskManagerChecker) {
LOG.debug("Starting the slot manager.");
this.resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
- resourceActions = Preconditions.checkNotNull(newResourceActions);
+ resourceEventListener = Preconditions.checkNotNull(newResourceEventListener);
taskExecutorManager =
- taskExecutorManagerFactory.apply(newMainThreadExecutor, newResourceActions);
+ taskExecutorManagerFactory.apply(newMainThreadExecutor, newResourceAllocator);
blockedTaskManagerChecker = Preconditions.checkNotNull(newBlockedTaskManagerChecker);
started = true;
@@ -253,7 +254,7 @@ public class DeclarativeSlotManager implements SlotManager {
taskExecutorManager = null;
resourceManagerId = null;
- resourceActions = null;
+ resourceEventListener = null;
blockedTaskManagerChecker = null;
started = false;
}
@@ -723,7 +724,7 @@ public class DeclarativeSlotManager implements SlotManager {
"Could not fulfill resource requirements of job {}. Free slots: {}",
jobId,
slotTracker.getFreeSlots().size());
- resourceActions.notifyNotEnoughResourcesAvailable(
+ resourceEventListener.notEnoughResourceAvailable(
jobId, resourceTracker.getAcquiredResources(jobId));
return pendingSlots;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
index 4833c7ad1a6..c999d9883de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
@@ -108,7 +108,10 @@ public class FineGrainedSlotManager implements SlotManager {
@Nullable private Executor mainThreadExecutor;
/** Callbacks for resource (de-)allocations. */
- @Nullable private ResourceActions resourceActions;
+ @Nullable private ResourceAllocator resourceAllocator;
+
+ /** Callbacks for notifications that not enough resources are available. */
+ @Nullable private ResourceEventListener resourceEventListener;
@Nullable private ScheduledFuture<?> taskManagerTimeoutsCheck;
@@ -149,7 +152,8 @@ public class FineGrainedSlotManager implements SlotManager {
this.maxTotalMem = Preconditions.checkNotNull(slotManagerConfiguration.getMaxTotalMem());
resourceManagerId = null;
- resourceActions = null;
+ resourceAllocator = null;
+ resourceEventListener = null;
mainThreadExecutor = null;
taskManagerTimeoutsCheck = null;
requirementsCheckFuture = null;
@@ -166,7 +170,7 @@ public class FineGrainedSlotManager implements SlotManager {
if (failUnfulfillableRequest && !unfulfillableJobs.isEmpty()) {
for (JobID jobId : unfulfillableJobs) {
- resourceActions.notifyNotEnoughResourcesAvailable(
+ resourceEventListener.notEnoughResourceAvailable(
jobId, resourceTracker.getAcquiredResources(jobId));
}
}
@@ -186,20 +190,22 @@ public class FineGrainedSlotManager implements SlotManager {
*
* @param newResourceManagerId to use for communication with the task managers
* @param newMainThreadExecutor to use to run code in the ResourceManager's main thread
- * @param newResourceActions to use for resource (de-)allocations
+ * @param newResourceAllocator to use for resource (de-)allocations
* @param newBlockedTaskManagerChecker to query whether a task manager is blocked
*/
@Override
public void start(
ResourceManagerId newResourceManagerId,
Executor newMainThreadExecutor,
- ResourceActions newResourceActions,
+ ResourceAllocator newResourceAllocator,
+ ResourceEventListener newResourceEventListener,
BlockedTaskManagerChecker newBlockedTaskManagerChecker) {
LOG.info("Starting the slot manager.");
resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
- resourceActions = Preconditions.checkNotNull(newResourceActions);
+ resourceAllocator = Preconditions.checkNotNull(newResourceAllocator);
+ resourceEventListener = Preconditions.checkNotNull(newResourceEventListener);
slotStatusSyncer.initialize(
taskManagerTracker, resourceTracker, resourceManagerId, mainThreadExecutor);
blockedTaskManagerChecker = Preconditions.checkNotNull(newBlockedTaskManagerChecker);
@@ -246,7 +252,8 @@ public class FineGrainedSlotManager implements SlotManager {
unfulfillableJobs.clear();
resourceManagerId = null;
- resourceActions = null;
+ resourceAllocator = null;
+ resourceEventListener = null;
started = false;
}
@@ -346,7 +353,7 @@ public class FineGrainedSlotManager implements SlotManager {
taskExecutorConnection.getResourceID(),
maxTotalCpu,
maxTotalMem.toHumanReadableString());
- resourceActions.releaseResource(
+ resourceAllocator.releaseResource(
taskExecutorConnection.getInstanceID(),
new FlinkExpectedException(
"The max total resource limitation is reached."));
@@ -571,7 +578,7 @@ public class FineGrainedSlotManager implements SlotManager {
if (sendNotEnoughResourceNotifications) {
for (JobID jobId : unfulfillableJobs) {
LOG.warn("Could not fulfill resource requirements of job {}.", jobId);
- resourceActions.notifyNotEnoughResourcesAvailable(
+ resourceEventListener.notEnoughResourceAvailable(
jobId, resourceTracker.getAcquiredResources(jobId));
}
}
@@ -730,7 +737,7 @@ public class FineGrainedSlotManager implements SlotManager {
private void releaseIdleTaskExecutor(InstanceID timedOutTaskManagerId) {
final FlinkExpectedException cause =
new FlinkExpectedException("TaskManager exceeded the idle timeout.");
- resourceActions.releaseResource(timedOutTaskManagerId, cause);
+ resourceAllocator.releaseResource(timedOutTaskManagerId, cause);
}
private boolean allocateResource(PendingTaskManager pendingTaskManager) {
@@ -743,7 +750,7 @@ public class FineGrainedSlotManager implements SlotManager {
return false;
}
- if (!resourceActions.allocateResource(
+ if (!resourceAllocator.allocateResource(
WorkerResourceSpec.fromTotalResourceProfile(
pendingTaskManager.getTotalResourceProfile(),
pendingTaskManager.getNumSlots()))) {
@@ -771,7 +778,8 @@ public class FineGrainedSlotManager implements SlotManager {
Preconditions.checkState(started, "The slot manager has not been started.");
Preconditions.checkNotNull(resourceManagerId);
Preconditions.checkNotNull(mainThreadExecutor);
- Preconditions.checkNotNull(resourceActions);
+ Preconditions.checkNotNull(resourceAllocator);
+ Preconditions.checkNotNull(resourceEventListener);
}
private boolean isMaxTotalResourceExceededAfterAdding(ResourceProfile newResource) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocator.java
similarity index 73%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocator.java
index 639f8cb53fa..179a2ad5ae1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocator.java
@@ -18,15 +18,11 @@
package org.apache.flink.runtime.resourcemanager.slotmanager;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
-import org.apache.flink.runtime.slots.ResourceRequirement;
-
-import java.util.Collection;
/** Resource related actions which the {@link SlotManager} can perform. */
-public interface ResourceActions {
+public interface ResourceAllocator {
/**
* Releases the resource with the given instance id.
@@ -43,14 +39,4 @@ public interface ResourceActions {
* @return whether the resource can be allocated
*/
boolean allocateResource(WorkerResourceSpec workerResourceSpec);
-
- /**
- * Notifies that not enough resources are available to fulfill the resource requirements of a
- * job.
- *
- * @param jobId job for which not enough resources are available
- * @param acquiredResources the resources that have been acquired for the job
- */
- void notifyNotEnoughResourcesAvailable(
- JobID jobId, Collection<ResourceRequirement> acquiredResources);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceEventListener.java
similarity index 53%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceEventListener.java
index 639f8cb53fa..1106b378a18 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceEventListener.java
@@ -19,38 +19,16 @@
package org.apache.flink.runtime.resourcemanager.slotmanager;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
import org.apache.flink.runtime.slots.ResourceRequirement;
import java.util.Collection;
-/** Resource related actions which the {@link SlotManager} can perform. */
-public interface ResourceActions {
-
- /**
- * Releases the resource with the given instance id.
- *
- * @param instanceId identifying which resource to release
- * @param cause why the resource is released
- */
- void releaseResource(InstanceID instanceId, Exception cause);
-
- /**
- * Requests to allocate a resource with the given {@link WorkerResourceSpec}.
- *
- * @param workerResourceSpec for the to be allocated worker
- * @return whether the resource can be allocated
- */
- boolean allocateResource(WorkerResourceSpec workerResourceSpec);
-
+/** Listener for resource events of {@link SlotManager}. */
+@FunctionalInterface
+public interface ResourceEventListener {
/**
- * Notifies that not enough resources are available to fulfill the resource requirements of a
- * job.
- *
* @param jobId job for which not enough resources are available
* @param acquiredResources the resources that have been acquired for the job
*/
- void notifyNotEnoughResourcesAvailable(
- JobID jobId, Collection<ResourceRequirement> acquiredResources);
+ void notEnoughResourceAvailable(JobID jobId, Collection<ResourceRequirement> acquiredResources);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index cf706515b02..9ea06c1478b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -40,7 +40,7 @@ import java.util.concurrent.Executor;
* their allocation and all pending slot requests. Whenever a new slot is registered or an allocated
* slot is freed, then it tries to fulfill another pending slot request. Whenever there are not
* enough slots available the slot manager will notify the resource manager about it via {@link
- * ResourceActions#allocateResource(WorkerResourceSpec)}.
+ * ResourceAllocator#allocateResource(WorkerResourceSpec)}.
*
* <p>In order to free resources and avoid resource leaks, idling task managers (task managers whose
* slots are currently not used) and pending slot requests time out triggering their release and
@@ -56,7 +56,7 @@ public interface SlotManager extends AutoCloseable {
int getNumberFreeSlotsOf(InstanceID instanceId);
/**
- * Get number of workers SlotManager requested from {@link ResourceActions} that are not yet
+ * Get number of workers SlotManager requested from {@link ResourceAllocator} that are not yet
* fulfilled.
*
* @return a map whose key set is all the unique resource specs of the pending workers, and the
@@ -79,13 +79,15 @@ public interface SlotManager extends AutoCloseable {
*
* @param newResourceManagerId to use for communication with the task managers
* @param newMainThreadExecutor to use to run code in the ResourceManager's main thread
- * @param newResourceActions to use for resource (de-)allocations
+ * @param newResourceAllocator to use for resource (de-)allocations
+ * @param resourceEventListener to use for notifying about insufficient resources
* @param newBlockedTaskManagerChecker to query whether a task manager is blocked
*/
void start(
ResourceManagerId newResourceManagerId,
Executor newMainThreadExecutor,
- ResourceActions newResourceActions,
+ ResourceAllocator newResourceAllocator,
+ ResourceEventListener resourceEventListener,
BlockedTaskManagerChecker newBlockedTaskManagerChecker);
/** Suspends the component. This clears the internal state of the slot manager. */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
index 1df5a872ee4..d1515a45eea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
@@ -90,7 +90,7 @@ class TaskExecutorManager implements AutoCloseable {
private final Time taskManagerTimeout;
/** Callbacks for resource (de-)allocations. */
- private final ResourceActions resourceActions;
+ private final ResourceAllocator resourceAllocator;
/** All currently registered task managers. */
private final Map<InstanceID, TaskManagerRegistration> taskManagerRegistrations =
@@ -111,7 +111,7 @@ class TaskExecutorManager implements AutoCloseable {
Time taskManagerTimeout,
ScheduledExecutor scheduledExecutor,
Executor mainThreadExecutor,
- ResourceActions resourceActions) {
+ ResourceAllocator resourceAllocator) {
this.defaultWorkerResourceSpec = defaultWorkerResourceSpec;
this.numSlotsPerWorker = numSlotsPerWorker;
@@ -123,7 +123,7 @@ class TaskExecutorManager implements AutoCloseable {
SlotManagerUtils.generateDefaultSlotResourceProfile(
defaultWorkerResourceSpec, numSlotsPerWorker);
- this.resourceActions = Preconditions.checkNotNull(resourceActions);
+ this.resourceAllocator = Preconditions.checkNotNull(resourceAllocator);
this.mainThreadExecutor = mainThreadExecutor;
taskManagerTimeoutsAndRedundancyCheck =
scheduledExecutor.scheduleWithFixedDelay(
@@ -157,7 +157,7 @@ class TaskExecutorManager implements AutoCloseable {
LOG.info(
"The total number of slots exceeds the max limitation {}, releasing the excess task executor.",
maxSlotNum);
- resourceActions.releaseResource(
+ resourceAllocator.releaseResource(
taskExecutorConnection.getInstanceID(),
new FlinkExpectedException(
"The total number of slots exceeds the max limitation."));
@@ -270,7 +270,7 @@ class TaskExecutorManager implements AutoCloseable {
return Optional.empty();
}
- if (!resourceActions.allocateResource(defaultWorkerResourceSpec)) {
+ if (!resourceAllocator.allocateResource(defaultWorkerResourceSpec)) {
// resource cannot be allocated
return Optional.empty();
}
@@ -321,7 +321,7 @@ class TaskExecutorManager implements AutoCloseable {
>= taskManagerTimeout.toMilliseconds()) {
// we collect the instance ids first in order to avoid concurrent modifications
// by the
- // ResourceActions.releaseResource call
+ // ResourceAllocator.releaseResource call
timedOutTaskManagers.add(taskManagerRegistration);
}
}
@@ -406,7 +406,7 @@ class TaskExecutorManager implements AutoCloseable {
LOG.debug(
"Release TaskExecutor {} because it exceeded the idle timeout.",
timedOutTaskManagerId);
- resourceActions.releaseResource(timedOutTaskManagerId, cause);
+ resourceAllocator.releaseResource(timedOutTaskManagerId, cause);
}
// ---------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java
index fd86e99efb1..e472b718539 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java
@@ -67,7 +67,7 @@ abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSlotManag
new CompletableFuture<>();
new Context() {
{
- resourceActionsBuilder.setAllocateResourceConsumer(
+ resourceAllocatorBuilder.setAllocateResourceConsumer(
allocateResourceFuture::complete);
runTest(
() -> {
@@ -224,7 +224,7 @@ abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSlotManag
new Context() {
{
setBlockedTaskManagerChecker(blockedTaskManager::equals);
- resourceActionsBuilder.setAllocateResourceConsumer(
+ resourceAllocatorBuilder.setAllocateResourceConsumer(
allocateResourceFuture::complete);
runTest(
() -> {
@@ -259,7 +259,7 @@ abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSlotManag
new Context() {
{
- resourceActionsBuilder.setAllocateResourceConsumer(
+ resourceAllocatorBuilder.setAllocateResourceConsumer(
ignored -> {
if (allocateResourceFutures.get(0).isDone()) {
allocateResourceFutures.get(1).complete(null);
@@ -430,7 +430,7 @@ abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSlotManag
final SlotReport slotReport = new SlotReport();
new Context() {
{
- resourceActionsBuilder.setAllocateResourceConsumer(
+ resourceAllocatorBuilder.setAllocateResourceConsumer(
ignored -> allocateResourceFutures.complete(null));
runTest(
() -> {
@@ -481,7 +481,7 @@ abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSlotManag
final SlotReport slotReport = new SlotReport();
new Context() {
{
- resourceActionsBuilder.setAllocateResourceConsumer(
+ resourceAllocatorBuilder.setAllocateResourceConsumer(
ignored -> allocateResourceFutures.complete(null));
runTest(
() -> {
@@ -528,7 +528,7 @@ abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSlotManag
new Context() {
{
- resourceActionsBuilder.setAllocateResourceConsumer(
+ resourceAllocatorBuilder.setAllocateResourceConsumer(
ignored -> {
if (allocateResourceFutures.get(0).isDone()) {
allocateResourceFutures.get(1).complete(null);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerBuilder.java
index ee0b811350b..959f5250018 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerBuilder.java
@@ -170,30 +170,56 @@ public class DeclarativeSlotManagerBuilder {
public DeclarativeSlotManager buildAndStartWithDirectExec() {
return buildAndStartWithDirectExec(
- ResourceManagerId.generate(), new TestingResourceActionsBuilder().build());
+ ResourceManagerId.generate(),
+ new TestingResourceAllocatorBuilder().build(),
+ new TestingResourceEventListenerBuilder().build());
}
public DeclarativeSlotManager buildAndStartWithDirectExec(
- ResourceManagerId resourceManagerId, ResourceActions resourceManagerActions) {
- return buildAndStart(resourceManagerId, Executors.directExecutor(), resourceManagerActions);
+ ResourceManagerId resourceManagerId, ResourceAllocator resourceAllocator) {
+ return buildAndStartWithDirectExec(
+ resourceManagerId,
+ resourceAllocator,
+ new TestingResourceEventListenerBuilder().build());
+ }
+
+ public DeclarativeSlotManager buildAndStartWithDirectExec(
+ ResourceManagerId resourceManagerId,
+ ResourceAllocator resourceAllocator,
+ ResourceEventListener resourceEventListener) {
+ return buildAndStart(
+ resourceManagerId,
+ Executors.directExecutor(),
+ resourceAllocator,
+ resourceEventListener);
}
public DeclarativeSlotManager buildAndStart(
ResourceManagerId resourceManagerId,
Executor executor,
- ResourceActions resourceManagerActions) {
+ ResourceAllocator resourceAllocator,
+ ResourceEventListener resourceEventListener) {
return buildAndStart(
- resourceManagerId, executor, resourceManagerActions, resourceID -> false);
+ resourceManagerId,
+ executor,
+ resourceAllocator,
+ resourceEventListener,
+ resourceID -> false);
}
public DeclarativeSlotManager buildAndStart(
ResourceManagerId resourceManagerId,
Executor executor,
- ResourceActions resourceManagerActions,
+ ResourceAllocator resourceAllocator,
+ ResourceEventListener resourceEventListener,
BlockedTaskManagerChecker blockedTaskManagerChecker) {
final DeclarativeSlotManager slotManager = build();
slotManager.start(
- resourceManagerId, executor, resourceManagerActions, blockedTaskManagerChecker);
+ resourceManagerId,
+ executor,
+ resourceAllocator,
+ resourceEventListener,
+ blockedTaskManagerChecker);
return slotManager;
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
index 6ae3cdeffba..684c4145cea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
@@ -194,13 +194,12 @@ class DeclarativeSlotManagerTest {
final ResourceRequirements resourceRequirements = createResourceRequirementsForSingleSlot();
CompletableFuture<WorkerResourceSpec> allocateResourceFuture = new CompletableFuture<>();
- ResourceActions resourceManagerActions =
- new TestingResourceActionsBuilder()
+ ResourceAllocator resourceAllocator =
+ new TestingResourceAllocatorBuilder()
.setAllocateResourceConsumer(allocateResourceFuture::complete)
.build();
- try (SlotManager slotManager =
- createSlotManager(resourceManagerId, resourceManagerActions)) {
+ try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceAllocator)) {
slotManager.processResourceRequirements(resourceRequirements);
@@ -218,8 +217,8 @@ class DeclarativeSlotManagerTest {
final ResourceRequirements resourceRequirements = createResourceRequirementsForSingleSlot();
CompletableFuture<WorkerResourceSpec> allocateResourceFuture = new CompletableFuture<>();
- ResourceActions resourceManagerActions =
- new TestingResourceActionsBuilder()
+ ResourceAllocator resourceAllocator =
+ new TestingResourceAllocatorBuilder()
.setAllocateResourceConsumer(allocateResourceFuture::complete)
.build();
@@ -230,7 +229,8 @@ class DeclarativeSlotManagerTest {
.buildAndStart(
resourceManagerId,
Executors.directExecutor(),
- resourceManagerActions,
+ resourceAllocator,
+ new TestingResourceEventListenerBuilder().build(),
blockedTaskManager::equals)) {
final TaskExecutorGateway taskExecutorGateway =
@@ -258,8 +258,8 @@ class DeclarativeSlotManagerTest {
void testRequirementDeclarationWithResourceAllocationFailure() throws Exception {
final ResourceRequirements resourceRequirements = createResourceRequirementsForSingleSlot();
- ResourceActions resourceManagerActions =
- new TestingResourceActionsBuilder()
+ ResourceAllocator resourceAllocator =
+ new TestingResourceAllocatorBuilder()
.setAllocateResourceFunction(value -> false)
.build();
@@ -269,7 +269,7 @@ class DeclarativeSlotManagerTest {
createDeclarativeSlotManagerBuilder()
.setResourceTracker(resourceTracker)
.buildAndStartWithDirectExec(
- ResourceManagerId.generate(), resourceManagerActions)) {
+ ResourceManagerId.generate(), resourceAllocator)) {
slotManager.processResourceRequirements(resourceRequirements);
@@ -347,7 +347,7 @@ class DeclarativeSlotManagerTest {
createDeclarativeSlotManagerBuilder()
.setSlotTracker(slotTracker)
.buildAndStartWithDirectExec(
- resourceManagerId, new TestingResourceActionsBuilder().build())) {
+ resourceManagerId, new TestingResourceAllocatorBuilder().build())) {
if (scenario
== RequirementDeclarationScenario
@@ -432,8 +432,8 @@ class DeclarativeSlotManagerTest {
void testDuplicateResourceRequirementDeclarationAfterSuccessfulAllocation() throws Exception {
final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
final AtomicInteger allocateResourceCalls = new AtomicInteger(0);
- final ResourceActions resourceManagerActions =
- new TestingResourceActionsBuilder()
+ final ResourceAllocator resourceAllocator =
+ new TestingResourceAllocatorBuilder()
.setAllocateResourceConsumer(
ignored -> allocateResourceCalls.incrementAndGet())
.build();
@@ -454,7 +454,7 @@ class DeclarativeSlotManagerTest {
try (DeclarativeSlotManager slotManager =
createDeclarativeSlotManagerBuilder()
.setSlotTracker(slotTracker)
- .buildAndStartWithDirectExec(resourceManagerId, resourceManagerActions)) {
+ .buildAndStartWithDirectExec(resourceManagerId, resourceAllocator)) {
slotManager.registerTaskManager(
taskManagerConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY);
@@ -549,14 +549,13 @@ class DeclarativeSlotManagerTest {
@Test
void testReceivingUnknownSlotReport() throws Exception {
final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
- final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build();
+ final ResourceAllocator resourceAllocator = new TestingResourceAllocatorBuilder().build();
final InstanceID unknownInstanceID = new InstanceID();
final SlotID unknownSlotId = new SlotID(ResourceID.generate(), 0);
final SlotReport unknownSlotReport = new SlotReport(createFreeSlotStatus(unknownSlotId));
- try (SlotManager slotManager =
- createSlotManager(resourceManagerId, resourceManagerActions)) {
+ try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceAllocator)) {
// check that we don't have any slots registered
assertThat(slotManager.getNumberRegisteredSlots()).isEqualTo(0);
@@ -656,7 +655,8 @@ class DeclarativeSlotManagerTest {
.buildAndStart(
ResourceManagerId.generate(),
mainThreadExecutor,
- new TestingResourceActionsBuilder().build())) {
+ new TestingResourceAllocatorBuilder().build(),
+ new TestingResourceEventListenerBuilder().build())) {
CompletableFuture.runAsync(
() ->
@@ -790,7 +790,8 @@ class DeclarativeSlotManagerTest {
.buildAndStart(
ResourceManagerId.generate(),
mainThreadExecutor,
- new TestingResourceActionsBuilder().build())) {
+ new TestingResourceAllocatorBuilder().build(),
+ new TestingResourceEventListenerBuilder().build())) {
slotManager.registerTaskManager(
taskExecutorConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY);
@@ -1073,8 +1074,8 @@ class DeclarativeSlotManagerTest {
void testRequestNewResources() throws Exception {
final int numberSlots = 2;
final AtomicInteger resourceRequests = new AtomicInteger(0);
- final TestingResourceActions testingResourceActions =
- new TestingResourceActionsBuilder()
+ final TestingResourceAllocator testingResourceAllocator =
+ new TestingResourceAllocatorBuilder()
.setAllocateResourceFunction(
ignored -> {
resourceRequests.incrementAndGet();
@@ -1084,7 +1085,7 @@ class DeclarativeSlotManagerTest {
try (final DeclarativeSlotManager slotManager =
createSlotManager(
- ResourceManagerId.generate(), testingResourceActions, numberSlots)) {
+ ResourceManagerId.generate(), testingResourceAllocator, numberSlots)) {
final JobID jobId = new JobID();
@@ -1190,10 +1191,14 @@ class DeclarativeSlotManagerTest {
List<Tuple2<JobID, Collection<ResourceRequirement>>> notEnoughResourceNotifications =
new ArrayList<>();
- ResourceActions resourceManagerActions =
- new TestingResourceActionsBuilder()
+ ResourceAllocator resourceAllocator =
+ new TestingResourceAllocatorBuilder()
.setAllocateResourceFunction(ignored -> false)
- .setNotEnoughResourcesConsumer(
+ .build();
+
+ ResourceEventListener resourceEventListener =
+ new TestingResourceEventListenerBuilder()
+ .setNotEnoughResourceAvailableConsumer(
(jobId1, acquiredResources) ->
notEnoughResourceNotifications.add(
Tuple2.of(jobId1, acquiredResources)))
@@ -1204,7 +1209,8 @@ class DeclarativeSlotManagerTest {
.buildAndStart(
ResourceManagerId.generate(),
new ManuallyTriggeredScheduledExecutor(),
- resourceManagerActions)) {
+ resourceAllocator,
+ resourceEventListener)) {
if (withNotificationGracePeriod) {
// this should disable notifications
@@ -1271,7 +1277,8 @@ class DeclarativeSlotManagerTest {
.buildAndStart(
ResourceManagerId.generate(),
executor,
- new TestingResourceActionsBuilder().build())) {
+ new TestingResourceAllocatorBuilder().build(),
+ new TestingResourceEventListenerBuilder().build())) {
JobID jobId = new JobID();
slotManager.processResourceRequirements(createResourceRequirements(jobId, 1));
@@ -1318,7 +1325,8 @@ class DeclarativeSlotManagerTest {
.buildAndStart(
ResourceManagerId.generate(),
executor,
- new TestingResourceActionsBuilder().build())) {
+ new TestingResourceAllocatorBuilder().build(),
+ new TestingResourceEventListenerBuilder().build())) {
JobID jobId = new JobID();
slotManager.processResourceRequirements(createResourceRequirements(jobId, 1));
@@ -1372,7 +1380,8 @@ class DeclarativeSlotManagerTest {
.buildAndStart(
ResourceManagerId.generate(),
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
- new TestingResourceActionsBuilder().build())) {
+ new TestingResourceAllocatorBuilder().build(),
+ new TestingResourceEventListenerBuilder().build())) {
final TaskExecutorConnection taskExecutionConnection =
createTaskExecutorConnection(taskExecutorGateway);
@@ -1422,7 +1431,8 @@ class DeclarativeSlotManagerTest {
.buildAndStart(
ResourceManagerId.generate(),
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
- new TestingResourceActionsBuilder().build())) {
+ new TestingResourceAllocatorBuilder().build(),
+ new TestingResourceEventListenerBuilder().build())) {
final JobID jobId = new JobID();
@@ -1461,7 +1471,7 @@ class DeclarativeSlotManagerTest {
.setRequirementCheckDelay(delay)
.buildAndStartWithDirectExec(
ResourceManagerId.generate(),
- new TestingResourceActionsBuilder()
+ new TestingResourceAllocatorBuilder()
.setAllocateResourceConsumer(
workerResourceSpec ->
allocatedResourceCounter.getAndIncrement())
@@ -1502,7 +1512,8 @@ class DeclarativeSlotManagerTest {
.buildAndStart(
ResourceManagerId.generate(),
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
- new TestingResourceActionsBuilder().build())) {
+ new TestingResourceAllocatorBuilder().build(),
+ new TestingResourceEventListenerBuilder().build())) {
final JobID jobId = new JobID();
@@ -1585,19 +1596,19 @@ class DeclarativeSlotManagerTest {
}
private DeclarativeSlotManager createSlotManager(
- ResourceManagerId resourceManagerId, ResourceActions resourceManagerActions) {
- return createSlotManager(resourceManagerId, resourceManagerActions, 1);
+ ResourceManagerId resourceManagerId, ResourceAllocator resourceAllocator) {
+ return createSlotManager(resourceManagerId, resourceAllocator, 1);
}
private DeclarativeSlotManager createSlotManager(
ResourceManagerId resourceManagerId,
- ResourceActions resourceManagerActions,
+ ResourceAllocator resourceAllocator,
int numSlotsPerWorker) {
return createDeclarativeSlotManagerBuilder(
new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()))
.setNumSlotsPerWorker(numSlotsPerWorker)
.setRedundantTaskManagerNum(0)
- .buildAndStartWithDirectExec(resourceManagerId, resourceManagerActions);
+ .buildAndStartWithDirectExec(resourceManagerId, resourceAllocator);
}
private static DeclarativeSlotManagerBuilder createDeclarativeSlotManagerBuilder() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java
index d68320761df..73e0355bd31 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java
@@ -52,7 +52,7 @@ class FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase
new Context() {
{
- resourceActionsBuilder.setAllocateResourceFunction(
+ resourceAllocatorBuilder.setAllocateResourceFunction(
ignored -> {
resourceRequests.incrementAndGet();
return true;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
index bfc438c4316..0c85473d13a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
@@ -378,7 +378,7 @@ class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
allocateResourceFutures.add(new CompletableFuture<>());
new Context() {
{
- resourceActionsBuilder.setAllocateResourceConsumer(
+ resourceAllocatorBuilder.setAllocateResourceConsumer(
ignored -> {
if (allocateResourceFutures.get(0).isDone()) {
allocateResourceFutures.get(1).complete(null);
@@ -445,7 +445,7 @@ class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
pendingTaskManager.getPendingTaskManagerId(),
DEFAULT_SLOT_RESOURCE_PROFILE)
.build()));
- resourceActionsBuilder.setAllocateResourceConsumer(
+ resourceAllocatorBuilder.setAllocateResourceConsumer(
ignored -> requestResourceFuture.complete(null));
runTest(
() -> {
@@ -496,7 +496,7 @@ class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
final CompletableFuture<Void> notifyNotEnoughResourceFuture = new CompletableFuture<>();
new Context() {
{
- resourceActionsBuilder.setNotEnoughResourcesConsumer(
+ resourceEventListenerBuilder.setNotEnoughResourceAvailableConsumer(
(jobId1, acquiredResources) -> {
notEnoughResourceNotifications.add(
Tuple2.of(jobId1, acquiredResources));
@@ -638,7 +638,7 @@ class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
final InstanceID instanceId = taskExecutionConnection.getInstanceID();
new Context() {
{
- resourceActionsBuilder.setReleaseResourceConsumer(
+ resourceAllocatorBuilder.setReleaseResourceConsumer(
(instanceID, e) -> releaseResourceFuture.complete(instanceID));
slotManagerConfigurationBuilder.setTaskManagerTimeout(taskManagerTimeout);
runTest(
@@ -827,7 +827,7 @@ class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
{
maxTotalResourceSetter.accept(slotManagerConfigurationBuilder);
- resourceActionsBuilder.setAllocateResourceConsumer(
+ resourceAllocatorBuilder.setAllocateResourceConsumer(
ignored -> {
if (allocateResourceFutures.get(0).isDone()) {
allocateResourceFutures.get(1).complete(null);
@@ -876,7 +876,7 @@ class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
{
maxTotalResourceSetter.accept(slotManagerConfigurationBuilder);
- resourceActionsBuilder.setReleaseResourceConsumer(
+ resourceAllocatorBuilder.setReleaseResourceConsumer(
(instanceId, ignore) -> releaseResourceFuture.complete(instanceId));
runTest(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
index 049ef97bc0e..f2e8e4f9d76 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
@@ -157,8 +157,11 @@ abstract class FineGrainedSlotManagerTestBase {
final TestingResourceAllocationStrategy.Builder resourceAllocationStrategyBuilder =
TestingResourceAllocationStrategy.newBuilder();
- final TestingResourceActionsBuilder resourceActionsBuilder =
- new TestingResourceActionsBuilder();
+ final TestingResourceAllocatorBuilder resourceAllocatorBuilder =
+ new TestingResourceAllocatorBuilder();
+
+ final TestingResourceEventListenerBuilder resourceEventListenerBuilder =
+ new TestingResourceEventListenerBuilder();
final SlotManagerConfigurationBuilder slotManagerConfigurationBuilder =
SlotManagerConfigurationBuilder.newBuilder();
@@ -221,7 +224,8 @@ abstract class FineGrainedSlotManagerTestBase {
slotManager.start(
resourceManagerId,
mainThreadExecutor,
- resourceActionsBuilder.build(),
+ resourceAllocatorBuilder.build(),
+ resourceEventListenerBuilder.build(),
blockedTaskManagerChecker));
testMethod.run();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerBuilder.java
index 157e894aea1..88e37e1ba16 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerBuilder.java
@@ -35,7 +35,7 @@ public class TaskExecutorManagerBuilder {
private Time taskManagerTimeout = Time.seconds(5);
private final ScheduledExecutor scheduledExecutor;
private Executor mainThreadExecutor = Executors.directExecutor();
- private ResourceActions newResourceActions = new TestingResourceActionsBuilder().build();
+ private ResourceAllocator newResourceAllocator = new TestingResourceAllocatorBuilder().build();
public TaskExecutorManagerBuilder(ScheduledExecutor scheduledExecutor) {
this.scheduledExecutor = scheduledExecutor;
@@ -78,8 +78,8 @@ public class TaskExecutorManagerBuilder {
return this;
}
- public TaskExecutorManagerBuilder setResourceActions(ResourceActions newResourceActions) {
- this.newResourceActions = newResourceActions;
+ public TaskExecutorManagerBuilder setResourceAllocator(ResourceAllocator newResourceAllocator) {
+ this.newResourceAllocator = newResourceAllocator;
return this;
}
@@ -93,6 +93,6 @@ public class TaskExecutorManagerBuilder {
taskManagerTimeout,
scheduledExecutor,
mainThreadExecutor,
- newResourceActions);
+ newResourceAllocator);
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java
index 28593e31f76..ed01b63b4c7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.java
@@ -133,7 +133,7 @@ public class TaskExecutorManagerTest extends TestLogger {
/**
* Tests that a task manager timeout does not remove the slots from the SlotManager. A timeout
- * should only trigger the {@link ResourceActions#releaseResource(InstanceID, Exception)}
+ * should only trigger the {@link ResourceAllocator#releaseResource(InstanceID, Exception)}
* callback. The receiver of the callback can then decide what to do with the TaskManager.
*
* <p>See FLINK-7793
@@ -143,8 +143,8 @@ public class TaskExecutorManagerTest extends TestLogger {
final Time taskManagerTimeout = Time.milliseconds(10L);
final CompletableFuture<InstanceID> releaseResourceFuture = new CompletableFuture<>();
- final ResourceActions resourceActions =
- createResourceActionsBuilder()
+ final ResourceAllocator resourceAllocator =
+ createResourceAllocatorBuilder()
.setReleaseResourceConsumer(
(instanceId, ignored) -> releaseResourceFuture.complete(instanceId))
.build();
@@ -154,7 +154,7 @@ public class TaskExecutorManagerTest extends TestLogger {
try (final TaskExecutorManager taskExecutorManager =
createTaskExecutorManagerBuilder()
.setTaskManagerTimeout(taskManagerTimeout)
- .setResourceActions(resourceActions)
+ .setResourceAllocator(resourceAllocator)
.setMainThreadExecutor(mainThreadExecutor)
.createTaskExecutorManager()) {
@@ -195,8 +195,8 @@ public class TaskExecutorManagerTest extends TestLogger {
final Time taskManagerTimeout = Time.milliseconds(50L);
final CompletableFuture<InstanceID> releaseResourceFuture = new CompletableFuture<>();
- final ResourceActions resourceManagerActions =
- new TestingResourceActionsBuilder()
+ final ResourceAllocator resourceAllocator =
+ new TestingResourceAllocatorBuilder()
.setReleaseResourceConsumer(
(instanceID, e) -> releaseResourceFuture.complete(instanceID))
.build();
@@ -207,7 +207,7 @@ public class TaskExecutorManagerTest extends TestLogger {
createTaskExecutorManagerBuilder()
.setTaskManagerTimeout(taskManagerTimeout)
.setDefaultWorkerResourceSpec(workerResourceSpec)
- .setResourceActions(resourceManagerActions)
+ .setResourceAllocator(resourceAllocator)
.setMainThreadExecutor(mainThreadExecutor)
.createTaskExecutorManager()) {
@@ -248,8 +248,8 @@ public class TaskExecutorManagerTest extends TestLogger {
ResourceProfile.newBuilder().setCpuCores(numCoresPerWorker + 1).build();
final AtomicInteger resourceRequests = new AtomicInteger(0);
- ResourceActions resourceActions =
- createResourceActionsBuilder()
+ ResourceAllocator resourceAllocator =
+ createResourceAllocatorBuilder()
.setAllocateResourceFunction(
ignored -> {
resourceRequests.incrementAndGet();
@@ -262,7 +262,7 @@ public class TaskExecutorManagerTest extends TestLogger {
.setDefaultWorkerResourceSpec(workerResourceSpec)
.setNumSlotsPerWorker(1)
.setMaxNumSlots(1)
- .setResourceActions(resourceActions)
+ .setResourceAllocator(resourceAllocator)
.createTaskExecutorManager()) {
assertThat(
@@ -281,8 +281,8 @@ public class TaskExecutorManagerTest extends TestLogger {
final int maxSlotNum = 1;
final AtomicInteger resourceRequests = new AtomicInteger(0);
- ResourceActions resourceActions =
- createResourceActionsBuilder()
+ ResourceAllocator resourceAllocator =
+ createResourceAllocatorBuilder()
.setAllocateResourceFunction(
ignored -> {
resourceRequests.incrementAndGet();
@@ -294,7 +294,7 @@ public class TaskExecutorManagerTest extends TestLogger {
createTaskExecutorManagerBuilder()
.setNumSlotsPerWorker(numberSlots)
.setMaxNumSlots(maxSlotNum)
- .setResourceActions(resourceActions)
+ .setResourceAllocator(resourceAllocator)
.createTaskExecutorManager()) {
assertThat(resourceRequests.get(), is(0));
@@ -317,8 +317,8 @@ public class TaskExecutorManagerTest extends TestLogger {
final int maxSlotNum = 1;
final CompletableFuture<InstanceID> releasedResourceFuture = new CompletableFuture<>();
- ResourceActions resourceActions =
- createResourceActionsBuilder()
+ ResourceAllocator resourceAllocator =
+ createResourceAllocatorBuilder()
.setReleaseResourceConsumer(
(instanceID, e) -> releasedResourceFuture.complete(instanceID))
.build();
@@ -327,7 +327,7 @@ public class TaskExecutorManagerTest extends TestLogger {
createTaskExecutorManagerBuilder()
.setNumSlotsPerWorker(numberSlots)
.setMaxNumSlots(maxSlotNum)
- .setResourceActions(resourceActions)
+ .setResourceAllocator(resourceAllocator)
.createTaskExecutorManager()) {
createAndRegisterTaskExecutor(taskExecutorManager, 1, ResourceProfile.ANY);
@@ -376,11 +376,11 @@ public class TaskExecutorManagerTest extends TestLogger {
private static TaskExecutorManagerBuilder createTaskExecutorManagerBuilder() {
return new TaskExecutorManagerBuilder(
new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()))
- .setResourceActions(createResourceActionsBuilder().build());
+ .setResourceAllocator(createResourceAllocatorBuilder().build());
}
- private static TestingResourceActionsBuilder createResourceActionsBuilder() {
- return new TestingResourceActionsBuilder()
+ private static TestingResourceAllocatorBuilder createResourceAllocatorBuilder() {
+ return new TestingResourceAllocatorBuilder()
// ensures we do something when excess resource are requested
.setAllocateResourceFunction(ignored -> true);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocator.java
similarity index 67%
rename from flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java
rename to flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocator.java
index a302b2001e7..83207ce41cd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocator.java
@@ -18,37 +18,26 @@
package org.apache.flink.runtime.resourcemanager.slotmanager;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
-import org.apache.flink.runtime.slots.ResourceRequirement;
import javax.annotation.Nonnull;
-import java.util.Collection;
import java.util.function.BiConsumer;
import java.util.function.Function;
-/** Testing implementation of the {@link ResourceActions}. */
-public class TestingResourceActions implements ResourceActions {
+/** Testing implementation of the {@link ResourceAllocator}. */
+public class TestingResourceAllocator implements ResourceAllocator {
@Nonnull private final BiConsumer<InstanceID, Exception> releaseResourceConsumer;
@Nonnull private final Function<WorkerResourceSpec, Boolean> allocateResourceFunction;
- @Nonnull
- private final BiConsumer<JobID, Collection<ResourceRequirement>>
- notifyNotEnoughResourcesConsumer;
-
- public TestingResourceActions(
+ public TestingResourceAllocator(
@Nonnull BiConsumer<InstanceID, Exception> releaseResourceConsumer,
- @Nonnull Function<WorkerResourceSpec, Boolean> allocateResourceFunction,
- @Nonnull
- BiConsumer<JobID, Collection<ResourceRequirement>>
- notifyNotEnoughResourcesConsumer) {
+ @Nonnull Function<WorkerResourceSpec, Boolean> allocateResourceFunction) {
this.releaseResourceConsumer = releaseResourceConsumer;
this.allocateResourceFunction = allocateResourceFunction;
- this.notifyNotEnoughResourcesConsumer = notifyNotEnoughResourcesConsumer;
}
@Override
@@ -60,10 +49,4 @@ public class TestingResourceActions implements ResourceActions {
public boolean allocateResource(WorkerResourceSpec workerResourceSpec) {
return allocateResourceFunction.apply(workerResourceSpec);
}
-
- @Override
- public void notifyNotEnoughResourcesAvailable(
- JobID jobId, Collection<ResourceRequirement> acquiredResources) {
- notifyNotEnoughResourcesConsumer.accept(jobId, acquiredResources);
- }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocatorBuilder.java
similarity index 64%
rename from flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java
rename to flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocatorBuilder.java
index a2c1a07c30a..8dc5abd2a81 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocatorBuilder.java
@@ -18,36 +18,31 @@
package org.apache.flink.runtime.resourcemanager.slotmanager;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
-import org.apache.flink.runtime.slots.ResourceRequirement;
-import java.util.Collection;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
-/** Builder for the {@link TestingResourceActions}. */
-public class TestingResourceActionsBuilder {
+/** Builder for the {@link TestingResourceAllocator}. */
+public class TestingResourceAllocatorBuilder {
private BiConsumer<InstanceID, Exception> releaseResourceConsumer = (ignoredA, ignoredB) -> {};
private Function<WorkerResourceSpec, Boolean> allocateResourceFunction = (ignored) -> true;
- private BiConsumer<JobID, Collection<ResourceRequirement>> notifyNotEnoughResourcesConsumer =
- (ignoredA, ignoredB) -> {};
- public TestingResourceActionsBuilder setReleaseResourceConsumer(
+ public TestingResourceAllocatorBuilder setReleaseResourceConsumer(
BiConsumer<InstanceID, Exception> releaseResourceConsumer) {
this.releaseResourceConsumer = releaseResourceConsumer;
return this;
}
- public TestingResourceActionsBuilder setAllocateResourceFunction(
+ public TestingResourceAllocatorBuilder setAllocateResourceFunction(
Function<WorkerResourceSpec, Boolean> allocateResourceFunction) {
this.allocateResourceFunction = allocateResourceFunction;
return this;
}
- public TestingResourceActionsBuilder setAllocateResourceConsumer(
+ public TestingResourceAllocatorBuilder setAllocateResourceConsumer(
Consumer<WorkerResourceSpec> allocateResourceConsumer) {
this.allocateResourceFunction =
workerRequest -> {
@@ -57,16 +52,7 @@ public class TestingResourceActionsBuilder {
return this;
}
- public TestingResourceActionsBuilder setNotEnoughResourcesConsumer(
- BiConsumer<JobID, Collection<ResourceRequirement>> notifyNotEnoughResourcesConsumer) {
- this.notifyNotEnoughResourcesConsumer = notifyNotEnoughResourcesConsumer;
- return this;
- }
-
- public TestingResourceActions build() {
- return new TestingResourceActions(
- releaseResourceConsumer,
- allocateResourceFunction,
- notifyNotEnoughResourcesConsumer);
+ public TestingResourceAllocator build() {
+ return new TestingResourceAllocator(releaseResourceConsumer, allocateResourceFunction);
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceEventListener.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceEventListener.java
new file mode 100644
index 00000000000..bff5f30f04a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceEventListener.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.function.BiConsumer;
+
+/** Testing implementation of the {@link ResourceEventListener}. */
+public class TestingResourceEventListener implements ResourceEventListener {
+
+ @Nonnull
+ private final BiConsumer<JobID, Collection<ResourceRequirement>>
+ notEnoughResourceAvailableConsumer;
+
+ public TestingResourceEventListener(
+ @Nonnull
+ BiConsumer<JobID, Collection<ResourceRequirement>>
+ notEnoughResourceAvailableConsumer) {
+ this.notEnoughResourceAvailableConsumer = notEnoughResourceAvailableConsumer;
+ }
+
+ @Override
+ public void notEnoughResourceAvailable(
+ JobID jobId, Collection<ResourceRequirement> acquiredResources) {
+ notEnoughResourceAvailableConsumer.accept(jobId, acquiredResources);
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceEventListenerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceEventListenerBuilder.java
new file mode 100644
index 00000000000..5e2e8d884ec
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceEventListenerBuilder.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+
+import java.util.Collection;
+import java.util.function.BiConsumer;
+
+/** Builder for the {@link TestingResourceEventListener}. */
+public class TestingResourceEventListenerBuilder {
+ private BiConsumer<JobID, Collection<ResourceRequirement>> notEnoughResourceAvailableConsumer =
+ (ignoredA, ignoredB) -> {};
+
+ public TestingResourceEventListenerBuilder setNotEnoughResourceAvailableConsumer(
+ BiConsumer<JobID, Collection<ResourceRequirement>> notEnoughResourceAvailableConsumer) {
+ this.notEnoughResourceAvailableConsumer = notEnoughResourceAvailableConsumer;
+ return this;
+ }
+
+ public TestingResourceEventListener build() {
+ return new TestingResourceEventListener(notEnoughResourceAvailableConsumer);
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingSlotManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingSlotManager.java
index 4494320d488..bb740f8fb59 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingSlotManager.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingSlotManager.java
@@ -114,7 +114,8 @@ public class TestingSlotManager implements SlotManager {
public void start(
ResourceManagerId newResourceManagerId,
Executor newMainThreadExecutor,
- ResourceActions newResourceActions,
+ ResourceAllocator newResourceAllocator,
+ ResourceEventListener resourceEventListener,
BlockedTaskManagerChecker newBlockedTaskManagerChecker) {}
@Override