Posted to issues@flink.apache.org by "xintongsong (via GitHub)" <gi...@apache.org> on 2023/03/23 06:04:01 UTC

[GitHub] [flink] xintongsong commented on a diff in pull request #22176: [FLINK-31441][runtime]FineGrainedSlotManager support evenly slot selection.

xintongsong commented on code in PR #22176:
URL: https://github.com/apache/flink/pull/22176#discussion_r1145655305


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotMatchingStrategy.java:
##########
@@ -41,4 +42,18 @@ <T extends TaskManagerSlotInformation> Optional<T> findMatchingSlot(
             ResourceProfile requestedProfile,
             Collection<T> freeSlots,
             Function<InstanceID, Integer> numberRegisteredSlotsLookup);
+
+    /**
+     * Finds a matching resource instance for the request {@link ResourceProfile} given the
+     * collection of available instances.
+     *
+     * @param resourceMatchingPredicate Predicate of whether instance resource is matching
+     * @param availableInstances collection of available instances
+     * @param instanceResourceInfoLookup lookup for the current resource info of instance
+     * @return Returns a matching instance or {@link Optional#empty()} if there is none
+     */
+    Optional<InstanceID> findMatchingResource(
+            Predicate<InstanceID> resourceMatchingPredicate,
+            Collection<InstanceID> availableInstances,
+            Function<InstanceID, TaskManagerResourceInfoSnapshot> instanceResourceInfoLookup);

Review Comment:
   I'd suggest moving this to `DefaultResourceAllocationStrategy` as an internal interface.
   - The new method does not match the description of `SlotMatchingStrategy`, which is "Strategy how to find a matching slot". Even if we changed the name, the implementation of the new method is completely independent of the existing ones, which usually indicates that the interface has two responsibilities that can be separated.
   - A side benefit is that, by making this matching strategy and its implementations internal to `DefaultResourceAllocationStrategy`, we can simply maintain the resource utilization in `InternalResourceInfo`. This avoids lots of redundant calculations as well as the need to introduce `TaskManagerResourceInfoSnapshot`.
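A minimal sketch of the suggested structure, assuming `InternalResourceInfo` tracks its own utilization. The names `AllocationStrategySketch`, `ResourceMatchingStrategy`, `ANY_MATCHING`, `LEAST_UTILIZATION`, and `evenlySpreadOutSlots` are hypothetical, not Flink code. Because the interface is private, there is no public contract to maintain, and the strategies can compare `InternalResourceInfo` objects directly instead of going through a snapshot type:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

/** Hypothetical, heavily simplified stand-in for DefaultResourceAllocationStrategy. */
class AllocationStrategySketch {

    /** Simplified per-TaskManager bookkeeping (fields are assumptions for illustration). */
    static final class InternalResourceInfo {
        final String instanceId;
        final double utilization; // fraction of the TaskManager's resources in use, in [0, 1]

        InternalResourceInfo(String instanceId, double utilization) {
            this.instanceId = instanceId;
            this.utilization = utilization;
        }
    }

    /** Internal only: how to pick one resource among those that can fit a request. */
    private interface ResourceMatchingStrategy {
        Optional<InternalResourceInfo> findMatchingResource(
                List<InternalResourceInfo> resources, Predicate<InternalResourceInfo> canFit);
    }

    /** Returns the first resource (in list order) that fits. */
    private static final ResourceMatchingStrategy ANY_MATCHING =
            (resources, canFit) -> resources.stream().filter(canFit).findFirst();

    /** Returns the fitting resource with the lowest utilization, spreading load evenly. */
    private static final ResourceMatchingStrategy LEAST_UTILIZATION =
            (resources, canFit) ->
                    resources.stream()
                            .filter(canFit)
                            .min(Comparator.comparingDouble(r -> r.utilization));

    private final ResourceMatchingStrategy matchingStrategy;

    AllocationStrategySketch(boolean evenlySpreadOutSlots) {
        this.matchingStrategy = evenlySpreadOutSlots ? LEAST_UTILIZATION : ANY_MATCHING;
    }

    Optional<InternalResourceInfo> select(
            List<InternalResourceInfo> resources, Predicate<InternalResourceInfo> canFit) {
        return matchingStrategy.findMatchingResource(resources, canFit);
    }
}
```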



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/LeastUtilizationSlotMatchingStrategy.java:
##########
@@ -79,4 +94,21 @@ private static double calculateUtilization(
 
         return (double) (numberRegisteredSlots - numberFreeSlots) / numberRegisteredSlots;
     }
+
+    private static double calculateUtilization(
+            TaskManagerResourceInfoSnapshot instanceResourceInfoSnapshot) {
+        if (instanceResourceInfoSnapshot.getTotalProfile() == ResourceProfile.UNKNOWN
+                || instanceResourceInfoSnapshot.getAvailableProfile() == ResourceProfile.UNKNOWN) {
+            return instanceResourceInfoSnapshot.getAllocatedSlotNumber();
+        } else {
+            // Since CPU and memory are proportional in most scenarios,
+            // use cpu usage to represent utilization to simplify the logic in the first version
+            return instanceResourceInfoSnapshot
+                    .getTotalProfile()
+                    .getCpuCores()
+                    .subtract(instanceResourceInfoSnapshot.getAvailableProfile().getCpuCores())
+                    .getValue()
+                    .doubleValue();
+        }
+    }

Review Comment:
   The utilization can be maintained in `InternalResourceInfo` and re-calculated only upon allocation changes.
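For illustration, a hypothetical `InternalResourceInfo` variant that caches its utilization. It is reduced to CPU cores as plain doubles (an assumption; the class in the diff is built from full resource profiles). The cached value is refreshed only when `tryAllocate` actually changes the allocation, so comparing TaskManagers becomes a field read:

```java
/**
 * Hypothetical InternalResourceInfo variant that caches its utilization.
 * CPU cores as plain doubles stand in for full resource profiles (an assumption).
 */
class CachedUtilizationResourceInfo {
    private final double totalCpu;
    private double availableCpu;
    private double utilization; // recomputed only when the allocation changes

    CachedUtilizationResourceInfo(double totalCpu, double availableCpu) {
        if (totalCpu <= 0 || availableCpu < 0 || availableCpu > totalCpu) {
            throw new IllegalArgumentException("invalid CPU amounts");
        }
        this.totalCpu = totalCpu;
        this.availableCpu = availableCpu;
        updateUtilization();
    }

    /** Allocates the requested cores if they fit, refreshing the cached utilization. */
    boolean tryAllocate(double cpu) {
        if (cpu > availableCpu) {
            return false; // nothing changed, so the cached value stays valid
        }
        availableCpu -= cpu;
        updateUtilization();
        return true;
    }

    /** Cheap read used by a matching strategy when comparing TaskManagers. */
    double getUtilization() {
        return utilization;
    }

    private void updateUtilization() {
        utilization = (totalCpu - availableCpu) / totalCpu;
    }
}
```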



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##########
@@ -73,9 +80,28 @@ public ResourceAllocationResult tryFulfillRequirements(
             BlockedTaskManagerChecker blockedTaskManagerChecker) {
         final ResourceAllocationResult.Builder resultBuilder = ResourceAllocationResult.builder();
 
-        final List<InternalResourceInfo> registeredResources =
-                getAvailableResources(
-                        taskManagerResourceInfoProvider, resultBuilder, blockedTaskManagerChecker);
+        final List<TaskManagerInfo> registeredResources =
+                getAvailableResources(taskManagerResourceInfoProvider, blockedTaskManagerChecker);
+        final List<InstanceID> registeredResourceIdInOrder =
+                registeredResources.stream()
+                        .map(TaskManagerInfo::getInstanceId)
+                        .collect(Collectors.toList());
+        final Map<InstanceID, InternalResourceInfo> registeredResourcesInfo =
+                registeredResources.stream()
+                        .collect(
+                                Collectors.toMap(
+                                        TaskManagerInfo::getInstanceId,
+                                        t ->
+                                                new InternalResourceInfo(
+                                                        t.getDefaultSlotResourceProfile(),
+                                                        t.getTotalResource(),
+                                                        t.getAvailableResource(),
+                                                        (jobID, slotProfile) ->
+                                                                resultBuilder
+                                                                        .addAllocationOnRegisteredResource(
+                                                                                jobID,
+                                                                                t.getInstanceId(),
+                                                                                slotProfile))));

Review Comment:
   I'm not entirely sure why this was moved out of `getAvailableResources`. IIUC, it is because we need both a list of `InstanceID`s and a map from `InstanceID` to `InternalResourceInfo` for `tryFulfillRequirementsForJobWithResources`. But isn't the list the same as the key set of the map?
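One way to get the ordered IDs and the map from a single structure, sketched with a hypothetical helper `indexInOrder`: collecting into a `LinkedHashMap` keeps `keySet()` in the input order. The two-argument `Collectors.toMap` used in the diff makes no guarantee about the returned map type, so there the key set is not guaranteed to follow the list's order:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

class OrderedResourceIndex {
    /**
     * Indexes items by id in a LinkedHashMap, so that keySet() iterates in the same order as
     * the input list and a separate list of ids is not needed. Ids must be unique.
     */
    static <T, K, V> Map<K, V> indexInOrder(
            List<T> items, Function<T, K> idOf, Function<T, V> infoOf) {
        return items.stream()
                .collect(
                        Collectors.toMap(
                                idOf,
                                infoOf,
                                (first, second) -> {
                                    throw new IllegalStateException("Duplicate id");
                                },
                                LinkedHashMap::new));
    }
}
```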



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/LeastUtilizationSlotMatchingStrategy.java:
##########

Review Comment:
   Moreover, I'm not sure about assuming that CPU and memory are proportional. In fact, one of the major reasons people need fine-grained resource management is that they are not always proportional.
   
   A simple way to calculate the utilization while considering both CPU and memory is to calculate the CPU and memory utilization separately and use whichever is larger.
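A minimal sketch of the max-of-dimensions suggestion. Plain `double` cores and `long` bytes stand in for Flink's resource types, and the class and method names are hypothetical:

```java
class UtilizationSketch {
    /**
     * Utilization as the larger of CPU and memory utilization, each a fraction in [0, 1].
     * Plain doubles/longs stand in for Flink's resource types (an assumption for brevity).
     */
    static double utilization(
            double totalCpu, double availableCpu, long totalMemoryBytes, long availableMemoryBytes) {
        double cpuUtilization = totalCpu > 0 ? (totalCpu - availableCpu) / totalCpu : 0.0;
        double memoryUtilization =
                totalMemoryBytes > 0
                        ? (double) (totalMemoryBytes - availableMemoryBytes) / totalMemoryBytes
                        : 0.0;
        // Whichever dimension is scarcer determines how "full" the TaskManager is.
        return Math.max(cpuUtilization, memoryUtilization);
    }
}
```

For example, a TaskManager with 1 of 4 cores but 800 of 1000 bytes of memory in use scores 0.8 here, whereas a CPU-only metric would rank it at 0.25 and make it look nearly idle.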



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##########
@@ -135,7 +154,45 @@ private static List<InternalResourceInfo> getPendingResources(
                 .collect(Collectors.toList());
     }
 
-    private static int tryFulfilledRequirementWithResource(
+    private int tryFulfilledRequirementWithAvailableResources(

Review Comment:
   I think there's not much difference between fulfilling requirements from existing available resources and from pending resources. We only need an argument that tells which strategy to use.
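A minimal sketch of a single fulfillment path parameterized by a strategy argument. The resource model (CPU cores only), the `MatchingStrategy` enum, and `tryFulfillWithResources` are hypothetical simplifications, not the PR's code:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

class UnifiedFulfillSketch {
    /** Minimal resource model tracking CPU cores only (an assumption for brevity). */
    static final class Resource {
        final String id;
        final double totalCpu;
        double freeCpu;

        Resource(String id, double totalCpu, double freeCpu) {
            this.id = id;
            this.totalCpu = totalCpu;
            this.freeCpu = freeCpu;
        }

        double utilization() {
            return (totalCpu - freeCpu) / totalCpu;
        }
    }

    /** Hypothetical strategy argument selecting how a fitting resource is picked. */
    enum MatchingStrategy {
        ANY_MATCHING,
        LEAST_UTILIZATION;

        Optional<Resource> select(List<Resource> resources, double cpu) {
            Stream<Resource> fitting = resources.stream().filter(r -> r.freeCpu >= cpu);
            return this == ANY_MATCHING
                    ? fitting.findFirst()
                    : fitting.min(Comparator.comparingDouble(Resource::utilization));
        }
    }

    /**
     * Single code path for registered and pending resources; only the strategy differs.
     * Returns how many of the requests (each needing cpuPerRequest cores) remain unfulfilled.
     */
    static int tryFulfillWithResources(
            List<Resource> resources, int numRequests, double cpuPerRequest, MatchingStrategy strategy) {
        int unfulfilled = 0;
        for (int i = 0; i < numRequests; i++) {
            Optional<Resource> match = strategy.select(resources, cpuPerRequest);
            if (match.isPresent()) {
                match.get().freeCpu -= cpuPerRequest;
            } else {
                unfulfilled++;
            }
        }
        return unfulfilled;
    }
}
```

The caller would invoke the same method once for registered resources and once for pending resources, passing whichever strategy applies to each.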



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##########

Review Comment:
   The variable is named `registeredResourceIdInOrder`. However, I don't see what the order is.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##########

Review Comment:
   And if we work on `InternalResourceInfo` directly when selecting the least utilized resource, this complexity (building both a list of `InstanceID`s and a map from `InstanceID` to `InternalResourceInfo`) can be avoided.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org