You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2021/02/28 10:53:20 UTC

[flink] 04/04: [FLINK-21479][coordination] Provide TaskManagerResourceInfoProvider to ResourceAllocationStrategy

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b995633c3d0245bcc2bf3ca9b47a4c6758bb927b
Author: Yangze Guo <ka...@gmail.com>
AuthorDate: Thu Feb 25 11:08:22 2021 +0800

    [FLINK-21479][coordination] Provide TaskManagerResourceInfoProvider to ResourceAllocationStrategy
    
    This closes #15015
---
 .../DefaultResourceAllocationStrategy.java         |  38 ++++-
 .../slotmanager/FineGrainedSlotManager.java        |  15 +-
 .../slotmanager/ResourceAllocationStrategy.java    |  12 +-
 .../DefaultResourceAllocationStrategyTest.java     |  51 ++++---
 .../slotmanager/FineGrainedSlotManagerTest.java    |   8 +-
 .../TestingResourceAllocationStrategy.java         |  34 ++---
 .../slotmanager/TestingTaskManagerInfo.java        | 101 +++++++++++++
 .../TestingTaskManagerResourceInfoProvider.java    | 167 +++++++++++++++++++++
 8 files changed, 346 insertions(+), 80 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java
index ed01816..569f906 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java
@@ -27,7 +27,6 @@ import org.apache.flink.runtime.util.ResourceCounter;
 import org.apache.flink.util.Preconditions;
 
 import java.util.Collection;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
