You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2021/02/28 10:53:16 UTC

[flink] branch master updated (adaaed4 -> b995633)

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from adaaed4  [hotfix][state] Serialize KeyGroupsStateHandle in shared Incremental state handle
     new e6cf590  [hotfix][coordination] Move utility profiles to FineGrainedSlotManagerTestBase
     new 1ba2962  [hotfix][runtime] Directly pass ResourceCounter as the totalRequirements to the RequirementMatcher
     new ac54c12  [FLINK-21479][coordination] Introduce TaskManagerResourceInfoProvider as the read-only interface of TaskManagerTracker
     new b995633  [FLINK-21479][coordination] Provide TaskManagerResourceInfoProvider to ResourceAllocationStrategy

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../slotpool/DefaultDeclarativeSlotPool.java       |   2 +-
 .../DefaultResourceAllocationStrategy.java         |  38 ++++-
 .../slotmanager/FineGrainedSlotManager.java        |  15 +-
 .../slotmanager/JobScopedResourceTracker.java      |   2 +-
 .../slotmanager/ResourceAllocationStrategy.java    |  12 +-
 .../TaskManagerResourceInfoProvider.java           |  76 ++++++++++
 .../slotmanager/TaskManagerTracker.java            |  54 +------
 .../runtime/slots/DefaultRequirementMatcher.java   |  14 +-
 .../flink/runtime/slots/RequirementMatcher.java    |   5 +-
 .../AbstractFineGrainedSlotManagerITCase.java      |  34 ++---
 .../DefaultResourceAllocationStrategyTest.java     |  51 ++++---
 ...gerDefaultResourceAllocationStrategyITCase.java |  57 +------
 .../slotmanager/FineGrainedSlotManagerTest.java    | 127 +++++-----------
 .../FineGrainedSlotManagerTestBase.java            |  36 ++---
 .../TestingResourceAllocationStrategy.java         |  34 ++---
 .../slotmanager/TestingTaskManagerInfo.java        | 101 +++++++++++++
 .../TestingTaskManagerResourceInfoProvider.java    | 167 +++++++++++++++++++++
 17 files changed, 508 insertions(+), 317 deletions(-)
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerResourceInfoProvider.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingTaskManagerInfo.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingTaskManagerResourceInfoProvider.java


[flink] 01/04: [hotfix][coordination] Move utility profiles to FineGrainedSlotManagerTestBase

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e6cf590838a950672e81e526b71656e4056ce37b
Author: Yangze Guo <ka...@gmail.com>
AuthorDate: Wed Feb 24 18:15:15 2021 +0800

    [hotfix][coordination] Move utility profiles to FineGrainedSlotManagerTestBase
---
 .../AbstractFineGrainedSlotManagerITCase.java      |  34 +++---
 ...gerDefaultResourceAllocationStrategyITCase.java |  57 +---------
 .../slotmanager/FineGrainedSlotManagerTest.java    | 119 ++++++---------------
 .../FineGrainedSlotManagerTestBase.java            |  36 +++----
 4 files changed, 70 insertions(+), 176 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java
index 296e406..2a8c9d8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java
@@ -122,7 +122,7 @@ public abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSl
                         jobId,
                         targetAddress,
                         Collections.singleton(
-                                ResourceRequirement.create(getDefaultSlotResourceProfile(), 1)));
+                                ResourceRequirement.create(DEFAULT_SLOT_RESOURCE_PROFILE, 1)));
 
         final CompletableFuture<
                         Tuple6<
@@ -157,8 +157,8 @@ public abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSl
                                         .registerTaskManager(
                                                 taskExecutorConnection,
                                                 new SlotReport(),
-                                                getDefaultTaskManagerResourceProfile(),
-                                                getDefaultSlotResourceProfile());
+                                                DEFAULT_TOTAL_RESOURCE_PROFILE,
+                                                DEFAULT_SLOT_RESOURCE_PROFILE);
                             }
 
                             getSlotManager().processResourceRequirements(requirements);
@@ -170,8 +170,8 @@ public abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSl
                                         .registerTaskManager(
                                                 taskExecutorConnection,
                                                 new SlotReport(),
-                                                getDefaultTaskManagerResourceProfile(),
-                                                getDefaultSlotResourceProfile());
+                                                DEFAULT_TOTAL_RESOURCE_PROFILE,
+                                                DEFAULT_SLOT_RESOURCE_PROFILE);
                             }
 
                             assertThat(
@@ -185,7 +185,7 @@ public abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSl
                                                                             FUTURE_TIMEOUT_SECOND,
                                                                             TimeUnit.SECONDS)
                                                                     .f2,
-                                                            getDefaultSlotResourceProfile(),
+                                                            DEFAULT_SLOT_RESOURCE_PROFILE,
                                                             targetAddress,
                                                             getResourceManagerId()))));
 
@@ -291,8 +291,8 @@ public abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSl
                                     .registerTaskManager(
                                             taskManagerConnection,
                                             slotReport,
-                                            getDefaultTaskManagerResourceProfile(),
-                                            getDefaultSlotResourceProfile());
+                                            DEFAULT_TOTAL_RESOURCE_PROFILE,
+                                            DEFAULT_SLOT_RESOURCE_PROFILE);
 
                             getSlotManager().processResourceRequirements(resourceRequirements1);
 
@@ -372,13 +372,13 @@ public abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSl
                             getSlotManager()
                                     .processResourceRequirements(
                                             createResourceRequirements(
-                                                    jobId, getDefaultNumberSlotsPerWorker()));
+                                                    jobId, DEFAULT_NUM_SLOTS_PER_WORKER));
                             assertThat(resourceRequests.get(), is(1));
 
                             getSlotManager()
                                     .processResourceRequirements(
                                             createResourceRequirements(
-                                                    jobId, getDefaultNumberSlotsPerWorker() + 1));
+                                                    jobId, DEFAULT_NUM_SLOTS_PER_WORKER + 1));
                             assertThat(resourceRequests.get(), is(2));
                         });
             }
@@ -427,8 +427,8 @@ public abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSl
                                     .registerTaskManager(
                                             taskManagerConnection,
                                             slotReport,
-                                            getDefaultTaskManagerResourceProfile(),
-                                            getDefaultSlotResourceProfile());
+                                            DEFAULT_TOTAL_RESOURCE_PROFILE,
+                                            DEFAULT_SLOT_RESOURCE_PROFILE);
 
                             getSlotManager().processResourceRequirements(resourceRequirements);
 
