Posted to issues@flink.apache.org by "xintongsong (via GitHub)" <gi...@apache.org> on 2023/03/23 07:14:52 UTC

[GitHub] [flink] xintongsong commented on a diff in pull request #22183: [FLINK-31444][runtime]FineGrainedSlotManager reclaims slots when job is finished

xintongsong commented on code in PR #22183:
URL: https://github.com/apache/flink/pull/22183#discussion_r1145753804


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerTracker.java:
##########
@@ -79,6 +80,15 @@ void addTaskManager(
     /** Get unwanted task managers. */
     Map<InstanceID, WorkerResourceSpec> getUnWantedTaskManager();
 
+    /**
+     * Returns all task executors that have at least 1 pending/completed allocation for the given
+     * job.
+     *
+     * @param jobId the job for which the task executors must have a slot
+     * @return task executors with at least 1 slot for the job
+     */
+    Collection<TaskExecutorConnection> getTaskExecutorsWithAllocatedSlotsForJob(JobID jobId);

Review Comment:
   Might be better to return a collection of `TaskManagerInfo`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotStatusSyncer.java:
##########
@@ -82,4 +83,12 @@ CompletableFuture<Void> allocateSlot(
      * @return whether the previous allocations can be applied
      */
     boolean reportSlotStatus(InstanceID instanceId, SlotReport slotReport);
+
+    /**
+     * Frees all currently inactive slots allocated for the given job and task executor.
+     *
+     * @param jobId of the job
+     * @param taskExecutorConnection of the task manager
+     */
+    void freeInactiveSlots(JobID jobId, TaskExecutorConnection taskExecutorConnection);

Review Comment:
   I think we can make this interface per-job, so that we free inactive slots on all TMs for the given job id with 1 call. The `SlotStatusSyncer` has access to `TaskManagerTracker`, which means it has all the information needed.
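Below is a minimal sketch of the per-job variant suggested above. It uses simplified stand-in types: `TmGateway`, `TrackedTaskManager`, `JobTaskManagerLookup`, and `PerJobSlotSyncer` are hypothetical names, not Flink classes, and IDs are plain strings. The syncer asks its tracker for every task manager that holds a slot for the job and sends one request to each, so a caller frees a job's inactive slots with a single call.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for a task executor gateway. The gateway call in the
// diff also takes a request timeout; this sketch omits it.
interface TmGateway {
    void freeInactiveSlots(String jobId);
}

// Hypothetical stand-in for the task manager information returned by the tracker.
final class TrackedTaskManager {
    final String instanceId;
    final TmGateway gateway;

    TrackedTaskManager(String instanceId, TmGateway gateway) {
        this.instanceId = instanceId;
        this.gateway = gateway;
    }
}

// Hypothetical stand-in for the tracker query the syncer already has access to.
interface JobTaskManagerLookup {
    Collection<TrackedTaskManager> getTaskManagersWithAllocatedSlotsForJob(String jobId);
}

// Per-job variant: one call covers every task manager that holds
// at least one slot for the given job.
final class PerJobSlotSyncer {
    private final JobTaskManagerLookup tracker;

    PerJobSlotSyncer(JobTaskManagerLookup tracker) {
        this.tracker = tracker;
    }

    void freeInactiveSlots(String jobId) {
        for (TrackedTaskManager taskManager :
                tracker.getTaskManagersWithAllocatedSlotsForJob(jobId)) {
            taskManager.gateway.freeInactiveSlots(jobId);
        }
    }
}
```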



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerTracker.java:
##########
@@ -186,6 +186,15 @@ public Map<JobID, ResourceCounter> removePendingTaskManager(
                 .orElse(Collections.emptyMap());
     }
 
+    @Override
+    public Collection<TaskExecutorConnection> getTaskExecutorsWithAllocatedSlotsForJob(
+            JobID jobId) {
+        return slots.values().stream()
+                .filter(slot -> jobId.equals(slot.getJobId()))
+                .map(FineGrainedTaskManagerSlot::getTaskManagerConnection)
+                .collect(Collectors.toList());
+    }

Review Comment:
   The returned list may contain duplicate task managers: a task manager that holds several slots for the job appears once per slot.
   
   It might be better to iterate over and filter `taskManagerRegistrations` rather than `slots`.
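The sketch below contrasts the two approaches using hypothetical stand-in types (`SlotEntry`, `TmRegistration`, `SlotTrackerSketch`) rather than Flink's classes. Filtering `slots` yields one result per matching slot, while filtering registrations visits each task manager exactly once. Returning registrations instead of connections also lines up with the earlier suggestion to return `TaskManagerInfo`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in for a tracked slot: the task manager it lives on and its job.
final class SlotEntry {
    final String taskManagerId;
    final String jobId;

    SlotEntry(String taskManagerId, String jobId) {
        this.taskManagerId = taskManagerId;
        this.jobId = jobId;
    }
}

// Hypothetical stand-in for a task manager registration that knows its own slots.
final class TmRegistration {
    final String instanceId;
    final List<SlotEntry> allocatedSlots = new ArrayList<>();

    TmRegistration(String instanceId) {
        this.instanceId = instanceId;
    }

    boolean hasSlotForJob(String jobId) {
        return allocatedSlots.stream().anyMatch(slot -> jobId.equals(slot.jobId));
    }
}

final class SlotTrackerSketch {
    final Map<String, SlotEntry> slots = new LinkedHashMap<>(); // keyed by allocation ID
    final Map<String, TmRegistration> taskManagerRegistrations = new LinkedHashMap<>();

    void allocate(String allocationId, String taskManagerId, String jobId) {
        SlotEntry slot = new SlotEntry(taskManagerId, jobId);
        slots.put(allocationId, slot);
        taskManagerRegistrations
                .computeIfAbsent(taskManagerId, TmRegistration::new)
                .allocatedSlots
                .add(slot);
    }

    // Slot-based, as in the diff: one result per matching slot, so a task
    // manager holding several slots for the job is returned several times.
    List<String> taskManagersViaSlots(String jobId) {
        return slots.values().stream()
                .filter(slot -> jobId.equals(slot.jobId))
                .map(slot -> slot.taskManagerId)
                .collect(Collectors.toList());
    }

    // Registration-based: each task manager is visited once, so no duplicates.
    Collection<TmRegistration> getTaskManagersWithAllocatedSlotsForJob(String jobId) {
        return taskManagerRegistrations.values().stream()
                .filter(registration -> registration.hasSlotForJob(jobId))
                .collect(Collectors.toList());
    }
}
```

Deduplicating the slot-based result would also work, but iterating registrations avoids producing duplicates in the first place.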



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncer.java:
##########
@@ -308,6 +309,14 @@ private boolean syncAllocatedSlotStatus(SlotStatus slotStatus, TaskManagerInfo t
         }
     }
 
+    @Override
+    public void freeInactiveSlots(JobID jobId, TaskExecutorConnection taskExecutorConnection) {
+        checkStarted();
+        taskExecutorConnection
+                .getTaskExecutorGateway()
+                .freeInactiveSlots(jobId, taskManagerRequestTimeout);
+    }

Review Comment:
   Should we eagerly notify the task manager tracker and the resource tracker that the slots have been freed? Or should we rely on the task managers reporting the slots as available later to keep the states consistent? I personally lean towards eagerly syncing the status. Either way, the chosen behavior should be documented.
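Below is a minimal sketch of the eager option. It assumes hypothetical stand-in types (`TrackedSlot`, `FreeInactiveSlotsRpc`, `EagerSlotSyncer`) and a hypothetical per-slot `active` flag; whether the resource manager side can actually tell which slots are inactive is something a real implementation would have to settle. After sending one request per affected task manager, the syncer immediately marks the job's inactive slots as free and records their allocation IDs, which stands in for notifying the resource tracker.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a slot as seen by the resource manager side.
final class TrackedSlot {
    final String taskManagerId;
    final boolean active; // hypothetical flag: tasks are still deployed in the slot
    String jobId;         // null once the slot is considered free

    TrackedSlot(String taskManagerId, String jobId, boolean active) {
        this.taskManagerId = taskManagerId;
        this.jobId = jobId;
        this.active = active;
    }
}

// Hypothetical stand-in for the request sent to a task manager.
interface FreeInactiveSlotsRpc {
    void freeInactiveSlots(String taskManagerId, String jobId);
}

final class EagerSlotSyncer {
    final Map<String, TrackedSlot> slotsByAllocationId = new LinkedHashMap<>();
    // Stands in for notifying the resource tracker about released allocations.
    final List<String> releasedAllocationIds = new ArrayList<>();
    private final FreeInactiveSlotsRpc rpc;

    EagerSlotSyncer(FreeInactiveSlotsRpc rpc) {
        this.rpc = rpc;
    }

    void freeInactiveSlots(String jobId) {
        // Send one request per task manager that holds a slot for the job.
        Set<String> taskManagers = new LinkedHashSet<>();
        for (TrackedSlot slot : slotsByAllocationId.values()) {
            if (jobId.equals(slot.jobId)) {
                taskManagers.add(slot.taskManagerId);
            }
        }
        for (String taskManagerId : taskManagers) {
            rpc.freeInactiveSlots(taskManagerId, jobId);
        }
        // Eagerly update local state instead of waiting for the next slot report.
        // If a request fails, this view diverges until the task manager reports again.
        for (Map.Entry<String, TrackedSlot> entry : slotsByAllocationId.entrySet()) {
            TrackedSlot slot = entry.getValue();
            if (jobId.equals(slot.jobId) && !slot.active) {
                slot.jobId = null;
                releasedAllocationIds.add(entry.getKey());
            }
        }
    }
}
```

Eager updates keep the local view current without waiting for a report, at the cost of a possible temporary divergence from the task manager's actual state if a request fails.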



-- 
This is an automated message from the Apache Git Service.