@@ -67,15 +66,17 @@ public class DefaultResourceAllocationStrategy implements ResourceAllocationStra
     @Override
     public ResourceAllocationResult tryFulfillRequirements(
             Map<JobID, Collection<ResourceRequirement>> missingResources,
-            Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>> registeredResources,
-            List<PendingTaskManager> pendingTaskManagers) {
+            TaskManagerResourceInfoProvider taskManagerResourceInfoProvider) {
         final ResourceAllocationResult.Builder resultBuilder = ResourceAllocationResult.builder();
+
+        // Tuples of available and default slot resource for registered task managers, indexed by
+        // instanceId
+        final Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>> registeredResources =
+                getRegisteredResources(taskManagerResourceInfoProvider);
+        // Available resources of pending task managers, indexed by the pendingTaskManagerId
         final Map<PendingTaskManagerId, ResourceProfile> pendingResources =
-                pendingTaskManagers.stream()
-                        .collect(
-                                Collectors.toMap(
-                                        PendingTaskManager::getPendingTaskManagerId,
-                                        PendingTaskManager::getTotalResourceProfile));
+                getPendingResources(taskManagerResourceInfoProvider);
+
         for (Map.Entry<JobID, Collection<ResourceRequirement>> resourceRequirements :
                 missingResources.entrySet()) {
             final JobID jobId = resourceRequirements.getKey();
@@ -95,6 +96,27 @@ public class DefaultResourceAllocationStrategy implements ResourceAllocationStra
         return resultBuilder.build();
     }
 
+    private static Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>> getRegisteredResources(
+            TaskManagerResourceInfoProvider taskManagerResourceInfoProvider) {
+        return taskManagerResourceInfoProvider.getRegisteredTaskManagers().stream()
+                .collect(
+                        Collectors.toMap(
+                                TaskManagerInfo::getInstanceId,
+                                taskManager ->
+                                        Tuple2.of(
+                                                taskManager.getAvailableResource(),
+                                                taskManager.getDefaultSlotResourceProfile())));
+    }
+
+    private static Map<PendingTaskManagerId, ResourceProfile> getPendingResources(
+            TaskManagerResourceInfoProvider taskManagerResourceInfoProvider) {
+        return taskManagerResourceInfoProvider.getPendingTaskManagers().stream()
+                .collect(
+                        Collectors.toMap(
+                                PendingTaskManager::getPendingTaskManagerId,
+                                PendingTaskManager::getTotalResourceProfile));
+    }
+
     private static ResourceCounter tryFulfillRequirementsForJobWithRegisteredResources(
             JobID jobId,
             Collection<ResourceRequirement> missingResources,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
index 7f2d03a..f6c6890 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.resourcemanager.slotmanager;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
@@ -452,21 +451,9 @@ public class FineGrainedSlotManager implements SlotManager {
                                 Collectors.toMap(
                                         Map.Entry::getKey, e -> new ArrayList<>(e.getValue())));
 
-        final Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>> availableResources =
-                taskManagerTracker.getRegisteredTaskManagers().stream()
-                        .collect(
-                                Collectors.toMap(
-                                        TaskManagerInfo::getInstanceId,
-                                        taskManager ->
-                                                Tuple2.of(
-                                                        taskManager.getAvailableResource(),
-                                                        taskManager
-                                                                .getDefaultSlotResourceProfile())));
         final ResourceAllocationResult result =
                 resourceAllocationStrategy.tryFulfillRequirements(
-                        missingResources,
-                        availableResources,
-                        new ArrayList<>(taskManagerTracker.getPendingTaskManagers()));
+                        missingResources, taskManagerTracker);
 
         // Allocate slots according to the result
         allocateSlotsAccordingTo(result.getAllocationsOnRegisteredResources());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocationStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocationStrategy.java
index 71fad04..2585e11 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocationStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocationStrategy.java
@@ -19,13 +19,9 @@
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.slots.ResourceRequirement;
 
 import java.util.Collection;
-import java.util.List;
 import java.util.Map;
 
 /** Strategy for allocating slots and task managers to fulfill the unfulfilled requirements. */
@@ -39,14 +35,12 @@ public interface ResourceAllocationStrategy {
      * input arguments. If the arguments are reused elsewhere, please make a deep copy in advance.
      *
      * @param missingResources resource requirements that are not yet fulfilled, indexed by jobId
-     * @param registeredResources tuples of available and default slot resource for registered task
-     *     managers, indexed by instanceId
-     * @param pendingTaskManagers available and default slot resources of pending task managers
+     * @param taskManagerResourceInfoProvider provide the registered/pending resources of the
+     *     current cluster
      * @return a {@link ResourceAllocationResult} based on the current status, which contains
      *     whether the requirements can be fulfilled and the actions to take
      */
     ResourceAllocationResult tryFulfillRequirements(
             Map<JobID, Collection<ResourceRequirement>> missingResources,
-            Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>> registeredResources,
-            List<PendingTaskManager> pendingTaskManagers);
+            TaskManagerResourceInfoProvider taskManagerResourceInfoProvider);
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategyTest.java
index b02f523..15cccd7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategyTest.java
@@ -19,9 +19,7 @@
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.slots.ResourceRequirement;
 import org.apache.flink.runtime.util.ResourceCounter;
 import org.apache.flink.util.TestLogger;
@@ -30,7 +28,6 @@ import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -49,56 +46,61 @@ public class DefaultResourceAllocationStrategyTest extends TestLogger {
 
     @Test
     public void testFulfillRequirementWithRegisteredResources() {
-        final Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>> registeredResources =
-                new HashMap<>();
+        final TaskManagerInfo taskManager =
+                new TestingTaskManagerInfo(
+                        DEFAULT_SLOT_RESOURCE.multiply(10),
+                        DEFAULT_SLOT_RESOURCE.multiply(10),
+                        DEFAULT_SLOT_RESOURCE);
         final JobID jobId = new JobID();
-        final InstanceID instanceId = new InstanceID();
         final List<ResourceRequirement> requirements = new ArrayList<>();
         final ResourceProfile largeResource = DEFAULT_SLOT_RESOURCE.multiply(8);
+        final TaskManagerResourceInfoProvider taskManagerResourceInfoProvider =
+                TestingTaskManagerResourceInfoProvider.newBuilder()
+                        .setRegisteredTaskManagersSupplier(() -> Collections.singleton(taskManager))
+                        .build();
         requirements.add(ResourceRequirement.create(largeResource, 1));
         requirements.add(ResourceRequirement.create(ResourceProfile.UNKNOWN, 2));
-        registeredResources.put(
-                instanceId, Tuple2.of(DEFAULT_SLOT_RESOURCE.multiply(10), DEFAULT_SLOT_RESOURCE));
 
         final ResourceAllocationResult result =
                 STRATEGY.tryFulfillRequirements(
                         Collections.singletonMap(jobId, requirements),
-                        registeredResources,
-                        Collections.emptyList());
+                        taskManagerResourceInfoProvider);
         assertThat(result.getUnfulfillableJobs(), is(empty()));
         assertThat(result.getAllocationsOnPendingResources().keySet(), is(empty()));
         assertThat(result.getPendingTaskManagersToAllocate(), is(empty()));
         assertThat(
                 result.getAllocationsOnRegisteredResources()
                         .get(jobId)
-                        .get(instanceId)
+                        .get(taskManager.getInstanceId())
                         .getResourceCount(DEFAULT_SLOT_RESOURCE),
                 is(2));
         assertThat(
                 result.getAllocationsOnRegisteredResources()
                         .get(jobId)
-                        .get(instanceId)
+                        .get(taskManager.getInstanceId())
                         .getResourceCount(largeResource),
                 is(1));
     }
 
     @Test
     public void testFulfillRequirementWithPendingResources() {
-        final List<PendingTaskManager> pendingTaskManagers = new ArrayList<>();
         final JobID jobId = new JobID();
         final List<ResourceRequirement> requirements = new ArrayList<>();
         final ResourceProfile largeResource = DEFAULT_SLOT_RESOURCE.multiply(3);
         final PendingTaskManager pendingTaskManager =
                 new PendingTaskManager(DEFAULT_SLOT_RESOURCE.multiply(NUM_OF_SLOTS), NUM_OF_SLOTS);
+        final TaskManagerResourceInfoProvider taskManagerResourceInfoProvider =
+                TestingTaskManagerResourceInfoProvider.newBuilder()
+                        .setPendingTaskManagersSupplier(
+                                () -> Collections.singleton(pendingTaskManager))
+                        .build();
         requirements.add(ResourceRequirement.create(largeResource, 1));
         requirements.add(ResourceRequirement.create(ResourceProfile.UNKNOWN, 7));
-        pendingTaskManagers.add(pendingTaskManager);
 
         final ResourceAllocationResult result =
                 STRATEGY.tryFulfillRequirements(
                         Collections.singletonMap(jobId, requirements),
-                        Collections.emptyMap(),
-                        pendingTaskManagers);
+                        taskManagerResourceInfoProvider);
         assertThat(result.getUnfulfillableJobs(), is(empty()));
         assertThat(result.getAllocationsOnRegisteredResources().keySet(), is(empty()));
         assertThat(result.getPendingTaskManagersToAllocate().size(), is(1));
@@ -130,21 +132,24 @@ public class DefaultResourceAllocationStrategyTest extends TestLogger {
 
     @Test
     public void testUnfulfillableRequirement() {
-        final Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>> registeredResources =
-                new HashMap<>();
+        final TaskManagerInfo taskManager =
+                new TestingTaskManagerInfo(
+                        DEFAULT_SLOT_RESOURCE.multiply(NUM_OF_SLOTS),
+                        DEFAULT_SLOT_RESOURCE.multiply(NUM_OF_SLOTS),
+                        DEFAULT_SLOT_RESOURCE);
         final JobID jobId = new JobID();
         final List<ResourceRequirement> requirements = new ArrayList<>();
         final ResourceProfile unfulfillableResource = DEFAULT_SLOT_RESOURCE.multiply(8);
+        final TaskManagerResourceInfoProvider taskManagerResourceInfoProvider =
+                TestingTaskManagerResourceInfoProvider.newBuilder()
+                        .setRegisteredTaskManagersSupplier(() -> Collections.singleton(taskManager))
+                        .build();
         requirements.add(ResourceRequirement.create(unfulfillableResource, 1));
-        registeredResources.put(
-                new InstanceID(),
-                Tuple2.of(DEFAULT_SLOT_RESOURCE.multiply(NUM_OF_SLOTS), DEFAULT_SLOT_RESOURCE));
 
         final ResourceAllocationResult result =
                 STRATEGY.tryFulfillRequirements(
                         Collections.singletonMap(jobId, requirements),
-                        registeredResources,
-                        Collections.emptyList());
+                        taskManagerResourceInfoProvider);
         assertThat(result.getUnfulfillableJobs(), contains(jobId));
         assertThat(result.getPendingTaskManagersToAllocate(), is(empty()));
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
index dbb0e42..038b9b2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
@@ -286,7 +286,7 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
         new Context() {
             {
                 resourceAllocationStrategyBuilder.setTryFulfillRequirementsFunction(
-                        ((jobIDCollectionMap, instanceIDTuple2Map, pendingTaskManagers) ->
+                        ((jobIDCollectionMap, taskManagerResourceInfoProvider) ->
                                 ResourceAllocationResult.builder()
                                         .addAllocationOnRegisteredResource(
                                                 jobId,
@@ -333,7 +333,7 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
                             return true;
                         });
                 resourceAllocationStrategyBuilder.setTryFulfillRequirementsFunction(
-                        ((jobIDCollectionMap, instanceIDTuple2Map, pendingTaskManagers) ->
+                        ((jobIDCollectionMap, taskManagerResourceInfoProvider) ->
                                 ResourceAllocationResult.builder()
                                         .addPendingTaskManagerAllocate(
                                                 new PendingTaskManager(
@@ -379,7 +379,7 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
         new Context() {
             {
                 resourceAllocationStrategyBuilder.setTryFulfillRequirementsFunction(
-                        ((jobIDCollectionMap, instanceIDTuple2Map, pendingTaskManagers) ->
+                        ((jobIDCollectionMap, taskManagerResourceInfoProvider) ->
                                 ResourceAllocationResult.builder()
                                         .addPendingTaskManagerAllocate(pendingTaskManager)
                                         .addAllocationOnPendingResource(
@@ -437,7 +437,7 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
                                 notEnoughResourceNotifications.add(
                                         Tuple2.of(jobId1, acquiredResources)));
                 resourceAllocationStrategyBuilder.setTryFulfillRequirementsFunction(
-                        ((jobIDCollectionMap, instanceIDTuple2Map, pendingTaskManagers) ->
+                        ((jobIDCollectionMap, taskManagerResourceInfoProvider) ->
                                 ResourceAllocationResult.builder()
                                         .addUnfulfillableJob(jobId)
                                         .build()));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocationStrategy.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocationStrategy.java
index cfdfcd2..a744527 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocationStrategy.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocationStrategy.java
@@ -18,31 +18,25 @@
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.slots.ResourceRequirement;
 import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.function.TriFunction;
 
 import java.util.Collection;
-import java.util.List;
 import java.util.Map;
+import java.util.function.BiFunction;
 
 /** Implementation of {@link ResourceAllocationStrategy} for testing purpose. */
 public class TestingResourceAllocationStrategy implements ResourceAllocationStrategy {
-    private final TriFunction<
+    private final BiFunction<
                     Map<JobID, Collection<ResourceRequirement>>,
-                    Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>>,
-                    List<PendingTaskManager>,
+                    TaskManagerResourceInfoProvider,
                     ResourceAllocationResult>
             tryFulfillRequirementsFunction;
 
     private TestingResourceAllocationStrategy(
-            TriFunction<
+            BiFunction<
                             Map<JobID, Collection<ResourceRequirement>>,
-                            Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>>,
-                            List<PendingTaskManager>,
+                            TaskManagerResourceInfoProvider,
                             ResourceAllocationResult>
                     tryFulfillRequirementsFunction) {
         this.tryFulfillRequirementsFunction =
@@ -52,10 +46,9 @@ public class TestingResourceAllocationStrategy implements ResourceAllocationStra
     @Override
     public ResourceAllocationResult tryFulfillRequirements(
             Map<JobID, Collection<ResourceRequirement>> missingResources,
-            Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>> registeredResources,
-            List<PendingTaskManager> pendingTaskManagers) {
+            TaskManagerResourceInfoProvider taskManagerResourceInfoProvider) {
         return tryFulfillRequirementsFunction.apply(
-                missingResources, registeredResources, pendingTaskManagers);
+                missingResources, taskManagerResourceInfoProvider);
     }
 
     public static Builder newBuilder() {
@@ -63,20 +56,17 @@ public class TestingResourceAllocationStrategy implements ResourceAllocationStra
     }
 
     public static class Builder {
-        private TriFunction<
+        private BiFunction<
                         Map<JobID, Collection<ResourceRequirement>>,
-                        Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>>,
-                        List<PendingTaskManager>,
+                        TaskManagerResourceInfoProvider,
                         ResourceAllocationResult>
                 tryFulfillRequirementsFunction =
-                        (ignored0, ignored1, ignored2) ->
-                                ResourceAllocationResult.builder().build();
+                        (ignored0, ignored1) -> ResourceAllocationResult.builder().build();
 
         public Builder setTryFulfillRequirementsFunction(
-                TriFunction<
+                BiFunction<
                                 Map<JobID, Collection<ResourceRequirement>>,
-                                Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>>,
-                                List<PendingTaskManager>,
+                                TaskManagerResourceInfoProvider,
                                 ResourceAllocationResult>
                         tryFulfillRequirementsFunction) {
             this.tryFulfillRequirementsFunction = tryFulfillRequirementsFunction;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingTaskManagerInfo.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingTaskManagerInfo.java
new file mode 100644
index 0000000..50ad317
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingTaskManagerInfo.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.Map;
+
+/** Implementation of {@link TaskManagerInfo} for testing purpose. */
+public class TestingTaskManagerInfo implements TaskManagerInfo {
+    private final TaskExecutorConnection taskExecutorConnection;
+    private final ResourceProfile totalResource;
+    private final ResourceProfile availableResource;
+    private final ResourceProfile defaultSlotResourceProfile;
+    private final int defaultNumSlots;
+
+    public TestingTaskManagerInfo(
+            ResourceProfile totalResource,
+            ResourceProfile availableResource,
+            ResourceProfile defaultSlotResourceProfile) {
+        this.totalResource = Preconditions.checkNotNull(totalResource);
+        this.availableResource = Preconditions.checkNotNull(availableResource);
+        this.defaultSlotResourceProfile = Preconditions.checkNotNull(defaultSlotResourceProfile);
+
+        this.defaultNumSlots =
+                SlotManagerUtils.calculateDefaultNumSlots(
+                        totalResource, defaultSlotResourceProfile);
+        this.taskExecutorConnection =
+                new TaskExecutorConnection(
+                        ResourceID.generate(),
+                        new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway());
+    }
+
+    @Override
+    public InstanceID getInstanceId() {
+        return taskExecutorConnection.getInstanceID();
+    }
+
+    @Override
+    public TaskExecutorConnection getTaskExecutorConnection() {
+        return taskExecutorConnection;
+    }
+
+    @Override
+    public Map<AllocationID, TaskManagerSlotInformation> getAllocatedSlots() {
+        return Collections.emptyMap();
+    }
+
+    @Override
+    public ResourceProfile getAvailableResource() {
+        return availableResource;
+    }
+
+    @Override
+    public ResourceProfile getTotalResource() {
+        return totalResource;
+    }
+
+    @Override
+    public ResourceProfile getDefaultSlotResourceProfile() {
+        return defaultSlotResourceProfile;
+    }
+
+    @Override
+    public int getDefaultNumSlots() {
+        return defaultNumSlots;
+    }
+
+    @Override
+    public long getIdleSince() {
+        return Long.MAX_VALUE;
+    }
+
+    @Override
+    public boolean isIdle() {
+        return false;
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingTaskManagerResourceInfoProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingTaskManagerResourceInfoProvider.java
new file mode 100644
index 0000000..c904cd7
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingTaskManagerResourceInfoProvider.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.util.ResourceCounter;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/** Implementation of {@link TaskManagerResourceInfoProvider} for testing purpose. */
+public class TestingTaskManagerResourceInfoProvider implements TaskManagerResourceInfoProvider {
+    private final Function<PendingTaskManagerId, Map<JobID, ResourceCounter>>
+            getPendingAllocationsOfPendingTaskManagerFunction;
+    private final Supplier<Collection<? extends TaskManagerInfo>> registeredTaskManagersSupplier;
+    private final Function<InstanceID, Optional<TaskManagerInfo>> getRegisteredTaskManagerFunction;
+    private final Supplier<Collection<PendingTaskManager>> pendingTaskManagersSupplier;
+    private final Function<AllocationID, Optional<TaskManagerSlotInformation>>
+            getAllocatedOrPendingSlotFunction;
+    private final Supplier<ClusterResourceOverview> clusterResourceOverviewSupplier;
+
+    private TestingTaskManagerResourceInfoProvider(
+            Function<PendingTaskManagerId, Map<JobID, ResourceCounter>>
+                    getPendingAllocationsOfPendingTaskManagerFunction,
+            Supplier<Collection<? extends TaskManagerInfo>> registeredTaskManagersSupplier,
+            Function<InstanceID, Optional<TaskManagerInfo>> getRegisteredTaskManagerFunction,
+            Supplier<Collection<PendingTaskManager>> pendingTaskManagersSupplier,
+            Function<AllocationID, Optional<TaskManagerSlotInformation>>
+                    getAllocatedOrPendingSlotFunction,
+            Supplier<ClusterResourceOverview> clusterResourceOverviewSupplier) {
+        this.getPendingAllocationsOfPendingTaskManagerFunction =
+                Preconditions.checkNotNull(getPendingAllocationsOfPendingTaskManagerFunction);
+        this.registeredTaskManagersSupplier =
+                Preconditions.checkNotNull(registeredTaskManagersSupplier);
+        this.getRegisteredTaskManagerFunction =
+                Preconditions.checkNotNull(getRegisteredTaskManagerFunction);
+        this.pendingTaskManagersSupplier = Preconditions.checkNotNull(pendingTaskManagersSupplier);
+        this.getAllocatedOrPendingSlotFunction =
+                Preconditions.checkNotNull(getAllocatedOrPendingSlotFunction);
+        this.clusterResourceOverviewSupplier =
+                Preconditions.checkNotNull(clusterResourceOverviewSupplier);
+    }
+
+    @Override
+    public Map<JobID, ResourceCounter> getPendingAllocationsOfPendingTaskManager(
+            PendingTaskManagerId pendingTaskManagerId) {
+        return getPendingAllocationsOfPendingTaskManagerFunction.apply(pendingTaskManagerId);
+    }
+
+    @Override
+    public Collection<? extends TaskManagerInfo> getRegisteredTaskManagers() {
+        return registeredTaskManagersSupplier.get();
+    }
+
+    @Override
+    public Optional<TaskManagerInfo> getRegisteredTaskManager(InstanceID instanceId) {
+        return getRegisteredTaskManagerFunction.apply(instanceId);
+    }
+
+    @Override
+    public Collection<PendingTaskManager> getPendingTaskManagers() {
+        return pendingTaskManagersSupplier.get();
+    }
+
+    @Override
+    public Optional<TaskManagerSlotInformation> getAllocatedOrPendingSlot(
+            AllocationID allocationId) {
+        return getAllocatedOrPendingSlotFunction.apply(allocationId);
+    }
+
+    @Override
+    public ClusterResourceOverview getClusterResourceOverview() {
+        return clusterResourceOverviewSupplier.get();
+    }
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    public static class Builder {
+        private Function<PendingTaskManagerId, Map<JobID, ResourceCounter>>
+                getPendingAllocationsOfPendingTaskManagerFunction =
+                        ignore -> Collections.emptyMap();
+        private Supplier<Collection<? extends TaskManagerInfo>> registeredTaskManagersSupplier =
+                Collections::emptyList;
+        private Function<InstanceID, Optional<TaskManagerInfo>> getRegisteredTaskManagerFunction =
+                ignore -> Optional.empty();
+        private Supplier<Collection<PendingTaskManager>> pendingTaskManagersSupplier =
+                Collections::emptyList;
+        private Function<AllocationID, Optional<TaskManagerSlotInformation>>
+                getAllocatedOrPendingSlotFunction = ignore -> Optional.empty();
+        private Supplier<ClusterResourceOverview> clusterResourceOverviewSupplier =
+                () -> new ClusterResourceOverview(Collections.emptyMap());
+
+        public Builder setClusterResourceOverviewSupplier(
+                Supplier<ClusterResourceOverview> clusterResourceOverviewSupplier) {
+            this.clusterResourceOverviewSupplier = clusterResourceOverviewSupplier;
+            return this;
+        }
+
+        public Builder setGetAllocatedOrPendingSlotFunction(
+                Function<AllocationID, Optional<TaskManagerSlotInformation>>
+                        getAllocatedOrPendingSlotFunction) {
+            this.getAllocatedOrPendingSlotFunction = getAllocatedOrPendingSlotFunction;
+            return this;
+        }
+
+        public Builder setGetPendingAllocationsOfPendingTaskManagerFunction(
+                Function<PendingTaskManagerId, Map<JobID, ResourceCounter>>
+                        getPendingAllocationsOfPendingTaskManagerFunction) {
+            this.getPendingAllocationsOfPendingTaskManagerFunction =
+                    getPendingAllocationsOfPendingTaskManagerFunction;
+            return this;
+        }
+
+        public Builder setGetRegisteredTaskManagerFunction(
+                Function<InstanceID, Optional<TaskManagerInfo>> getRegisteredTaskManagerFunction) {
+            this.getRegisteredTaskManagerFunction = getRegisteredTaskManagerFunction;
+            return this;
+        }
+
+        public Builder setPendingTaskManagersSupplier(
+                Supplier<Collection<PendingTaskManager>> pendingTaskManagersSupplier) {
+            this.pendingTaskManagersSupplier = pendingTaskManagersSupplier;
+            return this;
+        }
+
+        public Builder setRegisteredTaskManagersSupplier(
+                Supplier<Collection<? extends TaskManagerInfo>> registeredTaskManagersSupplier) {
+            this.registeredTaskManagersSupplier = registeredTaskManagersSupplier;
+            return this;
+        }
+
+        public TestingTaskManagerResourceInfoProvider build() {
+            return new TestingTaskManagerResourceInfoProvider(
+                    getPendingAllocationsOfPendingTaskManagerFunction,
+                    registeredTaskManagersSupplier,
+                    getRegisteredTaskManagerFunction,
+                    pendingTaskManagersSupplier,
+                    getAllocatedOrPendingSlotFunction,
+                    clusterResourceOverviewSupplier);
+        }
+    }
+}