@@ -505,8 +505,8 @@ public abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSl
                                     .registerTaskManager(
                                             taskExecutionConnection,
                                             slotReport,
-                                            getDefaultTaskManagerResourceProfile(),
-                                            getDefaultSlotResourceProfile());
+                                            DEFAULT_TOTAL_RESOURCE_PROFILE,
+                                            DEFAULT_SLOT_RESOURCE_PROFILE);
                             getSlotManager()
                                     .unregisterTaskManager(
                                             taskExecutionConnection.getInstanceID(),
@@ -561,8 +561,8 @@ public abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSl
                                     .registerTaskManager(
                                             taskExecutionConnection,
                                             slotReport,
-                                            getDefaultTaskManagerResourceProfile(),
-                                            getDefaultSlotResourceProfile());
+                                            DEFAULT_TOTAL_RESOURCE_PROFILE,
+                                            DEFAULT_SLOT_RESOURCE_PROFILE);
                             getSlotManager()
                                     .reportSlotStatus(
                                             taskExecutionConnection.getInstanceID(),
@@ -571,7 +571,7 @@ public abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSl
                                                             allocationId.get(
                                                                     FUTURE_TIMEOUT_SECOND,
                                                                     TimeUnit.SECONDS),
-                                                            getDefaultSlotResourceProfile())));
+                                                            DEFAULT_SLOT_RESOURCE_PROFILE)));
 
                             assertThat(
                                     trackingSecurityManager.getSystemExitFuture().isDone(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java
index bb976cf1..ef2e529 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java
@@ -19,7 +19,6 @@ package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
 
 import org.junit.Test;
 
@@ -34,58 +33,8 @@ import static org.junit.Assert.assertThat;
  */
 public class FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase
         extends AbstractFineGrainedSlotManagerITCase {
-    private static final WorkerResourceSpec DEFAULT_WORKER_RESOURCE_SPEC =
-            new WorkerResourceSpec.Builder()
-                    .setCpuCores(10.0)
-                    .setTaskHeapMemoryMB(1000)
-                    .setTaskOffHeapMemoryMB(1000)
-                    .setNetworkMemoryMB(1000)
-                    .setManagedMemoryMB(1000)
-                    .build();
-    private static final WorkerResourceSpec LARGE_WORKER_RESOURCE_SPEC =
-            new WorkerResourceSpec.Builder()
-                    .setCpuCores(100.0)
-                    .setTaskHeapMemoryMB(10000)
-                    .setTaskOffHeapMemoryMB(10000)
-                    .setNetworkMemoryMB(10000)
-                    .setManagedMemoryMB(10000)
-                    .build();
-    private static final int DEFAULT_NUM_SLOTS_PER_WORKER = 2;
-    private static final ResourceProfile DEFAULT_TOTAL_RESOURCE_PROFILE =
-            SlotManagerUtils.generateTaskManagerTotalResourceProfile(DEFAULT_WORKER_RESOURCE_SPEC);
-    private static final ResourceProfile DEFAULT_SLOT_RESOURCE_PROFILE =
-            SlotManagerUtils.generateDefaultSlotResourceProfile(
-                    DEFAULT_WORKER_RESOURCE_SPEC, DEFAULT_NUM_SLOTS_PER_WORKER);
-    private static final ResourceProfile LARGE_TOTAL_RESOURCE_PROFILE =
-            SlotManagerUtils.generateTaskManagerTotalResourceProfile(LARGE_WORKER_RESOURCE_SPEC);
-    private static final ResourceProfile LARGE_SLOT_RESOURCE_PROFILE =
-            SlotManagerUtils.generateDefaultSlotResourceProfile(
-                    LARGE_WORKER_RESOURCE_SPEC, DEFAULT_NUM_SLOTS_PER_WORKER);
-
-    @Override
-    protected ResourceProfile getDefaultTaskManagerResourceProfile() {
-        return DEFAULT_TOTAL_RESOURCE_PROFILE;
-    }
-
-    @Override
-    protected ResourceProfile getDefaultSlotResourceProfile() {
-        return DEFAULT_SLOT_RESOURCE_PROFILE;
-    }
-
-    @Override
-    protected int getDefaultNumberSlotsPerWorker() {
-        return DEFAULT_NUM_SLOTS_PER_WORKER;
-    }
-
-    @Override
-    protected ResourceProfile getLargeTaskManagerResourceProfile() {
-        return LARGE_TOTAL_RESOURCE_PROFILE;
-    }
-
-    @Override
-    protected ResourceProfile getLargeSlotResourceProfile() {
-        return LARGE_SLOT_RESOURCE_PROFILE;
-    }
+    private static final ResourceProfile OTHER_SLOT_RESOURCE_PROFILE =
+            DEFAULT_TOTAL_RESOURCE_PROFILE.multiply(2);
 
     @Override
     protected Optional<ResourceAllocationStrategy> getResourceAllocationStrategy() {
@@ -114,7 +63,7 @@ public class FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase
                             getSlotManager()
                                     .processResourceRequirements(
                                             createResourceRequirements(
-                                                    new JobID(), 1, getLargeSlotResourceProfile()));
+                                                    new JobID(), 1, OTHER_SLOT_RESOURCE_PROFILE));
                             assertThat(resourceRequests.get(), is(0));
                         });
             }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
index 4d01336..dbb0e42 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
-import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
 import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
 import org.apache.flink.runtime.slots.ResourceRequirement;
 import org.apache.flink.runtime.slots.ResourceRequirements;
@@ -59,58 +58,10 @@ import static org.junit.Assert.assertTrue;
 /** Tests of {@link FineGrainedSlotManager}. */
 public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
 
-    private static final WorkerResourceSpec DEFAULT_WORKER_RESOURCE_SPEC =
-            new WorkerResourceSpec.Builder()
-                    .setCpuCores(10.0)
-                    .setTaskHeapMemoryMB(1000)
-                    .setTaskOffHeapMemoryMB(1000)
-                    .setNetworkMemoryMB(1000)
-                    .setManagedMemoryMB(1000)
-                    .build();
-    private static final WorkerResourceSpec LARGE_WORKER_RESOURCE_SPEC =
-            new WorkerResourceSpec.Builder()
-                    .setCpuCores(100.0)
-                    .setTaskHeapMemoryMB(10000)
-                    .setTaskOffHeapMemoryMB(10000)
-                    .setNetworkMemoryMB(10000)
-                    .setManagedMemoryMB(10000)
-                    .build();
-    private static final int DEFAULT_NUM_SLOTS_PER_WORKER = 2;
-    private static final ResourceProfile DEFAULT_TOTAL_RESOURCE_PROFILE =
-            SlotManagerUtils.generateTaskManagerTotalResourceProfile(DEFAULT_WORKER_RESOURCE_SPEC);
-    private static final ResourceProfile DEFAULT_SLOT_RESOURCE_PROFILE =
-            SlotManagerUtils.generateDefaultSlotResourceProfile(
-                    DEFAULT_WORKER_RESOURCE_SPEC, DEFAULT_NUM_SLOTS_PER_WORKER);
-    private static final ResourceProfile LARGE_TOTAL_RESOURCE_PROFILE =
-            SlotManagerUtils.generateTaskManagerTotalResourceProfile(LARGE_WORKER_RESOURCE_SPEC);
     private static final ResourceProfile LARGE_SLOT_RESOURCE_PROFILE =
-            SlotManagerUtils.generateDefaultSlotResourceProfile(
-                    LARGE_WORKER_RESOURCE_SPEC, DEFAULT_NUM_SLOTS_PER_WORKER);
-
-    @Override
-    protected ResourceProfile getDefaultTaskManagerResourceProfile() {
-        return DEFAULT_TOTAL_RESOURCE_PROFILE;
-    }
-
-    @Override
-    protected ResourceProfile getDefaultSlotResourceProfile() {
-        return DEFAULT_SLOT_RESOURCE_PROFILE;
-    }
-
-    @Override
-    protected int getDefaultNumberSlotsPerWorker() {
-        return DEFAULT_NUM_SLOTS_PER_WORKER;
-    }
-
-    @Override
-    protected ResourceProfile getLargeTaskManagerResourceProfile() {
-        return LARGE_TOTAL_RESOURCE_PROFILE;
-    }
-
-    @Override
-    protected ResourceProfile getLargeSlotResourceProfile() {
-        return LARGE_SLOT_RESOURCE_PROFILE;
-    }
+            DEFAULT_TOTAL_RESOURCE_PROFILE.multiply(2);
+    private static final ResourceProfile LARGE_TOTAL_RESOURCE_PROFILE =
+            LARGE_SLOT_RESOURCE_PROFILE.multiply(2);
 
     @Override
     protected Optional<ResourceAllocationStrategy> getResourceAllocationStrategy() {
@@ -146,12 +97,12 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
                                     .registerTaskManager(
                                             taskManagerConnection,
                                             new SlotReport(),
-                                            getDefaultTaskManagerResourceProfile(),
-                                            getDefaultSlotResourceProfile());
+                                            DEFAULT_TOTAL_RESOURCE_PROFILE,
+                                            DEFAULT_SLOT_RESOURCE_PROFILE);
 
                             assertThat(
                                     getSlotManager().getNumberRegisteredSlots(),
-                                    equalTo(getDefaultNumberSlotsPerWorker()));
+                                    equalTo(DEFAULT_NUM_SLOTS_PER_WORKER));
                             assertThat(
                                     getTaskManagerTracker().getRegisteredTaskManagers().size(),
                                     equalTo(1));
