You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2021/02/28 10:53:20 UTC
[flink] 04/04: [FLINK-21479][coordination] Provide
TaskManagerResourceInfoProvider to ResourceAllocationStrategy
This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit b995633c3d0245bcc2bf3ca9b47a4c6758bb927b
Author: Yangze Guo <ka...@gmail.com>
AuthorDate: Thu Feb 25 11:08:22 2021 +0800
[FLINK-21479][coordination] Provide TaskManagerResourceInfoProvider to ResourceAllocationStrategy
This closes #15015
---
.../DefaultResourceAllocationStrategy.java | 38 ++++-
.../slotmanager/FineGrainedSlotManager.java | 15 +-
.../slotmanager/ResourceAllocationStrategy.java | 12 +-
.../DefaultResourceAllocationStrategyTest.java | 51 ++++---
.../slotmanager/FineGrainedSlotManagerTest.java | 8 +-
.../TestingResourceAllocationStrategy.java | 34 ++---
.../slotmanager/TestingTaskManagerInfo.java | 101 +++++++++++++
.../TestingTaskManagerResourceInfoProvider.java | 167 +++++++++++++++++++++
8 files changed, 346 insertions(+), 80 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java
index ed01816..569f906 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java
@@ -27,7 +27,6 @@ import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.util.Preconditions;
import java.util.Collection;
-import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -67,15 +66,17 @@ public class DefaultResourceAllocationStrategy implements ResourceAllocationStra
@Override
public ResourceAllocationResult tryFulfillRequirements(
Map<JobID, Collection<ResourceRequirement>> missingResources,
- Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>> registeredResources,
- List<PendingTaskManager> pendingTaskManagers) {
+ TaskManagerResourceInfoProvider taskManagerResourceInfoProvider) {
final ResourceAllocationResult.Builder resultBuilder = ResourceAllocationResult.builder();
+
+ // Tuples of available and default slot resource for registered task managers, indexed by
+ // instanceId
+ final Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>> registeredResources =
+ getRegisteredResources(taskManagerResourceInfoProvider);
+ // Available resources of pending task managers, indexed by the pendingTaskManagerId
final Map<PendingTaskManagerId, ResourceProfile> pendingResources =
- pendingTaskManagers.stream()
- .collect(
- Collectors.toMap(
- PendingTaskManager::getPendingTaskManagerId,
- PendingTaskManager::getTotalResourceProfile));
+ getPendingResources(taskManagerResourceInfoProvider);
+
for (Map.Entry<JobID, Collection<ResourceRequirement>> resourceRequirements :
missingResources.entrySet()) {
final JobID jobId = resourceRequirements.getKey();
@@ -95,6 +96,27 @@ public class DefaultResourceAllocationStrategy implements ResourceAllocationStra
return resultBuilder.build();
}
+ private static Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>> getRegisteredResources(
+ TaskManagerResourceInfoProvider taskManagerResourceInfoProvider) {
+ return taskManagerResourceInfoProvider.getRegisteredTaskManagers().stream()
+ .collect(
+ Collectors.toMap(
+ TaskManagerInfo::getInstanceId,
+ taskManager ->
+ Tuple2.of(
+ taskManager.getAvailableResource(),
+ taskManager.getDefaultSlotResourceProfile())));
+ }
+
+ private static Map<PendingTaskManagerId, ResourceProfile> getPendingResources(
+ TaskManagerResourceInfoProvider taskManagerResourceInfoProvider) {
+ return taskManagerResourceInfoProvider.getPendingTaskManagers().stream()
+ .collect(
+ Collectors.toMap(
+ PendingTaskManager::getPendingTaskManagerId,
+ PendingTaskManager::getTotalResourceProfile));
+ }
+
private static ResourceCounter tryFulfillRequirementsForJobWithRegisteredResources(
JobID jobId,
Collection<ResourceRequirement> missingResources,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
index 7f2d03a..f6c6890 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.resourcemanager.slotmanager;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
@@ -452,21 +451,9 @@ public class FineGrainedSlotManager implements SlotManager {
Collectors.toMap(
Map.Entry::getKey, e -> new ArrayList<>(e.getValue())));
- final Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>> availableResources =
- taskManagerTracker.getRegisteredTaskManagers().stream()
- .collect(
- Collectors.toMap(
- TaskManagerInfo::getInstanceId,
- taskManager ->
- Tuple2.of(
- taskManager.getAvailableResource(),
- taskManager
- .getDefaultSlotResourceProfile())));
final ResourceAllocationResult result =
resourceAllocationStrategy.tryFulfillRequirements(
- missingResources,
- availableResources,
- new ArrayList<>(taskManagerTracker.getPendingTaskManagers()));
+ missingResources, taskManagerTracker);
// Allocate slots according to the result
allocateSlotsAccordingTo(result.getAllocationsOnRegisteredResources());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocationStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocationStrategy.java
index 71fad04..2585e11 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocationStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocationStrategy.java
@@ -19,13 +19,9 @@
package org.apache.flink.runtime.resourcemanager.slotmanager;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.slots.ResourceRequirement;
import java.util.Collection;
-import java.util.List;
import java.util.Map;
/** Strategy for allocating slots and task managers to fulfill the unfulfilled requirements. */
@@ -39,14 +35,12 @@ public interface ResourceAllocationStrategy {
* input arguments. If the arguments are reused elsewhere, please make a deep copy in advance.
*
* @param missingResources resource requirements that are not yet fulfilled, indexed by jobId
- * @param registeredResources tuples of available and default slot resource for registered task
- * managers, indexed by instanceId
- * @param pendingTaskManagers available and default slot resources of pending task managers
+ * @param taskManagerResourceInfoProvider provide the registered/pending resources of the
+ * current cluster
* @return a {@link ResourceAllocationResult} based on the current status, which contains
* whether the requirements can be fulfilled and the actions to take
*/
ResourceAllocationResult tryFulfillRequirements(
Map<JobID, Collection<ResourceRequirement>> missingResources,
- Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>> registeredResources,
- List<PendingTaskManager> pendingTaskManagers);
+ TaskManagerResourceInfoProvider taskManagerResourceInfoProvider);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategyTest.java
index b02f523..15cccd7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategyTest.java
@@ -19,9 +19,7 @@
package org.apache.flink.runtime.resourcemanager.slotmanager;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.util.TestLogger;
@@ -30,7 +28,6 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -49,56 +46,61 @@ public class DefaultResourceAllocationStrategyTest extends TestLogger {
@Test
public void testFulfillRequirementWithRegisteredResources() {
- final Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>> registeredResources =
- new HashMap<>();
+ final TaskManagerInfo taskManager =
+ new TestingTaskManagerInfo(
+ DEFAULT_SLOT_RESOURCE.multiply(10),
+ DEFAULT_SLOT_RESOURCE.multiply(10),
+ DEFAULT_SLOT_RESOURCE);
final JobID jobId = new JobID();
- final InstanceID instanceId = new InstanceID();
final List<ResourceRequirement> requirements = new ArrayList<>();
final ResourceProfile largeResource = DEFAULT_SLOT_RESOURCE.multiply(8);
+ final TaskManagerResourceInfoProvider taskManagerResourceInfoProvider =
+ TestingTaskManagerResourceInfoProvider.newBuilder()
+ .setRegisteredTaskManagersSupplier(() -> Collections.singleton(taskManager))
+ .build();
requirements.add(ResourceRequirement.create(largeResource, 1));
requirements.add(ResourceRequirement.create(ResourceProfile.UNKNOWN, 2));
- registeredResources.put(
- instanceId, Tuple2.of(DEFAULT_SLOT_RESOURCE.multiply(10), DEFAULT_SLOT_RESOURCE));
final ResourceAllocationResult result =
STRATEGY.tryFulfillRequirements(
Collections.singletonMap(jobId, requirements),
- registeredResources,
- Collections.emptyList());
+ taskManagerResourceInfoProvider);
assertThat(result.getUnfulfillableJobs(), is(empty()));
assertThat(result.getAllocationsOnPendingResources().keySet(), is(empty()));
assertThat(result.getPendingTaskManagersToAllocate(), is(empty()));
assertThat(
result.getAllocationsOnRegisteredResources()
.get(jobId)
- .get(instanceId)
+ .get(taskManager.getInstanceId())
.getResourceCount(DEFAULT_SLOT_RESOURCE),
is(2));
assertThat(
result.getAllocationsOnRegisteredResources()
.get(jobId)
- .get(instanceId)
+ .get(taskManager.getInstanceId())
.getResourceCount(largeResource),
is(1));
}
@Test
public void testFulfillRequirementWithPendingResources() {
- final List<PendingTaskManager> pendingTaskManagers = new ArrayList<>();
final JobID jobId = new JobID();
final List<ResourceRequirement> requirements = new ArrayList<>();
final ResourceProfile largeResource = DEFAULT_SLOT_RESOURCE.multiply(3);
final PendingTaskManager pendingTaskManager =
new PendingTaskManager(DEFAULT_SLOT_RESOURCE.multiply(NUM_OF_SLOTS), NUM_OF_SLOTS);
+ final TaskManagerResourceInfoProvider taskManagerResourceInfoProvider =
+ TestingTaskManagerResourceInfoProvider.newBuilder()
+ .setPendingTaskManagersSupplier(
+ () -> Collections.singleton(pendingTaskManager))
+ .build();
requirements.add(ResourceRequirement.create(largeResource, 1));
requirements.add(ResourceRequirement.create(ResourceProfile.UNKNOWN, 7));
- pendingTaskManagers.add(pendingTaskManager);
final ResourceAllocationResult result =
STRATEGY.tryFulfillRequirements(
Collections.singletonMap(jobId, requirements),
- Collections.emptyMap(),
- pendingTaskManagers);
+ taskManagerResourceInfoProvider);
assertThat(result.getUnfulfillableJobs(), is(empty()));
assertThat(result.getAllocationsOnRegisteredResources().keySet(), is(empty()));
assertThat(result.getPendingTaskManagersToAllocate().size(), is(1));
@@ -130,21 +132,24 @@ public class DefaultResourceAllocationStrategyTest extends TestLogger {
@Test
public void testUnfulfillableRequirement() {
- final Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>> registeredResources =
- new HashMap<>();
+ final TaskManagerInfo taskManager =
+ new TestingTaskManagerInfo(
+ DEFAULT_SLOT_RESOURCE.multiply(NUM_OF_SLOTS),
+ DEFAULT_SLOT_RESOURCE.multiply(NUM_OF_SLOTS),
+ DEFAULT_SLOT_RESOURCE);
final JobID jobId = new JobID();
final List<ResourceRequirement> requirements = new ArrayList<>();
final ResourceProfile unfulfillableResource = DEFAULT_SLOT_RESOURCE.multiply(8);
+ final TaskManagerResourceInfoProvider taskManagerResourceInfoProvider =
+ TestingTaskManagerResourceInfoProvider.newBuilder()
+ .setRegisteredTaskManagersSupplier(() -> Collections.singleton(taskManager))
+ .build();
requirements.add(ResourceRequirement.create(unfulfillableResource, 1));
- registeredResources.put(
- new InstanceID(),
- Tuple2.of(DEFAULT_SLOT_RESOURCE.multiply(NUM_OF_SLOTS), DEFAULT_SLOT_RESOURCE));
final ResourceAllocationResult result =
STRATEGY.tryFulfillRequirements(
Collections.singletonMap(jobId, requirements),
- registeredResources,
- Collections.emptyList());
+ taskManagerResourceInfoProvider);
assertThat(result.getUnfulfillableJobs(), contains(jobId));
assertThat(result.getPendingTaskManagersToAllocate(), is(empty()));
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
index dbb0e42..038b9b2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
@@ -286,7 +286,7 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
new Context() {
{
resourceAllocationStrategyBuilder.setTryFulfillRequirementsFunction(
- ((jobIDCollectionMap, instanceIDTuple2Map, pendingTaskManagers) ->
+ ((jobIDCollectionMap, taskManagerResourceInfoProvider) ->
ResourceAllocationResult.builder()
.addAllocationOnRegisteredResource(
jobId,
@@ -333,7 +333,7 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
return true;
});
resourceAllocationStrategyBuilder.setTryFulfillRequirementsFunction(
- ((jobIDCollectionMap, instanceIDTuple2Map, pendingTaskManagers) ->
+ ((jobIDCollectionMap, taskManagerResourceInfoProvider) ->
ResourceAllocationResult.builder()
.addPendingTaskManagerAllocate(
new PendingTaskManager(
@@ -379,7 +379,7 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
new Context() {
{
resourceAllocationStrategyBuilder.setTryFulfillRequirementsFunction(
- ((jobIDCollectionMap, instanceIDTuple2Map, pendingTaskManagers) ->
+ ((jobIDCollectionMap, taskManagerResourceInfoProvider) ->
ResourceAllocationResult.builder()
.addPendingTaskManagerAllocate(pendingTaskManager)
.addAllocationOnPendingResource(
@@ -437,7 +437,7 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
notEnoughResourceNotifications.add(
Tuple2.of(jobId1, acquiredResources)));
resourceAllocationStrategyBuilder.setTryFulfillRequirementsFunction(
- ((jobIDCollectionMap, instanceIDTuple2Map, pendingTaskManagers) ->
+ ((jobIDCollectionMap, taskManagerResourceInfoProvider) ->
ResourceAllocationResult.builder()
.addUnfulfillableJob(jobId)
.build()));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocationStrategy.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocationStrategy.java
index cfdfcd2..a744527 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocationStrategy.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocationStrategy.java
@@ -18,31 +18,25 @@
package org.apache.flink.runtime.resourcemanager.slotmanager;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.function.TriFunction;
import java.util.Collection;
-import java.util.List;
import java.util.Map;
+import java.util.function.BiFunction;
/** Implementation of {@link ResourceAllocationStrategy} for testing purpose. */
public class TestingResourceAllocationStrategy implements ResourceAllocationStrategy {
- private final TriFunction<
+ private final BiFunction<
Map<JobID, Collection<ResourceRequirement>>,
- Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>>,
- List<PendingTaskManager>,
+ TaskManagerResourceInfoProvider,
ResourceAllocationResult>
tryFulfillRequirementsFunction;
private TestingResourceAllocationStrategy(
- TriFunction<
+ BiFunction<
Map<JobID, Collection<ResourceRequirement>>,
- Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>>,
- List<PendingTaskManager>,
+ TaskManagerResourceInfoProvider,
ResourceAllocationResult>
tryFulfillRequirementsFunction) {
this.tryFulfillRequirementsFunction =
@@ -52,10 +46,9 @@ public class TestingResourceAllocationStrategy implements ResourceAllocationStra
@Override
public ResourceAllocationResult tryFulfillRequirements(
Map<JobID, Collection<ResourceRequirement>> missingResources,
- Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>> registeredResources,
- List<PendingTaskManager> pendingTaskManagers) {
+ TaskManagerResourceInfoProvider taskManagerResourceInfoProvider) {
return tryFulfillRequirementsFunction.apply(
- missingResources, registeredResources, pendingTaskManagers);
+ missingResources, taskManagerResourceInfoProvider);
}
public static Builder newBuilder() {
@@ -63,20 +56,17 @@ public class TestingResourceAllocationStrategy implements ResourceAllocationStra
}
public static class Builder {
- private TriFunction<
+ private BiFunction<
Map<JobID, Collection<ResourceRequirement>>,
- Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>>,
- List<PendingTaskManager>,
+ TaskManagerResourceInfoProvider,
ResourceAllocationResult>
tryFulfillRequirementsFunction =
- (ignored0, ignored1, ignored2) ->
- ResourceAllocationResult.builder().build();
+ (ignored0, ignored1) -> ResourceAllocationResult.builder().build();
public Builder setTryFulfillRequirementsFunction(
- TriFunction<
+ BiFunction<
Map<JobID, Collection<ResourceRequirement>>,
- Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>>,
- List<PendingTaskManager>,
+ TaskManagerResourceInfoProvider,
ResourceAllocationResult>
tryFulfillRequirementsFunction) {
this.tryFulfillRequirementsFunction = tryFulfillRequirementsFunction;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingTaskManagerInfo.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingTaskManagerInfo.java
new file mode 100644
index 0000000..50ad317
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingTaskManagerInfo.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.Map;
+
+/** Implementation of {@link TaskManagerInfo} for testing purpose. */
+public class TestingTaskManagerInfo implements TaskManagerInfo {
+ private final TaskExecutorConnection taskExecutorConnection;
+ private final ResourceProfile totalResource;
+ private final ResourceProfile availableResource;
+ private final ResourceProfile defaultSlotResourceProfile;
+ private final int defaultNumSlots;
+
+ public TestingTaskManagerInfo(
+ ResourceProfile totalResource,
+ ResourceProfile availableResource,
+ ResourceProfile defaultSlotResourceProfile) {
+ this.totalResource = Preconditions.checkNotNull(totalResource);
+ this.availableResource = Preconditions.checkNotNull(availableResource);
+ this.defaultSlotResourceProfile = Preconditions.checkNotNull(defaultSlotResourceProfile);
+
+ this.defaultNumSlots =
+ SlotManagerUtils.calculateDefaultNumSlots(
+ totalResource, defaultSlotResourceProfile);
+ this.taskExecutorConnection =
+ new TaskExecutorConnection(
+ ResourceID.generate(),
+ new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway());
+ }
+
+ @Override
+ public InstanceID getInstanceId() {
+ return taskExecutorConnection.getInstanceID();
+ }
+
+ @Override
+ public TaskExecutorConnection getTaskExecutorConnection() {
+ return taskExecutorConnection;
+ }
+
+ @Override
+ public Map<AllocationID, TaskManagerSlotInformation> getAllocatedSlots() {
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public ResourceProfile getAvailableResource() {
+ return availableResource;
+ }
+
+ @Override
+ public ResourceProfile getTotalResource() {
+ return totalResource;
+ }
+
+ @Override
+ public ResourceProfile getDefaultSlotResourceProfile() {
+ return defaultSlotResourceProfile;
+ }
+
+ @Override
+ public int getDefaultNumSlots() {
+ return defaultNumSlots;
+ }
+
+ @Override
+ public long getIdleSince() {
+ return Long.MAX_VALUE;
+ }
+
+ @Override
+ public boolean isIdle() {
+ return false;
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingTaskManagerResourceInfoProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingTaskManagerResourceInfoProvider.java
new file mode 100644
index 0000000..c904cd7
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingTaskManagerResourceInfoProvider.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.util.ResourceCounter;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/** Implementation of {@link TaskManagerResourceInfoProvider} for testing purpose. */
+public class TestingTaskManagerResourceInfoProvider implements TaskManagerResourceInfoProvider {
+ private final Function<PendingTaskManagerId, Map<JobID, ResourceCounter>>
+ getPendingAllocationsOfPendingTaskManagerFunction;
+ private final Supplier<Collection<? extends TaskManagerInfo>> registeredTaskManagersSupplier;
+ private final Function<InstanceID, Optional<TaskManagerInfo>> getRegisteredTaskManagerFunction;
+ private final Supplier<Collection<PendingTaskManager>> pendingTaskManagersSupplier;
+ private final Function<AllocationID, Optional<TaskManagerSlotInformation>>
+ getAllocatedOrPendingSlotFunction;
+ private final Supplier<ClusterResourceOverview> clusterResourceOverviewSupplier;
+
+ private TestingTaskManagerResourceInfoProvider(
+ Function<PendingTaskManagerId, Map<JobID, ResourceCounter>>
+ getPendingAllocationsOfPendingTaskManagerFunction,
+ Supplier<Collection<? extends TaskManagerInfo>> registeredTaskManagersSupplier,
+ Function<InstanceID, Optional<TaskManagerInfo>> getRegisteredTaskManagerFunction,
+ Supplier<Collection<PendingTaskManager>> pendingTaskManagersSupplier,
+ Function<AllocationID, Optional<TaskManagerSlotInformation>>
+ getAllocatedOrPendingSlotFunction,
+ Supplier<ClusterResourceOverview> clusterResourceOverviewSupplier) {
+ this.getPendingAllocationsOfPendingTaskManagerFunction =
+ Preconditions.checkNotNull(getPendingAllocationsOfPendingTaskManagerFunction);
+ this.registeredTaskManagersSupplier =
+ Preconditions.checkNotNull(registeredTaskManagersSupplier);
+ this.getRegisteredTaskManagerFunction =
+ Preconditions.checkNotNull(getRegisteredTaskManagerFunction);
+ this.pendingTaskManagersSupplier = Preconditions.checkNotNull(pendingTaskManagersSupplier);
+ this.getAllocatedOrPendingSlotFunction =
+ Preconditions.checkNotNull(getAllocatedOrPendingSlotFunction);
+ this.clusterResourceOverviewSupplier =
+ Preconditions.checkNotNull(clusterResourceOverviewSupplier);
+ }
+
+ @Override
+ public Map<JobID, ResourceCounter> getPendingAllocationsOfPendingTaskManager(
+ PendingTaskManagerId pendingTaskManagerId) {
+ return getPendingAllocationsOfPendingTaskManagerFunction.apply(pendingTaskManagerId);
+ }
+
+ @Override
+ public Collection<? extends TaskManagerInfo> getRegisteredTaskManagers() {
+ return registeredTaskManagersSupplier.get();
+ }
+
+ @Override
+ public Optional<TaskManagerInfo> getRegisteredTaskManager(InstanceID instanceId) {
+ return getRegisteredTaskManagerFunction.apply(instanceId);
+ }
+
+ @Override
+ public Collection<PendingTaskManager> getPendingTaskManagers() {
+ return pendingTaskManagersSupplier.get();
+ }
+
+ @Override
+ public Optional<TaskManagerSlotInformation> getAllocatedOrPendingSlot(
+ AllocationID allocationId) {
+ return getAllocatedOrPendingSlotFunction.apply(allocationId);
+ }
+
+ @Override
+ public ClusterResourceOverview getClusterResourceOverview() {
+ return clusterResourceOverviewSupplier.get();
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+ private Function<PendingTaskManagerId, Map<JobID, ResourceCounter>>
+ getPendingAllocationsOfPendingTaskManagerFunction =
+ ignore -> Collections.emptyMap();
+ private Supplier<Collection<? extends TaskManagerInfo>> registeredTaskManagersSupplier =
+ Collections::emptyList;
+ private Function<InstanceID, Optional<TaskManagerInfo>> getRegisteredTaskManagerFunction =
+ ignore -> Optional.empty();
+ private Supplier<Collection<PendingTaskManager>> pendingTaskManagersSupplier =
+ Collections::emptyList;
+ private Function<AllocationID, Optional<TaskManagerSlotInformation>>
+ getAllocatedOrPendingSlotFunction = ignore -> Optional.empty();
+ private Supplier<ClusterResourceOverview> clusterResourceOverviewSupplier =
+ () -> new ClusterResourceOverview(Collections.emptyMap());
+
+ public Builder setClusterResourceOverviewSupplier(
+ Supplier<ClusterResourceOverview> clusterResourceOverviewSupplier) {
+ this.clusterResourceOverviewSupplier = clusterResourceOverviewSupplier;
+ return this;
+ }
+
+ public Builder setGetAllocatedOrPendingSlotFunction(
+ Function<AllocationID, Optional<TaskManagerSlotInformation>>
+ getAllocatedOrPendingSlotFunction) {
+ this.getAllocatedOrPendingSlotFunction = getAllocatedOrPendingSlotFunction;
+ return this;
+ }
+
+ public Builder setGetPendingAllocationsOfPendingTaskManagerFunction(
+ Function<PendingTaskManagerId, Map<JobID, ResourceCounter>>
+ getPendingAllocationsOfPendingTaskManagerFunction) {
+ this.getPendingAllocationsOfPendingTaskManagerFunction =
+ getPendingAllocationsOfPendingTaskManagerFunction;
+ return this;
+ }
+
+ public Builder setGetRegisteredTaskManagerFunction(
+ Function<InstanceID, Optional<TaskManagerInfo>> getRegisteredTaskManagerFunction) {
+ this.getRegisteredTaskManagerFunction = getRegisteredTaskManagerFunction;
+ return this;
+ }
+
+ public Builder setPendingTaskManagersSupplier(
+ Supplier<Collection<PendingTaskManager>> pendingTaskManagersSupplier) {
+ this.pendingTaskManagersSupplier = pendingTaskManagersSupplier;
+ return this;
+ }
+
+ public Builder setRegisteredTaskManagersSupplier(
+ Supplier<Collection<? extends TaskManagerInfo>> registeredTaskManagersSupplier) {
+ this.registeredTaskManagersSupplier = registeredTaskManagersSupplier;
+ return this;
+ }
+
+ public TestingTaskManagerResourceInfoProvider build() {
+ return new TestingTaskManagerResourceInfoProvider(
+ getPendingAllocationsOfPendingTaskManagerFunction,
+ registeredTaskManagersSupplier,
+ getRegisteredTaskManagerFunction,
+ pendingTaskManagersSupplier,
+ getAllocatedOrPendingSlotFunction,
+ clusterResourceOverviewSupplier);
+ }
+ }
+}