@@ -166,14 +117,14 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
                                                     taskManagerConnection.getInstanceID())
                                             .get()
                                             .getAvailableResource(),
-                                    equalTo(getDefaultTaskManagerResourceProfile()));
+                                    equalTo(DEFAULT_TOTAL_RESOURCE_PROFILE));
                             assertThat(
                                     getTaskManagerTracker()
                                             .getRegisteredTaskManager(
                                                     taskManagerConnection.getInstanceID())
                                             .get()
                                             .getTotalResource(),
-                                    equalTo(getDefaultTaskManagerResourceProfile()));
+                                    equalTo(DEFAULT_TOTAL_RESOURCE_PROFILE));
                         });
             }
         };
@@ -191,7 +142,7 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
         final AllocationID allocationId = new AllocationID();
         final SlotReport slotReport =
                 new SlotReport(
-                        createAllocatedSlotStatus(allocationId, getDefaultSlotResourceProfile()));
+                        createAllocatedSlotStatus(allocationId, DEFAULT_SLOT_RESOURCE_PROFILE));
         new Context() {
             {
                 runTest(
@@ -200,8 +151,8 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
                                     .registerTaskManager(
                                             taskManagerConnection,
                                             slotReport,
-                                            getDefaultTaskManagerResourceProfile(),
-                                            getDefaultSlotResourceProfile());
+                                            DEFAULT_TOTAL_RESOURCE_PROFILE,
+                                            DEFAULT_SLOT_RESOURCE_PROFILE);
                             assertThat(
                                     getTaskManagerTracker().getRegisteredTaskManagers().size(),
                                     is(1));
@@ -235,7 +186,7 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
         final SlotReport slotReportWithAllocatedSlot =
                 new SlotReport(
                         createAllocatedSlotStatus(
-                                new AllocationID(), getDefaultSlotResourceProfile()));
+                                new AllocationID(), DEFAULT_SLOT_RESOURCE_PROFILE));
         new Context() {
             {
                 runTest(
@@ -243,15 +194,15 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
                             getTaskManagerTracker()
                                     .addPendingTaskManager(
                                             new PendingTaskManager(
-                                                    getDefaultTaskManagerResourceProfile(),
-                                                    getDefaultNumberSlotsPerWorker()));
+                                                    DEFAULT_TOTAL_RESOURCE_PROFILE,
+                                                    DEFAULT_NUM_SLOTS_PER_WORKER));
                             // task manager with allocated slot cannot deduct pending task manager
                             getSlotManager()
                                     .registerTaskManager(
                                             taskExecutionConnection1,
                                             slotReportWithAllocatedSlot,
-                                            getDefaultTaskManagerResourceProfile(),
-                                            getDefaultSlotResourceProfile());
+                                            DEFAULT_TOTAL_RESOURCE_PROFILE,
+                                            DEFAULT_SLOT_RESOURCE_PROFILE);
                             assertThat(
                                     getTaskManagerTracker().getPendingTaskManagers().size(), is(1));
                             // task manager with mismatched resource cannot deduct pending task
@@ -260,16 +211,16 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
                                     .registerTaskManager(
                                             taskExecutionConnection2,
                                             new SlotReport(),
-                                            getLargeTaskManagerResourceProfile(),
-                                            getLargeSlotResourceProfile());
+                                            LARGE_TOTAL_RESOURCE_PROFILE,
+                                            LARGE_SLOT_RESOURCE_PROFILE);
                             assertThat(
                                     getTaskManagerTracker().getPendingTaskManagers().size(), is(1));
                             getSlotManager()
                                     .registerTaskManager(
                                             taskExecutionConnection3,
                                             new SlotReport(),
-                                            getDefaultTaskManagerResourceProfile(),
-                                            getDefaultSlotResourceProfile());
+                                            DEFAULT_TOTAL_RESOURCE_PROFILE,
+                                            DEFAULT_SLOT_RESOURCE_PROFILE);
                             assertThat(
                                     getTaskManagerTracker().getPendingTaskManagers().size(), is(0));
                         });
@@ -340,7 +291,7 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
                                         .addAllocationOnRegisteredResource(
                                                 jobId,
                                                 taskManagerConnection.getInstanceID(),
-                                                getDefaultSlotResourceProfile())
+                                                DEFAULT_SLOT_RESOURCE_PROFILE)
                                         .build()));
                 runTest(
                         () -> {
@@ -348,8 +299,8 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
                                     .registerTaskManager(
                                             taskManagerConnection,
                                             slotReport,
-                                            getDefaultTaskManagerResourceProfile(),
-                                            getDefaultSlotResourceProfile());
+                                            DEFAULT_TOTAL_RESOURCE_PROFILE,
+                                            DEFAULT_SLOT_RESOURCE_PROFILE);
                             getSlotManager()
                                     .processResourceRequirements(
                                             createResourceRequirements(jobId, 1));
@@ -364,7 +315,7 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
                                             requestSlotFuture.get(
                                                     FUTURE_TIMEOUT_SECOND, TimeUnit.SECONDS);
                             assertEquals(jobId, requestSlot.f1);
-                            assertEquals(getDefaultSlotResourceProfile(), requestSlot.f3);
+                            assertEquals(DEFAULT_SLOT_RESOURCE_PROFILE, requestSlot.f3);
                         });
             }
         };
@@ -386,8 +337,8 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
                                 ResourceAllocationResult.builder()
                                         .addPendingTaskManagerAllocate(
                                                 new PendingTaskManager(
-                                                        getDefaultTaskManagerResourceProfile(),
-                                                        getDefaultNumberSlotsPerWorker()))
+                                                        DEFAULT_TOTAL_RESOURCE_PROFILE,
+                                                        DEFAULT_NUM_SLOTS_PER_WORKER))
                                         .build()));
                 runTest(
                         () -> {
@@ -405,7 +356,7 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
         final JobID jobId = new JobID();
         final PendingTaskManager pendingTaskManager =
                 new PendingTaskManager(
-                        getDefaultTaskManagerResourceProfile(), getDefaultNumberSlotsPerWorker());
+                        DEFAULT_TOTAL_RESOURCE_PROFILE, DEFAULT_NUM_SLOTS_PER_WORKER);
         final CompletableFuture<
                         Tuple6<
                                 SlotID,
@@ -434,7 +385,7 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
                                         .addAllocationOnPendingResource(
                                                 jobId,
                                                 pendingTaskManager.getPendingTaskManagerId(),
-                                                getDefaultSlotResourceProfile())
+                                                DEFAULT_SLOT_RESOURCE_PROFILE)
                                         .build()));
                 runTest(
                         () -> {
@@ -445,8 +396,8 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
                                     .registerTaskManager(
                                             taskManagerConnection,
                                             new SlotReport(),
-                                            getDefaultTaskManagerResourceProfile(),
-                                            getDefaultSlotResourceProfile());
+                                            DEFAULT_TOTAL_RESOURCE_PROFILE,
+                                            DEFAULT_SLOT_RESOURCE_PROFILE);
                             final Tuple6<
                                             SlotID,
                                             JobID,
@@ -458,7 +409,7 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
                                             requestSlotFuture.get(
                                                     FUTURE_TIMEOUT_SECOND, TimeUnit.SECONDS);
                             assertEquals(jobId, requestSlot.f1);
-                            assertEquals(getDefaultSlotResourceProfile(), requestSlot.f3);
+                            assertEquals(DEFAULT_SLOT_RESOURCE_PROFILE, requestSlot.f3);
                         });
             }
         };
@@ -547,9 +498,9 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
                                             new SlotReport(
                                                     createAllocatedSlotStatus(
                                                             allocationId,
-                                                            getDefaultSlotResourceProfile())),
-                                            getDefaultTaskManagerResourceProfile(),
-                                            getDefaultSlotResourceProfile());
+                                                            DEFAULT_SLOT_RESOURCE_PROFILE)),
+                                            DEFAULT_TOTAL_RESOURCE_PROFILE,
+                                            DEFAULT_SLOT_RESOURCE_PROFILE);
                             assertEquals(
                                     getSlotManager().getTaskManagerIdleSince(instanceId),
                                     Long.MAX_VALUE);
@@ -570,7 +521,7 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
                             // SlotManager. The receiver of the callback can then decide what to do
                             // with the TaskManager.
                             assertEquals(
-                                    getDefaultNumberSlotsPerWorker(),
+                                    DEFAULT_NUM_SLOTS_PER_WORKER,
                                     getSlotManager().getNumberRegisteredSlots());
 
                             getSlotManager()
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
index 4929b2c5..efc3003 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.metrics.groups.SlotManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
 import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
 import org.apache.flink.runtime.slots.ResourceRequirement;
 import org.apache.flink.runtime.slots.ResourceRequirements;
@@ -48,27 +49,20 @@ public abstract class FineGrainedSlotManagerTestBase extends TestLogger {
     private static final Executor MAIN_THREAD_EXECUTOR = Executors.directExecutor();
     static final FlinkException TEST_EXCEPTION = new FlinkException("Test exception");
     static final long FUTURE_TIMEOUT_SECOND = 5;
-
-    /** Resource profile for the default task manager. */
-    protected abstract ResourceProfile getDefaultTaskManagerResourceProfile();
-
-    /** Resource profile for the default slot and requirement. */
-    protected abstract ResourceProfile getDefaultSlotResourceProfile();
-
-    /** The number of slot for the default task manager. */
-    protected abstract int getDefaultNumberSlotsPerWorker();
-
-    /**
-     * Resource profile for a larger task manager, which can fulfill both the larger and the default
-     * slots.
-     */
-    protected abstract ResourceProfile getLargeTaskManagerResourceProfile();
-
-    /**
-     * Resource profile for a larger slot or requirement, which can be fulfilled by the task manager
-     * and cannot be fulfilled by the default task manager.
-     */
-    protected abstract ResourceProfile getLargeSlotResourceProfile();
+    static final WorkerResourceSpec DEFAULT_WORKER_RESOURCE_SPEC =
+            new WorkerResourceSpec.Builder()
+                    .setCpuCores(10.0)
+                    .setTaskHeapMemoryMB(1000)
+                    .setTaskOffHeapMemoryMB(1000)
+                    .setNetworkMemoryMB(1000)
+                    .setManagedMemoryMB(1000)
+                    .build();
+    static final int DEFAULT_NUM_SLOTS_PER_WORKER = 2;
+    static final ResourceProfile DEFAULT_TOTAL_RESOURCE_PROFILE =
+            SlotManagerUtils.generateTaskManagerTotalResourceProfile(DEFAULT_WORKER_RESOURCE_SPEC);
+    static final ResourceProfile DEFAULT_SLOT_RESOURCE_PROFILE =
+            SlotManagerUtils.generateDefaultSlotResourceProfile(
+                    DEFAULT_WORKER_RESOURCE_SPEC, DEFAULT_NUM_SLOTS_PER_WORKER);
 
     protected abstract Optional<ResourceAllocationStrategy> getResourceAllocationStrategy();
 


[flink] 04/04: [FLINK-21479][coordination] Provide TaskManagerResourceInfoProvider to ResourceAllocationStrategy

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b995633c3d0245bcc2bf3ca9b47a4c6758bb927b
Author: Yangze Guo <ka...@gmail.com>
AuthorDate: Thu Feb 25 11:08:22 2021 +0800

    [FLINK-21479][coordination] Provide TaskManagerResourceInfoProvider to ResourceAllocationStrategy
    
    This closes #15015
---
 .../DefaultResourceAllocationStrategy.java         |  38 ++++-
 .../slotmanager/FineGrainedSlotManager.java        |  15 +-
 .../slotmanager/ResourceAllocationStrategy.java    |  12 +-
 .../DefaultResourceAllocationStrategyTest.java     |  51 ++++---
 .../slotmanager/FineGrainedSlotManagerTest.java    |   8 +-
 .../TestingResourceAllocationStrategy.java         |  34 ++---
 .../slotmanager/TestingTaskManagerInfo.java        | 101 +++++++++++++
 .../TestingTaskManagerResourceInfoProvider.java    | 167 +++++++++++++++++++++
 8 files changed, 346 insertions(+), 80 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java
index ed01816..569f906 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java
@@ -27,7 +27,6 @@ import org.apache.flink.runtime.util.ResourceCounter;
 import org.apache.flink.util.Preconditions;
 
 import java.util.Collection;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
@@ -67,15 +66,17 @@ public class DefaultResourceAllocationStrategy implements ResourceAllocationStra
     @Override
     public ResourceAllocationResult tryFulfillRequirements(
             Map<JobID, Collection<ResourceRequirement>> missingResources,
-            Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>> registeredResources,
-            List<PendingTaskManager> pendingTaskManagers) {
+            TaskManagerResourceInfoProvider taskManagerResourceInfoProvider) {
         final ResourceAllocationResult.Builder resultBuilder = ResourceAllocationResult.builder();
+
+        // Tuples of available and default slot resource for registered task managers, indexed by
+        // instanceId
+        final Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>> registeredResources =
+                getRegisteredResources(taskManagerResourceInfoProvider);
+        // Available resources of pending task managers, indexed by the pendingTaskManagerId
         final Map<PendingTaskManagerId, ResourceProfile> pendingResources =
-                pendingTaskManagers.stream()
-                        .collect(
-                                Collectors.toMap(
-                                        PendingTaskManager::getPendingTaskManagerId,
-                                        PendingTaskManager::getTotalResourceProfile));
+                getPendingResources(taskManagerResourceInfoProvider);
+
         for (Map.Entry<JobID, Collection<ResourceRequirement>> resourceRequirements :
                 missingResources.entrySet()) {
             final JobID jobId = resourceRequirements.getKey();
@@ -95,6 +96,27 @@ public class DefaultResourceAllocationStrategy implements ResourceAllocationStra
         return resultBuilder.build();
     }
 
+    private static Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>> getRegisteredResources(
+            TaskManagerResourceInfoProvider taskManagerResourceInfoProvider) {
+        return taskManagerResourceInfoProvider.getRegisteredTaskManagers().stream()
+                .collect(
+                        Collectors.toMap(
+                                TaskManagerInfo::getInstanceId,
+                                taskManager ->
+                                        Tuple2.of(
+                                                taskManager.getAvailableResource(),
+                                                taskManager.getDefaultSlotResourceProfile())));
+    }
+
+    private static Map<PendingTaskManagerId, ResourceProfile> getPendingResources(
+            TaskManagerResourceInfoProvider taskManagerResourceInfoProvider) {
+        return taskManagerResourceInfoProvider.getPendingTaskManagers().stream()
+                .collect(
+                        Collectors.toMap(
+                                PendingTaskManager::getPendingTaskManagerId,
+                                PendingTaskManager::getTotalResourceProfile));
+    }
+
     private static ResourceCounter tryFulfillRequirementsForJobWithRegisteredResources(
             JobID jobId,
             Collection<ResourceRequirement> missingResources,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
index 7f2d03a..f6c6890 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.resourcemanager.slotmanager;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
@@ -452,21 +451,9 @@ public class FineGrainedSlotManager implements SlotManager {
                                 Collectors.toMap(
                                         Map.Entry::getKey, e -> new ArrayList<>(e.getValue())));
 
-        final Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>> availableResources =
-                taskManagerTracker.getRegisteredTaskManagers().stream()
-                        .collect(
-                                Collectors.toMap(
-                                        TaskManagerInfo::getInstanceId,
-                                        taskManager ->
-                                                Tuple2.of(
-                                                        taskManager.getAvailableResource(),
-                                                        taskManager
-                                                                .getDefaultSlotResourceProfile())));
         final ResourceAllocationResult result =
                 resourceAllocationStrategy.tryFulfillRequirements(
-                        missingResources,
-                        availableResources,
-                        new ArrayList<>(taskManagerTracker.getPendingTaskManagers()));
+                        missingResources, taskManagerTracker);
 
         // Allocate slots according to the result
         allocateSlotsAccordingTo(result.getAllocationsOnRegisteredResources());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocationStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocationStrategy.java
index 71fad04..2585e11 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocationStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocationStrategy.java
@@ -19,13 +19,9 @@
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.slots.ResourceRequirement;
 
 import java.util.Collection;
-import java.util.List;
 import java.util.Map;
 
 /** Strategy for allocating slots and task managers to fulfill the unfulfilled requirements. */
@@ -39,14 +35,12 @@ public interface ResourceAllocationStrategy {
      * input arguments. If the arguments are reused elsewhere, please make a deep copy in advance.
      *
      * @param missingResources resource requirements that are not yet fulfilled, indexed by jobId
-     * @param registeredResources tuples of available and default slot resource for registered task
-     *     managers, indexed by instanceId
-     * @param pendingTaskManagers available and default slot resources of pending task managers
+     * @param taskManagerResourceInfoProvider provides the registered/pending resources of the
+     *     current cluster
      * @return a {@link ResourceAllocationResult} based on the current status, which contains
      *     whether the requirements can be fulfilled and the actions to take
      */
     ResourceAllocationResult tryFulfillRequirements(
             Map<JobID, Collection<ResourceRequirement>> missingResources,
-            Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>> registeredResources,
-            List<PendingTaskManager> pendingTaskManagers);
+            TaskManagerResourceInfoProvider taskManagerResourceInfoProvider);
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategyTest.java
index b02f523..15cccd7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategyTest.java
@@ -19,9 +19,7 @@
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.slots.ResourceRequirement;
 import org.apache.flink.runtime.util.ResourceCounter;
 import org.apache.flink.util.TestLogger;
@@ -30,7 +28,6 @@ import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -49,56 +46,61 @@ public class DefaultResourceAllocationStrategyTest extends TestLogger {
 
     @Test
     public void testFulfillRequirementWithRegisteredResources() {
-        final Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>> registeredResources =
-                new HashMap<>();
+        final TaskManagerInfo taskManager =
+                new TestingTaskManagerInfo(
+                        DEFAULT_SLOT_RESOURCE.multiply(10),
+                        DEFAULT_SLOT_RESOURCE.multiply(10),
+                        DEFAULT_SLOT_RESOURCE);
         final JobID jobId = new JobID();
-        final InstanceID instanceId = new InstanceID();
         final List<ResourceRequirement> requirements = new ArrayList<>();
         final ResourceProfile largeResource = DEFAULT_SLOT_RESOURCE.multiply(8);
+        final TaskManagerResourceInfoProvider taskManagerResourceInfoProvider =
+                TestingTaskManagerResourceInfoProvider.newBuilder()
+                        .setRegisteredTaskManagersSupplier(() -> Collections.singleton(taskManager))
+                        .build();
         requirements.add(ResourceRequirement.create(largeResource, 1));
         requirements.add(ResourceRequirement.create(ResourceProfile.UNKNOWN, 2));
-        registeredResources.put(
-                instanceId, Tuple2.of(DEFAULT_SLOT_RESOURCE.multiply(10), DEFAULT_SLOT_RESOURCE));
 
         final ResourceAllocationResult result =
                 STRATEGY.tryFulfillRequirements(
                         Collections.singletonMap(jobId, requirements),
-                        registeredResources,
-                        Collections.emptyList());
+                        taskManagerResourceInfoProvider);
         assertThat(result.getUnfulfillableJobs(), is(empty()));
         assertThat(result.getAllocationsOnPendingResources().keySet(), is(empty()));
         assertThat(result.getPendingTaskManagersToAllocate(), is(empty()));
         assertThat(
                 result.getAllocationsOnRegisteredResources()
                         .get(jobId)
-                        .get(instanceId)
+                        .get(taskManager.getInstanceId())
                         .getResourceCount(DEFAULT_SLOT_RESOURCE),
                 is(2));
         assertThat(
                 result.getAllocationsOnRegisteredResources()
                         .get(jobId)
-                        .get(instanceId)
+                        .get(taskManager.getInstanceId())
                         .getResourceCount(largeResource),
                 is(1));
     }
 
     @Test
     public void testFulfillRequirementWithPendingResources() {
-        final List<PendingTaskManager> pendingTaskManagers = new ArrayList<>();
         final JobID jobId = new JobID();
         final List<ResourceRequirement> requirements = new ArrayList<>();
         final ResourceProfile largeResource = DEFAULT_SLOT_RESOURCE.multiply(3);
         final PendingTaskManager pendingTaskManager =
                 new PendingTaskManager(DEFAULT_SLOT_RESOURCE.multiply(NUM_OF_SLOTS), NUM_OF_SLOTS);
+        final TaskManagerResourceInfoProvider taskManagerResourceInfoProvider =
+                TestingTaskManagerResourceInfoProvider.newBuilder()
+                        .setPendingTaskManagersSupplier(
+                                () -> Collections.singleton(pendingTaskManager))
+                        .build();
         requirements.add(ResourceRequirement.create(largeResource, 1));
         requirements.add(ResourceRequirement.create(ResourceProfile.UNKNOWN, 7));
-        pendingTaskManagers.add(pendingTaskManager);
 
         final ResourceAllocationResult result =
                 STRATEGY.tryFulfillRequirements(
                         Collections.singletonMap(jobId, requirements),
-                        Collections.emptyMap(),
-                        pendingTaskManagers);
+                        taskManagerResourceInfoProvider);
         assertThat(result.getUnfulfillableJobs(), is(empty()));
         assertThat(result.getAllocationsOnRegisteredResources().keySet(), is(empty()));
         assertThat(result.getPendingTaskManagersToAllocate().size(), is(1));
@@ -130,21 +132,24 @@ public class DefaultResourceAllocationStrategyTest extends TestLogger {
 
     @Test
     public void testUnfulfillableRequirement() {
-        final Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>> registeredResources =
-                new HashMap<>();
+        final TaskManagerInfo taskManager =
+                new TestingTaskManagerInfo(
+                        DEFAULT_SLOT_RESOURCE.multiply(NUM_OF_SLOTS),
+                        DEFAULT_SLOT_RESOURCE.multiply(NUM_OF_SLOTS),
+                        DEFAULT_SLOT_RESOURCE);
         final JobID jobId = new JobID();
         final List<ResourceRequirement> requirements = new ArrayList<>();
         final ResourceProfile unfulfillableResource = DEFAULT_SLOT_RESOURCE.multiply(8);
+        final TaskManagerResourceInfoProvider taskManagerResourceInfoProvider =
+                TestingTaskManagerResourceInfoProvider.newBuilder()
+                        .setRegisteredTaskManagersSupplier(() -> Collections.singleton(taskManager))
+                        .build();
         requirements.add(ResourceRequirement.create(unfulfillableResource, 1));
-        registeredResources.put(
-                new InstanceID(),
-                Tuple2.of(DEFAULT_SLOT_RESOURCE.multiply(NUM_OF_SLOTS), DEFAULT_SLOT_RESOURCE));
 
         final ResourceAllocationResult result =
                 STRATEGY.tryFulfillRequirements(
                         Collections.singletonMap(jobId, requirements),
-                        registeredResources,
-                        Collections.emptyList());
+                        taskManagerResourceInfoProvider);
         assertThat(result.getUnfulfillableJobs(), contains(jobId));
         assertThat(result.getPendingTaskManagersToAllocate(), is(empty()));
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
index dbb0e42..038b9b2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
@@ -286,7 +286,7 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
         new Context() {
             {
                 resourceAllocationStrategyBuilder.setTryFulfillRequirementsFunction(
-                        ((jobIDCollectionMap, instanceIDTuple2Map, pendingTaskManagers) ->
+                        ((jobIDCollectionMap, taskManagerResourceInfoProvider) ->
                                 ResourceAllocationResult.builder()
                                         .addAllocationOnRegisteredResource(
                                                 jobId,
@@ -333,7 +333,7 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
                             return true;
                         });
                 resourceAllocationStrategyBuilder.setTryFulfillRequirementsFunction(
-                        ((jobIDCollectionMap, instanceIDTuple2Map, pendingTaskManagers) ->
+                        ((jobIDCollectionMap, taskManagerResourceInfoProvider) ->
                                 ResourceAllocationResult.builder()
                                         .addPendingTaskManagerAllocate(
                                                 new PendingTaskManager(
@@ -379,7 +379,7 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
         new Context() {
             {
                 resourceAllocationStrategyBuilder.setTryFulfillRequirementsFunction(
-                        ((jobIDCollectionMap, instanceIDTuple2Map, pendingTaskManagers) ->
+                        ((jobIDCollectionMap, taskManagerResourceInfoProvider) ->
                                 ResourceAllocationResult.builder()
                                         .addPendingTaskManagerAllocate(pendingTaskManager)
                                         .addAllocationOnPendingResource(
@@ -437,7 +437,7 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
                                 notEnoughResourceNotifications.add(
                                         Tuple2.of(jobId1, acquiredResources)));
                 resourceAllocationStrategyBuilder.setTryFulfillRequirementsFunction(
-                        ((jobIDCollectionMap, instanceIDTuple2Map, pendingTaskManagers) ->
+                        ((jobIDCollectionMap, taskManagerResourceInfoProvider) ->
                                 ResourceAllocationResult.builder()
                                         .addUnfulfillableJob(jobId)
                                         .build()));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocationStrategy.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocationStrategy.java
index cfdfcd2..a744527 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocationStrategy.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocationStrategy.java
@@ -18,31 +18,25 @@
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.slots.ResourceRequirement;
 import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.function.TriFunction;
 
 import java.util.Collection;
-import java.util.List;
 import java.util.Map;
+import java.util.function.BiFunction;
 
 /** Implementation of {@link ResourceAllocationStrategy} for testing purpose. */
 public class TestingResourceAllocationStrategy implements ResourceAllocationStrategy {
-    private final TriFunction<
+    private final BiFunction<
                     Map<JobID, Collection<ResourceRequirement>>,
-                    Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>>,
-                    List<PendingTaskManager>,
+                    TaskManagerResourceInfoProvider,
                     ResourceAllocationResult>
             tryFulfillRequirementsFunction;
 
     private TestingResourceAllocationStrategy(
-            TriFunction<
+            BiFunction<
                             Map<JobID, Collection<ResourceRequirement>>,
-                            Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>>,
-                            List<PendingTaskManager>,
+                            TaskManagerResourceInfoProvider,
                             ResourceAllocationResult>
                     tryFulfillRequirementsFunction) {
         this.tryFulfillRequirementsFunction =
@@ -52,10 +46,9 @@ public class TestingResourceAllocationStrategy implements ResourceAllocationStra
     @Override
     public ResourceAllocationResult tryFulfillRequirements(
             Map<JobID, Collection<ResourceRequirement>> missingResources,
-            Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>> registeredResources,
-            List<PendingTaskManager> pendingTaskManagers) {
+            TaskManagerResourceInfoProvider taskManagerResourceInfoProvider) {
         return tryFulfillRequirementsFunction.apply(
-                missingResources, registeredResources, pendingTaskManagers);
+                missingResources, taskManagerResourceInfoProvider);
     }
 
     public static Builder newBuilder() {
@@ -63,20 +56,17 @@ public class TestingResourceAllocationStrategy implements ResourceAllocationStra
     }
 
     public static class Builder {
-        private TriFunction<
+        private BiFunction<
                         Map<JobID, Collection<ResourceRequirement>>,
-                        Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>>,
-                        List<PendingTaskManager>,
+                        TaskManagerResourceInfoProvider,
                         ResourceAllocationResult>
                 tryFulfillRequirementsFunction =
-                        (ignored0, ignored1, ignored2) ->
-                                ResourceAllocationResult.builder().build();
+                        (ignored0, ignored1) -> ResourceAllocationResult.builder().build();
 
         public Builder setTryFulfillRequirementsFunction(
-                TriFunction<
+                BiFunction<
                                 Map<JobID, Collection<ResourceRequirement>>,
-                                Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>>,
-                                List<PendingTaskManager>,
+                                TaskManagerResourceInfoProvider,
                                 ResourceAllocationResult>
                         tryFulfillRequirementsFunction) {
             this.tryFulfillRequirementsFunction = tryFulfillRequirementsFunction;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingTaskManagerInfo.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingTaskManagerInfo.java
new file mode 100644
index 0000000..50ad317
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingTaskManagerInfo.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.Map;
+
+/** Implementation of {@link TaskManagerInfo} for testing purpose. */
+public class TestingTaskManagerInfo implements TaskManagerInfo {
+    private final TaskExecutorConnection taskExecutorConnection;
+    private final ResourceProfile totalResource;
+    private final ResourceProfile availableResource;
+    private final ResourceProfile defaultSlotResourceProfile;
+    private final int defaultNumSlots;
+
+    public TestingTaskManagerInfo(
+            ResourceProfile totalResource,
+            ResourceProfile availableResource,
+            ResourceProfile defaultSlotResourceProfile) {
+        this.totalResource = Preconditions.checkNotNull(totalResource);
+        this.availableResource = Preconditions.checkNotNull(availableResource);
+        this.defaultSlotResourceProfile = Preconditions.checkNotNull(defaultSlotResourceProfile);
+
+        this.defaultNumSlots =
+                SlotManagerUtils.calculateDefaultNumSlots(
+                        totalResource, defaultSlotResourceProfile);
+        this.taskExecutorConnection =
+                new TaskExecutorConnection(
+                        ResourceID.generate(),
+                        new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway());
+    }
+
+    @Override
+    public InstanceID getInstanceId() {
+        return taskExecutorConnection.getInstanceID();
+    }
+
+    @Override
+    public TaskExecutorConnection getTaskExecutorConnection() {
+        return taskExecutorConnection;
+    }
+
+    @Override
+    public Map<AllocationID, TaskManagerSlotInformation> getAllocatedSlots() {
+        return Collections.emptyMap();
+    }
+
+    @Override
+    public ResourceProfile getAvailableResource() {
+        return availableResource;
+    }
+
+    @Override
+    public ResourceProfile getTotalResource() {
+        return totalResource;
+    }
+
+    @Override
+    public ResourceProfile getDefaultSlotResourceProfile() {
+        return defaultSlotResourceProfile;
+    }
+
+    @Override
+    public int getDefaultNumSlots() {
+        return defaultNumSlots;
+    }
+
+    @Override
+    public long getIdleSince() {
+        return Long.MAX_VALUE;
+    }
+
+    @Override
+    public boolean isIdle() {
+        return false;
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingTaskManagerResourceInfoProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingTaskManagerResourceInfoProvider.java
new file mode 100644
index 0000000..c904cd7
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingTaskManagerResourceInfoProvider.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.util.ResourceCounter;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/** Implementation of {@link TaskManagerResourceInfoProvider} for testing purpose. */
+public class TestingTaskManagerResourceInfoProvider implements TaskManagerResourceInfoProvider {
+    private final Function<PendingTaskManagerId, Map<JobID, ResourceCounter>>
+            getPendingAllocationsOfPendingTaskManagerFunction;
+    private final Supplier<Collection<? extends TaskManagerInfo>> registeredTaskManagersSupplier;
+    private final Function<InstanceID, Optional<TaskManagerInfo>> getRegisteredTaskManagerFunction;
+    private final Supplier<Collection<PendingTaskManager>> pendingTaskManagersSupplier;
+    private final Function<AllocationID, Optional<TaskManagerSlotInformation>>
+            getAllocatedOrPendingSlotFunction;
+    private final Supplier<ClusterResourceOverview> clusterResourceOverviewSupplier;
+
+    private TestingTaskManagerResourceInfoProvider(
+            Function<PendingTaskManagerId, Map<JobID, ResourceCounter>>
+                    getPendingAllocationsOfPendingTaskManagerFunction,
+            Supplier<Collection<? extends TaskManagerInfo>> registeredTaskManagersSupplier,
+            Function<InstanceID, Optional<TaskManagerInfo>> getRegisteredTaskManagerFunction,
+            Supplier<Collection<PendingTaskManager>> pendingTaskManagersSupplier,
+            Function<AllocationID, Optional<TaskManagerSlotInformation>>
+                    getAllocatedOrPendingSlotFunction,
+            Supplier<ClusterResourceOverview> clusterResourceOverviewSupplier) {
+        this.getPendingAllocationsOfPendingTaskManagerFunction =
+                Preconditions.checkNotNull(getPendingAllocationsOfPendingTaskManagerFunction);
+        this.registeredTaskManagersSupplier =
+                Preconditions.checkNotNull(registeredTaskManagersSupplier);
+        this.getRegisteredTaskManagerFunction =
+                Preconditions.checkNotNull(getRegisteredTaskManagerFunction);
+        this.pendingTaskManagersSupplier = Preconditions.checkNotNull(pendingTaskManagersSupplier);
+        this.getAllocatedOrPendingSlotFunction =
+                Preconditions.checkNotNull(getAllocatedOrPendingSlotFunction);
+        this.clusterResourceOverviewSupplier =
+                Preconditions.checkNotNull(clusterResourceOverviewSupplier);
+    }
+
+    @Override
+    public Map<JobID, ResourceCounter> getPendingAllocationsOfPendingTaskManager(
+            PendingTaskManagerId pendingTaskManagerId) {
+        return getPendingAllocationsOfPendingTaskManagerFunction.apply(pendingTaskManagerId);
+    }
+
+    @Override
+    public Collection<? extends TaskManagerInfo> getRegisteredTaskManagers() {
+        return registeredTaskManagersSupplier.get();
+    }
+
+    @Override
+    public Optional<TaskManagerInfo> getRegisteredTaskManager(InstanceID instanceId) {
+        return getRegisteredTaskManagerFunction.apply(instanceId);
+    }
+
+    @Override
+    public Collection<PendingTaskManager> getPendingTaskManagers() {
+        return pendingTaskManagersSupplier.get();
+    }
+
+    @Override
+    public Optional<TaskManagerSlotInformation> getAllocatedOrPendingSlot(
+            AllocationID allocationId) {
+        return getAllocatedOrPendingSlotFunction.apply(allocationId);
+    }
+
+    @Override
+    public ClusterResourceOverview getClusterResourceOverview() {
+        return clusterResourceOverviewSupplier.get();
+    }
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    public static class Builder {
+        private Function<PendingTaskManagerId, Map<JobID, ResourceCounter>>
+                getPendingAllocationsOfPendingTaskManagerFunction =
+                        ignore -> Collections.emptyMap();
+        private Supplier<Collection<? extends TaskManagerInfo>> registeredTaskManagersSupplier =
+                Collections::emptyList;
+        private Function<InstanceID, Optional<TaskManagerInfo>> getRegisteredTaskManagerFunction =
+                ignore -> Optional.empty();
+        private Supplier<Collection<PendingTaskManager>> pendingTaskManagersSupplier =
+                Collections::emptyList;
+        private Function<AllocationID, Optional<TaskManagerSlotInformation>>
+                getAllocatedOrPendingSlotFunction = ignore -> Optional.empty();
+        private Supplier<ClusterResourceOverview> clusterResourceOverviewSupplier =
+                () -> new ClusterResourceOverview(Collections.emptyMap());
+
+        public Builder setClusterResourceOverviewSupplier(
+                Supplier<ClusterResourceOverview> clusterResourceOverviewSupplier) {
+            this.clusterResourceOverviewSupplier = clusterResourceOverviewSupplier;
+            return this;
+        }
+
+        public Builder setGetAllocatedOrPendingSlotFunction(
+                Function<AllocationID, Optional<TaskManagerSlotInformation>>
+                        getAllocatedOrPendingSlotFunction) {
+            this.getAllocatedOrPendingSlotFunction = getAllocatedOrPendingSlotFunction;
+            return this;
+        }
+
+        public Builder setGetPendingAllocationsOfPendingTaskManagerFunction(
+                Function<PendingTaskManagerId, Map<JobID, ResourceCounter>>
+                        getPendingAllocationsOfPendingTaskManagerFunction) {
+            this.getPendingAllocationsOfPendingTaskManagerFunction =
+                    getPendingAllocationsOfPendingTaskManagerFunction;
+            return this;
+        }
+
+        public Builder setGetRegisteredTaskManagerFunction(
+                Function<InstanceID, Optional<TaskManagerInfo>> getRegisteredTaskManagerFunction) {
+            this.getRegisteredTaskManagerFunction = getRegisteredTaskManagerFunction;
+            return this;
+        }
+
+        public Builder setPendingTaskManagersSupplier(
+                Supplier<Collection<PendingTaskManager>> pendingTaskManagersSupplier) {
+            this.pendingTaskManagersSupplier = pendingTaskManagersSupplier;
+            return this;
+        }
+
+        public Builder setRegisteredTaskManagersSupplier(
+                Supplier<Collection<? extends TaskManagerInfo>> registeredTaskManagersSupplier) {
+            this.registeredTaskManagersSupplier = registeredTaskManagersSupplier;
+            return this;
+        }
+
+        public TestingTaskManagerResourceInfoProvider build() {
+            return new TestingTaskManagerResourceInfoProvider(
+                    getPendingAllocationsOfPendingTaskManagerFunction,
+                    registeredTaskManagersSupplier,
+                    getRegisteredTaskManagerFunction,
+                    pendingTaskManagersSupplier,
+                    getAllocatedOrPendingSlotFunction,
+                    clusterResourceOverviewSupplier);
+        }
+    }
+}


[flink] 02/04: [hotfix][runtime] Directly pass ResourceCounter as the totalRequirements to the RequirementMatcher

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1ba2962361a8b5fb24149711655e616a13d2d61d
Author: Yangze Guo <ka...@gmail.com>
AuthorDate: Wed Feb 24 18:32:37 2021 +0800

    [hotfix][runtime] Directly pass ResourceCounter as the totalRequirements to the RequirementMatcher
    
    As the ResourceCounter is immutable, we can pass it directly as an argument. This also gives us a chance to optimize the matching performance.
---
 .../jobmaster/slotpool/DefaultDeclarativeSlotPool.java     |  2 +-
 .../slotmanager/JobScopedResourceTracker.java              |  2 +-
 .../flink/runtime/slots/DefaultRequirementMatcher.java     | 14 +++++++++++---
 .../org/apache/flink/runtime/slots/RequirementMatcher.java |  5 ++---
 4 files changed, 15 insertions(+), 8 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
index e76ccf5..24ad556 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java
@@ -212,7 +212,7 @@ public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool {
         final Optional<ResourceProfile> match =
                 requirementMatcher.match(
                         slotOffer.getResourceProfile(),
-                        totalResourceRequirements.getResourcesWithCount(),
+                        totalResourceRequirements,
                         fulfilledResourceRequirements::getResourceCount);
 
         if (match.isPresent()) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/JobScopedResourceTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/JobScopedResourceTracker.java
index d8d333d..2446f4f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/JobScopedResourceTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/JobScopedResourceTracker.java
@@ -87,7 +87,7 @@ class JobScopedResourceTracker {
     private Optional<ResourceProfile> findMatchingRequirement(ResourceProfile resourceProfile) {
         return requirementMatcher.match(
                 resourceProfile,
-                resourceRequirements.getResourcesWithCount(),
+                resourceRequirements,
                 resourceToRequirementMapping::getNumFulfillingResources);
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/slots/DefaultRequirementMatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/slots/DefaultRequirementMatcher.java
index 7698e59..36a46de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/slots/DefaultRequirementMatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/slots/DefaultRequirementMatcher.java
@@ -18,8 +18,8 @@
 package org.apache.flink.runtime.slots;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.util.ResourceCounter;
 
-import java.util.Collection;
 import java.util.Map;
 import java.util.Optional;
 import java.util.function.Function;
@@ -32,9 +32,17 @@ public class DefaultRequirementMatcher implements RequirementMatcher {
     @Override
     public Optional<ResourceProfile> match(
             ResourceProfile resourceProfile,
-            Collection<Map.Entry<ResourceProfile, Integer>> totalRequirements,
+            ResourceCounter totalRequirements,
             Function<ResourceProfile, Integer> numAssignedResourcesLookup) {
-        for (Map.Entry<ResourceProfile, Integer> requirementCandidate : totalRequirements) {
+        // Short-cut for fine-grained resource management. If there is already an exactly equal
+        // requirement, we can directly match with it.
+        if (totalRequirements.getResourceCount(resourceProfile)
+                > numAssignedResourcesLookup.apply(resourceProfile)) {
+            return Optional.of(resourceProfile);
+        }
+
+        for (Map.Entry<ResourceProfile, Integer> requirementCandidate :
+                totalRequirements.getResourcesWithCount()) {
             ResourceProfile requirementProfile = requirementCandidate.getKey();
 
             // beware the order when matching resources to requirements, because
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/slots/RequirementMatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/slots/RequirementMatcher.java
index 6a08a62..1f8c1be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/slots/RequirementMatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/slots/RequirementMatcher.java
@@ -18,9 +18,8 @@
 package org.apache.flink.runtime.slots;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.util.ResourceCounter;
 
-import java.util.Collection;
-import java.util.Map;
 import java.util.Optional;
 import java.util.function.Function;
 
@@ -38,6 +37,6 @@ public interface RequirementMatcher {
      */
     Optional<ResourceProfile> match(
             ResourceProfile resourceProfile,
-            Collection<Map.Entry<ResourceProfile, Integer>> totalRequirements,
+            ResourceCounter totalRequirements,
             Function<ResourceProfile, Integer> numAssignedResourcesLookup);
 }


[flink] 03/04: [FLINK-21479][coordination] Introduce TaskManagerResourceInfoProvider as the read-only interface of TaskManagerTracker

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ac54c12800e20794a217bba829ad3821659349cc
Author: Yangze Guo <ka...@gmail.com>
AuthorDate: Thu Feb 25 10:26:33 2021 +0800

    [FLINK-21479][coordination] Introduce TaskManagerResourceInfoProvider as the read-only interface of TaskManagerTracker
---
 .../TaskManagerResourceInfoProvider.java           | 76 ++++++++++++++++++++++
 .../slotmanager/TaskManagerTracker.java            | 54 +--------------
 2 files changed, 77 insertions(+), 53 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerResourceInfoProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerResourceInfoProvider.java
new file mode 100644
index 0000000..e2bfed35
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerResourceInfoProvider.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.util.ResourceCounter;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+
+/** Provide the information of TaskManager's resource and slot status. */
+interface TaskManagerResourceInfoProvider {
+    /**
+     * Get the pending allocations of the given pending task manager.
+     *
+     * @param pendingTaskManagerId of the pending task manager
+     * @return pending allocations, mapped by jobId
+     */
+    Map<JobID, ResourceCounter> getPendingAllocationsOfPendingTaskManager(
+            PendingTaskManagerId pendingTaskManagerId);
+
+    /**
+     * Get the {@link TaskManagerInfo}s of all registered task managers.
+     *
+     * @return a collection of {@link TaskManagerInfo}s of all registered task managers.
+     */
+    Collection<? extends TaskManagerInfo> getRegisteredTaskManagers();
+
+    /**
+     * Get the {@link TaskManagerInfo} of a registered task manager with the given instanceId
+     *
+     * @param instanceId of the task manager
+     * @return An Optional of {@link TaskManagerInfo}, if found, of the task manager
+     */
+    Optional<TaskManagerInfo> getRegisteredTaskManager(InstanceID instanceId);
+
+    /**
+     * Get all pending task managers.
+     *
+     * @return a collection of {@link PendingTaskManager}s.
+     */
+    Collection<PendingTaskManager> getPendingTaskManagers();
+
+    /**
+     * Get the {@link TaskManagerSlotInformation} of the allocated slot with the given allocationId.
+     *
+     * @param allocationId of the slot
+     * @return An Optional of {@link TaskManagerSlotInformation}, if found, of the slot
+     */
+    Optional<TaskManagerSlotInformation> getAllocatedOrPendingSlot(AllocationID allocationId);
+
+    /**
+     * Get the current {@link ClusterResourceOverview}.
+     *
+     * @return the current {@link ClusterResourceOverview}
+     */
+    ClusterResourceOverview getClusterResourceOverview();
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerTracker.java
index 570f9dd..efeaa2f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerTracker.java
@@ -24,12 +24,10 @@ import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
 import org.apache.flink.runtime.util.ResourceCounter;
 
-import java.util.Collection;
 import java.util.Map;
-import java.util.Optional;
 
 /** Tracks TaskManager's resource and slot status. */
-interface TaskManagerTracker {
+interface TaskManagerTracker extends TaskManagerResourceInfoProvider {
 
     // ---------------------------------------------------------------------------------------------
     // Add / Remove (pending) Resource
@@ -98,56 +96,6 @@ interface TaskManagerTracker {
     void replaceAllPendingAllocations(
             Map<PendingTaskManagerId, Map<JobID, ResourceCounter>> pendingSlotAllocations);
 
-    // ---------------------------------------------------------------------------------------------
-    // Getters
-    // ---------------------------------------------------------------------------------------------
-
-    /**
-     * Get the pending allocations of the given pending task manager.
-     *
-     * @param pendingTaskManagerId of the pending task manager
-     * @return pending allocations, mapped by jobId
-     */
-    Map<JobID, ResourceCounter> getPendingAllocationsOfPendingTaskManager(
-            PendingTaskManagerId pendingTaskManagerId);
-
-    /**
-     * Get the {@link TaskManagerInfo}s of all registered task managers.
-     *
-     * @return a collection of {@link TaskManagerInfo}s of all registered task managers.
-     */
-    Collection<? extends TaskManagerInfo> getRegisteredTaskManagers();
-
-    /**
-     * Get the {@link TaskManagerInfo} of a registered task manager with the given instanceId
-     *
-     * @param instanceId of the task manager
-     * @return An Optional of {@link TaskManagerInfo}, if find, of the task manager
-     */
-    Optional<TaskManagerInfo> getRegisteredTaskManager(InstanceID instanceId);
-
-    /**
-     * Get all pending task managers.
-     *
-     * @return a collection of {@link PendingTaskManager}s.
-     */
-    Collection<PendingTaskManager> getPendingTaskManagers();
-
-    /**
-     * Get the {@link TaskManagerSlotInformation} of the allocated slot with the given allocationId.
-     *
-     * @param allocationId of the slot
-     * @return An Optional of {@link TaskManagerSlotInformation}, if find, of the slot
-     */
-    Optional<TaskManagerSlotInformation> getAllocatedOrPendingSlot(AllocationID allocationId);
-
-    /**
-     * Get the current {@link ClusterResourceOverview}.
-     *
-     * @return the current {@link ClusterResourceOverview}
-     */
-    ClusterResourceOverview getClusterResourceOverview();
-
     /** Removes all state from the tracker. */
     void clear();
 }