Posted to issues@flink.apache.org by "huwh (via GitHub)" <gi...@apache.org> on 2023/03/16 13:59:46 UTC

[GitHub] [flink] huwh opened a new pull request, #22196: [FLINK-31445][runtime]Split resource allocate/release related logic from FineGrainedSlotManager to TaskManagerTracker

huwh opened a new pull request, #22196:
URL: https://github.com/apache/flink/pull/22196

   ## What is the purpose of the change
   Currently the FineGrainedSlotManager is responsible for both slot allocation and resource request/release. This makes the logic of FineGrainedSlotManager complicated, so this change moves the task manager related work from FineGrainedSlotManager to TaskManagerTracker, which already tracks task managers but does not yet handle resource requests and releases.
   
   
   ## Brief change log
   
     - *migrate tests to JUnit 5*
     - *move resource allocation / resource declaration to TaskManagerTracker*
     - *move (un-)register task manager to TaskManagerTracker*
     - *move task manager idle release check to TaskManagerTracker*
   
   
   ## Verifying this change
     - *Add unit tests in FineGrainedTaskManagerTrackerTest*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] flinkbot commented on pull request #22196: [FLINK-31445][runtime]Split resource allocate/release related logic from FineGrainedSlotManager to TaskManagerTracker

Posted by "flinkbot (via GitHub)" <gi...@apache.org>.
flinkbot commented on PR #22196:
URL: https://github.com/apache/flink/pull/22196#issuecomment-1472044733

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "61a6fc42bb3d9a3ddf8e71c5c83fe516d9469b70",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "61a6fc42bb3d9a3ddf8e71c5c83fe516d9469b70",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 61a6fc42bb3d9a3ddf8e71c5c83fe516d9469b70 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] reswqa commented on a diff in pull request #22196: [FLINK-31445][runtime]Split resource allocate/release related logic from FineGrainedSlotManager to TaskManagerTracker

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22196:
URL: https://github.com/apache/flink/pull/22196#discussion_r1142918984


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java:
##########
@@ -83,7 +83,13 @@ private static SlotManager createSlotManager(
                     slotManagerConfiguration,
                     slotManagerMetricGroup,
                     new DefaultResourceTracker(),
-                    new FineGrainedTaskManagerTracker(),
+                    new FineGrainedTaskManagerTracker(
+                            slotManagerConfiguration.getMaxTotalCpu(),
+                            slotManagerConfiguration.getMaxTotalMem(),
+                            slotManagerConfiguration.isWaitResultConsumedBeforeRelease(),
+                            slotManagerConfiguration.getTaskManagerTimeout(),
+                            slotManagerConfiguration.getDeclareNeededResourceDelay(),

Review Comment:
   It makes sense to refactor this after `DeclarativeSlotManager` is removed.





[GitHub] [flink] huwh closed pull request #22196: [FLINK-31445][runtime]Split resource allocate/release related logic from FineGrainedSlotManager to TaskManagerTracker

Posted by "huwh (via GitHub)" <gi...@apache.org>.
huwh closed pull request #22196: [FLINK-31445][runtime]Split resource allocate/release related logic from FineGrainedSlotManager to TaskManagerTracker
URL: https://github.com/apache/flink/pull/22196




[GitHub] [flink] huwh commented on pull request #22196: [FLINK-31445][runtime]Split resource allocate/release related logic from FineGrainedSlotManager to TaskManagerTracker

Posted by "huwh (via GitHub)" <gi...@apache.org>.
huwh commented on PR #22196:
URL: https://github.com/apache/flink/pull/22196#issuecomment-1477286542

   Thanks @reswqa, I squashed some commits and addressed the first comments.




[GitHub] [flink] reswqa commented on a diff in pull request #22196: [FLINK-31445][runtime]Split resource allocate/release related logic from FineGrainedSlotManager to TaskManagerTracker

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22196:
URL: https://github.com/apache/flink/pull/22196#discussion_r1143047050


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerTracker.java:
##########
@@ -387,14 +520,187 @@ public ResourceProfile getPendingResource() {
         return totalPendingResource;
     }
 
-    @Override
-    public void clear() {
-        slots.clear();
-        taskManagerRegistrations.clear();
-        totalRegisteredResource = ResourceProfile.ZERO;
-        pendingTaskManagers.clear();
-        totalPendingResource = ResourceProfile.ZERO;
-        pendingSlotAllocationRecords.clear();
-        unWantedTaskManagers.clear();
+    // ---------------------------------------------------------------------------------------------
+    // Resource allocations.
+    // ---------------------------------------------------------------------------------------------
+
+    private Set<PendingTaskManagerId> allocateTaskManagersAccordingTo(
+            List<PendingTaskManager> pendingTaskManagers) {
+        final Set<PendingTaskManagerId> failedAllocations = new HashSet<>();
+        for (PendingTaskManager pendingTaskManager : pendingTaskManagers) {
+            if (!allocateResource(pendingTaskManager)) {
+                failedAllocations.add(pendingTaskManager.getPendingTaskManagerId());
+            }
+        }
+        return failedAllocations;
+    }
+
+    private boolean allocateResource(PendingTaskManager pendingTaskManager) {
+        checkInit();
+        Preconditions.checkState(resourceAllocator.isSupported());
+        if (isMaxTotalResourceExceededAfterAdding(pendingTaskManager.getTotalResourceProfile())) {
+            LOG.info(
+                    "Could not allocate {}. Max total resource limitation <{}, {}> is reached.",
+                    pendingTaskManager,
+                    maxTotalCpu,
+                    maxTotalMem.toHumanReadableString());
+            return false;
+        }
+
+        addPendingTaskManager(pendingTaskManager);
+        return true;
+    }
+
+    // ---------------------------------------------------------------------------------------------
+    // Internal periodic check methods
+    // ---------------------------------------------------------------------------------------------
+
+    private void checkTaskManagerTimeouts() {
+        for (TaskManagerInfo timeoutTaskManager : getTimeOutTaskManagers()) {
+            if (waitResultConsumedBeforeRelease) {
+                releaseIdleTaskExecutorIfPossible(timeoutTaskManager);
+            } else {
+                releaseIdleTaskExecutor(timeoutTaskManager.getInstanceId());
+            }
+        }
+    }
+
+    private Collection<TaskManagerInfo> getTimeOutTaskManagers() {
+        long currentTime = System.currentTimeMillis();
+        return getRegisteredTaskManagers().stream()
+                .filter(
+                        taskManager ->
+                                taskManager.isIdle()
+                                        && currentTime - taskManager.getIdleSince()
+                                                >= taskManagerTimeout.toMilliseconds())
+                .collect(Collectors.toList());
+    }
+
+    private void releaseIdleTaskExecutorIfPossible(TaskManagerInfo taskManagerInfo) {
+        final long idleSince = taskManagerInfo.getIdleSince();
+        taskManagerInfo
+                .getTaskExecutorConnection()
+                .getTaskExecutorGateway()
+                .canBeReleased()
+                .thenAcceptAsync(
+                        canBeReleased -> {
+                            boolean stillIdle = idleSince == taskManagerInfo.getIdleSince();
+                            if (stillIdle && canBeReleased) {
+                                releaseIdleTaskExecutor(taskManagerInfo.getInstanceId());
+                            }
+                        },
+                        mainThreadExecutor);
+    }
+
+    private void releaseIdleTaskExecutor(InstanceID timedOutTaskManagerId) {
+        checkInit();
+        if (resourceAllocator.isSupported()) {
+            addUnWantedTaskManager(timedOutTaskManagerId);
+            declareNeededResourcesWithDelay();
+        }
+    }
+
+    private void addUnWantedTaskManager(InstanceID instanceId) {
+        final FineGrainedTaskManagerRegistration taskManager =
+                taskManagerRegistrations.get(instanceId);
+        if (taskManager != null) {
+            unWantedTaskManagers.put(
+                    instanceId,
+                    WorkerResourceSpec.fromTotalResourceProfile(
+                            taskManager.getTotalResource(),
+                            SlotManagerUtils.calculateDefaultNumSlots(
+                                    taskManager.getTotalResource(),
+                                    taskManager.getDefaultSlotResourceProfile())));
+        } else {
+            LOG.debug("Unwanted task manager {} does not exists.", instanceId);
+        }
+    }
+
+    @VisibleForTesting
+    public Map<InstanceID, WorkerResourceSpec> getUnWantedTaskManager() {
+        return unWantedTaskManagers;
+    }
+
+    void declareNeededResourcesWithDelay() {
+        Preconditions.checkState(resourceAllocator.isSupported());
+
+        if (declareNeededResourceDelay.toMillis() <= 0) {
+            declareNeededResources();
+        } else {
+            if (declareNeededResourceFuture == null || declareNeededResourceFuture.isDone()) {
+                declareNeededResourceFuture = new CompletableFuture<>();
+                scheduledExecutor.schedule(
+                        () ->
+                                mainThreadExecutor.execute(
+                                        () -> {
+                                            declareNeededResources();
+                                            Preconditions.checkNotNull(declareNeededResourceFuture)
+                                                    .complete(null);
+                                        }),
+                        declareNeededResourceDelay.toMillis(),
+                        TimeUnit.MILLISECONDS);
+            }
+        }
+    }
+
+    /** DO NOT call this method directly. Use {@link #declareNeededResourcesWithDelay()} instead. */
+    private void declareNeededResources() {
+        Map<InstanceID, WorkerResourceSpec> unWantedTaskManagers = this.getUnWantedTaskManager();
+        Map<WorkerResourceSpec, Set<InstanceID>> unWantedTaskManagerBySpec =
+                unWantedTaskManagers.entrySet().stream()
+                        .collect(
+                                Collectors.groupingBy(
+                                        Map.Entry::getValue,
+                                        Collectors.mapping(Map.Entry::getKey, Collectors.toSet())));
+
+        // registered TaskManagers except unwanted worker.
+        Stream<WorkerResourceSpec> registeredTaskManagerStream =
+                this.getRegisteredTaskManagers().stream()

Review Comment:
   Some of the `this.` qualifiers on method calls in `FineGrainedTaskManagerTracker` are probably not needed.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerTrackerTest.java:
##########
@@ -319,14 +435,118 @@ public void testGetStatistics() {
                 TASK_EXECUTOR_CONNECTION.getInstanceID(),
                 defaultSlotResource,
                 SlotState.ALLOCATED);
-        taskManagerTracker.addPendingTaskManager(
-                new PendingTaskManager(ResourceProfile.fromResources(4, 200), 1));
 
-        assertThat(taskManagerTracker.getFreeResource(), is(ResourceProfile.fromResources(6, 700)));
-        assertThat(taskManagerTracker.getRegisteredResource(), is(totalResource));
-        assertThat(taskManagerTracker.getNumberRegisteredSlots(), is(10));
-        assertThat(taskManagerTracker.getNumberFreeSlots(), is(8));
+        PendingTaskManager pendingTaskManager =
+                new PendingTaskManager(ResourceProfile.fromResources(4, 200), 1);
+        taskManagerTracker.allocateTaskManagersAccordingTo(
+                new ResourceAllocationResult.Builder()
+                        .addPendingTaskManagerAllocate(pendingTaskManager)
+                        .addAllocationOnPendingResource(
+                                jobId,
+                                pendingTaskManager.getPendingTaskManagerId(),
+                                ResourceProfile.fromResources(4, 200))
+                        .build());
+
+        assertThat(taskManagerTracker.getFreeResource())
+                .isEqualTo(ResourceProfile.fromResources(6, 700));
+        assertThat(taskManagerTracker.getRegisteredResource()).isEqualTo(totalResource);
+        assertThat(taskManagerTracker.getNumberRegisteredSlots()).isEqualTo(10);
+        assertThat(taskManagerTracker.getNumberFreeSlots()).isEqualTo(8);
+        assertThat(taskManagerTracker.getPendingResource())
+                .isEqualTo(ResourceProfile.fromResources(4, 200));
+    }
+
+    @Test
+    void testTimeoutForUnusedTaskManager() throws Exception {
+        final Time taskManagerTimeout = Time.milliseconds(50L);
+
+        final CompletableFuture<InstanceID> releaseResourceFuture = new CompletableFuture<>();
+
+        ResourceAllocator resourceAllocator =
+                new TestingResourceAllocatorBuilder()
+                        .setDeclareResourceNeededConsumer(
+                                (resourceDeclarations) -> {
+                                    assertThat(resourceDeclarations.size()).isEqualTo(1);
+                                    ResourceDeclaration resourceDeclaration =
+                                            resourceDeclarations.iterator().next();
+                                    assertThat(resourceDeclaration.getNumNeeded()).isEqualTo(0);
+                                    assertThat(resourceDeclaration.getUnwantedWorkers().size())
+                                            .isEqualTo(1);
+
+                                    releaseResourceFuture.complete(
+                                            resourceDeclaration
+                                                    .getUnwantedWorkers()
+                                                    .iterator()
+                                                    .next());
+                                })
+                        .build();
+
+        FineGrainedTaskManagerTracker taskManagerTracker =
+                new FineGrainedTaskManagerTrackerBuilder(
+                                new ScheduledExecutorServiceAdapter(
+                                        EXECUTOR_RESOURCE.getExecutor()))
+                        .setTaskManagerTimeout(taskManagerTimeout)
+                        .build();
+        taskManagerTracker.initialize(resourceAllocator, EXECUTOR_RESOURCE.getExecutor());
+
+        taskManagerTracker.registerTaskManager(
+                TASK_EXECUTOR_CONNECTION,
+                DEFAULT_TOTAL_RESOURCE_PROFILE,
+                DEFAULT_SLOT_RESOURCE_PROFILE,
+                null);
+
+        AllocationID allocationID = new AllocationID();
+        JobID jobID = new JobID();
+        taskManagerTracker.notifySlotStatus(
+                allocationID,
+                jobID,
+                TASK_EXECUTOR_CONNECTION.getInstanceID(),
+                ResourceProfile.fromResources(3, 200),
+                SlotState.ALLOCATED);
+
+        assertThat(
+                        taskManagerTracker.getRegisteredTaskManager(
+                                TASK_EXECUTOR_CONNECTION.getInstanceID()))
+                .hasValueSatisfying(
+                        taskManagerInfo ->
+                                assertThat(taskManagerInfo.getIdleSince())
+                                        .isEqualTo(Long.MAX_VALUE));
+
+        taskManagerTracker.notifySlotStatus(
+                allocationID,
+                jobID,
+                TASK_EXECUTOR_CONNECTION.getInstanceID(),
+                ResourceProfile.fromResources(3, 200),
+                SlotState.FREE);
+
         assertThat(
-                taskManagerTracker.getPendingResource(), is(ResourceProfile.fromResources(4, 200)));
+                        taskManagerTracker.getRegisteredTaskManager(
+                                TASK_EXECUTOR_CONNECTION.getInstanceID()))
+                .hasValueSatisfying(
+                        taskManagerInfo ->
+                                assertThat(taskManagerInfo.getIdleSince())
+                                        .isNotEqualTo(Long.MAX_VALUE));
+
+        assertThat(releaseResourceFuture.get(1, TimeUnit.SECONDS))

Review Comment:
   We'd better use `FlinkCompletableFutureAssert.assertThatFuture(releaseResourceFuture).eventuallySucceeds()` to get rid of the notorious timeout here.
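
The concern with `get(1, TimeUnit.SECONDS)` is that a fixed deadline can expire on a slow machine even though the future would complete shortly afterward. The following is a minimal JDK-only sketch of that difference; it does not use the Flink assertion helper named above, and `FutureWaitSketch`, `timesOutWithin`, and `demo` are hypothetical names chosen for illustration. A latch stands in for a slow worker so that the outcome is deterministic.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureWaitSketch {

    /** Returns true if the fixed deadline expires before the future completes. */
    static boolean timesOutWithin(CompletableFuture<?> future, long millis) {
        try {
            future.get(millis, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Simulates a slow worker: the future completes only after the latch is released. */
    static String demo() {
        CompletableFuture<String> releaseFuture = new CompletableFuture<>();
        CountDownLatch workerMayFinish = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            try {
                workerMayFinish.await();
                releaseFuture.complete("instance-1");
            } catch (InterruptedException e) {
                releaseFuture.completeExceptionally(e);
            }
        });
        worker.setDaemon(true);
        worker.start();

        // A hard-coded deadline fails here although nothing is actually wrong.
        boolean spuriousTimeout = timesOutWithin(releaseFuture, 10);

        // Waiting without a deadline succeeds once the worker gets to run.
        workerMayFinish.countDown();
        String value = releaseFuture.join();

        return spuriousTimeout + ":" + value;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In a real test, an overall limit on how long the wait may last would then come from the test framework or build rather than from a constant in the assertion.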



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerTracker.java:
##########
@@ -77,30 +130,129 @@ public FineGrainedTaskManagerTracker() {
     }
 
     @Override
-    public void replaceAllPendingAllocations(
+    public void initialize(ResourceAllocator resourceAllocator, Executor mainThreadExecutor) {
+        this.resourceAllocator = resourceAllocator;
+        this.mainThreadExecutor = mainThreadExecutor;
+        this.started = true;
+
+        taskManagerTimeoutsCheck =
+                scheduledExecutor.scheduleWithFixedDelay(
+                        () -> mainThreadExecutor.execute(this::checkTaskManagerTimeouts),
+                        0L,
+                        taskManagerTimeout.toMilliseconds(),
+                        TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void close() {
+        // stop the timeout checks for the TaskManagers
+        if (taskManagerTimeoutsCheck != null) {
+            taskManagerTimeoutsCheck.cancel(false);
+            taskManagerTimeoutsCheck = null;
+        }
+
+        slots.clear();
+        taskManagerRegistrations.clear();
+        totalRegisteredResource = ResourceProfile.ZERO;
+        pendingTaskManagers.clear();
+        totalPendingResource = ResourceProfile.ZERO;
+        pendingSlotAllocationRecords.clear();
+        unWantedTaskManagers.clear();
+        mainThreadExecutor = null;
+        resourceAllocator = null;
+        started = false;
+    }
+
+    private void replaceAllPendingAllocations(
             Map<PendingTaskManagerId, Map<JobID, ResourceCounter>> pendingSlotAllocations) {
         Preconditions.checkNotNull(pendingSlotAllocations);
+        Preconditions.checkState(resourceAllocator.isSupported());
         LOG.trace("Record the pending allocations {}.", pendingSlotAllocations);
         pendingSlotAllocationRecords.clear();
         pendingSlotAllocationRecords.putAll(pendingSlotAllocations);
         removeUnusedPendingTaskManagers();

Review Comment:
   How about moving `declareNeededResourcesWithDelay` to the end of this method, to align with `clearAllPendingAllocations`?
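
Beyond consistency, call order could also matter when the delay is non-positive, since the diff shows `declareNeededResourcesWithDelay` declaring synchronously in that case. The following is a minimal sketch of that effect under stated assumptions: `DeclareOrderSketch` and all of its members are hypothetical stand-ins, not the tracker's actual code, and the "declaration" is reduced to recording how many pending records were visible at call time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class DeclareOrderSketch {
    private final List<String> pendingRecords = new ArrayList<>();
    private final List<Integer> declaredCounts = new ArrayList<>();

    /** Hypothetical synchronous declaration: snapshots the current number of records. */
    private void declareNow() {
        declaredCounts.add(pendingRecords.size());
    }

    /** Declares before the state update, so the declaration sees the old state. */
    void replaceDeclareFirst(List<String> newRecords) {
        declareNow();
        pendingRecords.clear();
        pendingRecords.addAll(newRecords);
    }

    /** Declares after the state update, so the declaration sees the new state. */
    void replaceDeclareLast(List<String> newRecords) {
        pendingRecords.clear();
        pendingRecords.addAll(newRecords);
        declareNow();
    }

    List<Integer> declaredCounts() {
        return declaredCounts;
    }

    public static void main(String[] args) {
        DeclareOrderSketch first = new DeclareOrderSketch();
        first.replaceDeclareFirst(Arrays.asList("tm-1", "tm-2"));
        System.out.println(first.declaredCounts()); // stale snapshot

        DeclareOrderSketch last = new DeclareOrderSketch();
        last.replaceDeclareLast(Arrays.asList("tm-1", "tm-2"));
        System.out.println(last.declaredCounts()); // up-to-date snapshot
    }
}
```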



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerTracker.java:
##########
@@ -387,14 +520,187 @@ public ResourceProfile getPendingResource() {
         return totalPendingResource;
     }
 
-    @Override
-    public void clear() {
-        slots.clear();
-        taskManagerRegistrations.clear();
-        totalRegisteredResource = ResourceProfile.ZERO;
-        pendingTaskManagers.clear();
-        totalPendingResource = ResourceProfile.ZERO;
-        pendingSlotAllocationRecords.clear();
-        unWantedTaskManagers.clear();
+    // ---------------------------------------------------------------------------------------------
+    // Resource allocations.
+    // ---------------------------------------------------------------------------------------------
+
+    private Set<PendingTaskManagerId> allocateTaskManagersAccordingTo(
+            List<PendingTaskManager> pendingTaskManagers) {
+        final Set<PendingTaskManagerId> failedAllocations = new HashSet<>();
+        for (PendingTaskManager pendingTaskManager : pendingTaskManagers) {
+            if (!allocateResource(pendingTaskManager)) {
+                failedAllocations.add(pendingTaskManager.getPendingTaskManagerId());
+            }
+        }
+        return failedAllocations;
+    }
+
+    private boolean allocateResource(PendingTaskManager pendingTaskManager) {
+        checkInit();
+        Preconditions.checkState(resourceAllocator.isSupported());
+        if (isMaxTotalResourceExceededAfterAdding(pendingTaskManager.getTotalResourceProfile())) {
+            LOG.info(
+                    "Could not allocate {}. Max total resource limitation <{}, {}> is reached.",
+                    pendingTaskManager,
+                    maxTotalCpu,
+                    maxTotalMem.toHumanReadableString());
+            return false;
+        }
+
+        addPendingTaskManager(pendingTaskManager);
+        return true;
+    }
+
+    // ---------------------------------------------------------------------------------------------
+    // Internal periodic check methods
+    // ---------------------------------------------------------------------------------------------
+
+    private void checkTaskManagerTimeouts() {
+        for (TaskManagerInfo timeoutTaskManager : getTimeOutTaskManagers()) {
+            if (waitResultConsumedBeforeRelease) {
+                releaseIdleTaskExecutorIfPossible(timeoutTaskManager);
+            } else {
+                releaseIdleTaskExecutor(timeoutTaskManager.getInstanceId());
+            }
+        }
+    }
+
+    private Collection<TaskManagerInfo> getTimeOutTaskManagers() {
+        long currentTime = System.currentTimeMillis();
+        return getRegisteredTaskManagers().stream()
+                .filter(
+                        taskManager ->
+                                taskManager.isIdle()
+                                        && currentTime - taskManager.getIdleSince()
+                                                >= taskManagerTimeout.toMilliseconds())
+                .collect(Collectors.toList());
+    }
+
+    private void releaseIdleTaskExecutorIfPossible(TaskManagerInfo taskManagerInfo) {
+        final long idleSince = taskManagerInfo.getIdleSince();
+        taskManagerInfo
+                .getTaskExecutorConnection()
+                .getTaskExecutorGateway()
+                .canBeReleased()
+                .thenAcceptAsync(
+                        canBeReleased -> {
+                            boolean stillIdle = idleSince == taskManagerInfo.getIdleSince();
+                            if (stillIdle && canBeReleased) {
+                                releaseIdleTaskExecutor(taskManagerInfo.getInstanceId());
+                            }
+                        },
+                        mainThreadExecutor);
+    }
+
+    private void releaseIdleTaskExecutor(InstanceID timedOutTaskManagerId) {
+        checkInit();
+        if (resourceAllocator.isSupported()) {
+            addUnWantedTaskManager(timedOutTaskManagerId);
+            declareNeededResourcesWithDelay();
+        }
+    }
+
+    private void addUnWantedTaskManager(InstanceID instanceId) {
+        final FineGrainedTaskManagerRegistration taskManager =
+                taskManagerRegistrations.get(instanceId);
+        if (taskManager != null) {
+            unWantedTaskManagers.put(
+                    instanceId,
+                    WorkerResourceSpec.fromTotalResourceProfile(
+                            taskManager.getTotalResource(),
+                            SlotManagerUtils.calculateDefaultNumSlots(
+                                    taskManager.getTotalResource(),
+                                    taskManager.getDefaultSlotResourceProfile())));
+        } else {
+            LOG.debug("Unwanted task manager {} does not exists.", instanceId);
+        }
+    }
+
+    @VisibleForTesting
+    public Map<InstanceID, WorkerResourceSpec> getUnWantedTaskManager() {
+        return unWantedTaskManagers;
+    }
+
+    void declareNeededResourcesWithDelay() {
+        Preconditions.checkState(resourceAllocator.isSupported());
+
+        if (declareNeededResourceDelay.toMillis() <= 0) {
+            declareNeededResources();
+        } else {
+            if (declareNeededResourceFuture == null || declareNeededResourceFuture.isDone()) {
+                declareNeededResourceFuture = new CompletableFuture<>();
+                scheduledExecutor.schedule(
+                        () ->
+                                mainThreadExecutor.execute(
+                                        () -> {
+                                            declareNeededResources();
+                                            Preconditions.checkNotNull(declareNeededResourceFuture)
+                                                    .complete(null);
+                                        }),
+                        declareNeededResourceDelay.toMillis(),
+                        TimeUnit.MILLISECONDS);
+            }
+        }
+    }
+
+    /** DO NOT call this method directly. Use {@link #declareNeededResourcesWithDelay()} instead. */
+    private void declareNeededResources() {

Review Comment:
   Maybe we also need to call `checkInit` in this method, since `resourceAllocator` is used here.
   
   This method is submitted to an executor and runs later, so a `checkInit` at the caller does not guarantee that the tracker is still initialized when it actually executes.
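
The following is a minimal sketch of the situation described above, assuming a hypothetical tracker whose `checkInit` throws `IllegalStateException` when the tracker is not started (what Flink's `checkInit` actually does is not shown in this thread). A plain queue stands in for the main thread executor, so the task runs only when `runPending` is called, after `close()` has already cleared the allocator.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class DeferredInitCheckSketch {
    /** Stand-in for an executor: tasks run only when runPending() is called. */
    private final Queue<Runnable> pending = new ArrayDeque<>();

    private String resourceAllocator; // hypothetical dependency, null until initialize()
    private boolean started;

    void initialize(String allocator) {
        this.resourceAllocator = allocator;
        this.started = true;
    }

    void close() {
        this.resourceAllocator = null;
        this.started = false;
    }

    /** Hypothetical guard; assumed to fail fast when the tracker is not started. */
    private void checkInit() {
        if (!started) {
            throw new IllegalStateException("tracker is not initialized");
        }
    }

    void declareLater(boolean checkInsideTask) {
        checkInit(); // passes at submission time
        pending.add(() -> {
            if (checkInsideTask) {
                checkInit(); // re-validated when the task actually runs
            }
            resourceAllocator.length(); // dereferences the dependency
        });
    }

    /** Runs queued tasks; returns "ok" or the simple name of the first exception. */
    String runPending() {
        try {
            Runnable task;
            while ((task = pending.poll()) != null) {
                task.run();
            }
            return "ok";
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }

    static String scenario(boolean checkInsideTask) {
        DeferredInitCheckSketch tracker = new DeferredInitCheckSketch();
        tracker.initialize("allocator");
        tracker.declareLater(checkInsideTask);
        tracker.close(); // state changes between submission and execution
        return tracker.runPending();
    }

    public static void main(String[] args) {
        System.out.println(scenario(false));
        System.out.println(scenario(true));
    }
}
```

Without the inner check the deferred task fails with a bare `NullPointerException`; with it, the failure is the explicit `IllegalStateException` from the guard. Whether the task should fail or return quietly in that state is a separate design choice.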



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerTrackerTest.java:
##########
@@ -319,14 +435,118 @@ public void testGetStatistics() {
                 TASK_EXECUTOR_CONNECTION.getInstanceID(),
                 defaultSlotResource,
                 SlotState.ALLOCATED);
-        taskManagerTracker.addPendingTaskManager(
-                new PendingTaskManager(ResourceProfile.fromResources(4, 200), 1));
 
-        assertThat(taskManagerTracker.getFreeResource(), is(ResourceProfile.fromResources(6, 700)));
-        assertThat(taskManagerTracker.getRegisteredResource(), is(totalResource));
-        assertThat(taskManagerTracker.getNumberRegisteredSlots(), is(10));
-        assertThat(taskManagerTracker.getNumberFreeSlots(), is(8));
+        PendingTaskManager pendingTaskManager =
+                new PendingTaskManager(ResourceProfile.fromResources(4, 200), 1);
+        taskManagerTracker.allocateTaskManagersAccordingTo(
+                new ResourceAllocationResult.Builder()
+                        .addPendingTaskManagerAllocate(pendingTaskManager)
+                        .addAllocationOnPendingResource(
+                                jobId,
+                                pendingTaskManager.getPendingTaskManagerId(),
+                                ResourceProfile.fromResources(4, 200))
+                        .build());
+
+        assertThat(taskManagerTracker.getFreeResource())
+                .isEqualTo(ResourceProfile.fromResources(6, 700));
+        assertThat(taskManagerTracker.getRegisteredResource()).isEqualTo(totalResource);
+        assertThat(taskManagerTracker.getNumberRegisteredSlots()).isEqualTo(10);
+        assertThat(taskManagerTracker.getNumberFreeSlots()).isEqualTo(8);
+        assertThat(taskManagerTracker.getPendingResource())
+                .isEqualTo(ResourceProfile.fromResources(4, 200));
+    }
+
+    @Test
+    void testTimeoutForUnusedTaskManager() throws Exception {

Review Comment:
   It seems that `testTimeoutForUnusedTaskManager` in `FineGrainedSlotManagerTest` is no longer needed.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerTracker.java:
##########
@@ -387,14 +520,187 @@ public ResourceProfile getPendingResource() {
         return totalPendingResource;
     }
 
-    @Override
-    public void clear() {
-        slots.clear();
-        taskManagerRegistrations.clear();
-        totalRegisteredResource = ResourceProfile.ZERO;
-        pendingTaskManagers.clear();
-        totalPendingResource = ResourceProfile.ZERO;
-        pendingSlotAllocationRecords.clear();
-        unWantedTaskManagers.clear();
+    // ---------------------------------------------------------------------------------------------
+    // Resource allocations.
+    // ---------------------------------------------------------------------------------------------
+
+    private Set<PendingTaskManagerId> allocateTaskManagersAccordingTo(
+            List<PendingTaskManager> pendingTaskManagers) {
+        final Set<PendingTaskManagerId> failedAllocations = new HashSet<>();
+        for (PendingTaskManager pendingTaskManager : pendingTaskManagers) {
+            if (!allocateResource(pendingTaskManager)) {
+                failedAllocations.add(pendingTaskManager.getPendingTaskManagerId());
+            }
+        }
+        return failedAllocations;
+    }
+
+    private boolean allocateResource(PendingTaskManager pendingTaskManager) {
+        checkInit();
+        Preconditions.checkState(resourceAllocator.isSupported());
+        if (isMaxTotalResourceExceededAfterAdding(pendingTaskManager.getTotalResourceProfile())) {
+            LOG.info(
+                    "Could not allocate {}. Max total resource limitation <{}, {}> is reached.",
+                    pendingTaskManager,
+                    maxTotalCpu,
+                    maxTotalMem.toHumanReadableString());
+            return false;
+        }
+
+        addPendingTaskManager(pendingTaskManager);
+        return true;
+    }
+
+    // ---------------------------------------------------------------------------------------------
+    // Internal periodic check methods
+    // ---------------------------------------------------------------------------------------------
+
+    private void checkTaskManagerTimeouts() {
+        for (TaskManagerInfo timeoutTaskManager : getTimeOutTaskManagers()) {
+            if (waitResultConsumedBeforeRelease) {
+                releaseIdleTaskExecutorIfPossible(timeoutTaskManager);
+            } else {
+                releaseIdleTaskExecutor(timeoutTaskManager.getInstanceId());
+            }
+        }
+    }
+
+    private Collection<TaskManagerInfo> getTimeOutTaskManagers() {
+        long currentTime = System.currentTimeMillis();
+        return getRegisteredTaskManagers().stream()
+                .filter(
+                        taskManager ->
+                                taskManager.isIdle()
+                                        && currentTime - taskManager.getIdleSince()
+                                                >= taskManagerTimeout.toMilliseconds())
+                .collect(Collectors.toList());
+    }
+
+    private void releaseIdleTaskExecutorIfPossible(TaskManagerInfo taskManagerInfo) {
+        final long idleSince = taskManagerInfo.getIdleSince();
+        taskManagerInfo
+                .getTaskExecutorConnection()
+                .getTaskExecutorGateway()
+                .canBeReleased()
+                .thenAcceptAsync(
+                        canBeReleased -> {
+                            boolean stillIdle = idleSince == taskManagerInfo.getIdleSince();
+                            if (stillIdle && canBeReleased) {
+                                releaseIdleTaskExecutor(taskManagerInfo.getInstanceId());
+                            }
+                        },
+                        mainThreadExecutor);
+    }
+
+    private void releaseIdleTaskExecutor(InstanceID timedOutTaskManagerId) {
+        checkInit();
+        if (resourceAllocator.isSupported()) {
+            addUnWantedTaskManager(timedOutTaskManagerId);
+            declareNeededResourcesWithDelay();
+        }
+    }
+
+    private void addUnWantedTaskManager(InstanceID instanceId) {
+        final FineGrainedTaskManagerRegistration taskManager =
+                taskManagerRegistrations.get(instanceId);
+        if (taskManager != null) {
+            unWantedTaskManagers.put(
+                    instanceId,
+                    WorkerResourceSpec.fromTotalResourceProfile(
+                            taskManager.getTotalResource(),
+                            SlotManagerUtils.calculateDefaultNumSlots(
+                                    taskManager.getTotalResource(),
+                                    taskManager.getDefaultSlotResourceProfile())));
+        } else {
+            LOG.debug("Unwanted task manager {} does not exists.", instanceId);
+        }
+    }
+
+    @VisibleForTesting
+    public Map<InstanceID, WorkerResourceSpec> getUnWantedTaskManager() {

Review Comment:
   ```suggestion
       private Map<InstanceID, WorkerResourceSpec> getUnWantedTaskManager() {
   ```
   I believe this can be private if the tests do not depend on this method.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceDeclaration.java:
##########
@@ -51,7 +51,7 @@ public int getNumNeeded() {
         return numNeeded;
     }
 
-    public Collection<InstanceID> getUnwantedWorkers() {
+    public Set<InstanceID> getUnwantedWorkers() {

Review Comment:
   What is the purpose of this change? Please add a short description to the commit message.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerTrackerTest.java:
##########
@@ -319,14 +435,118 @@ public void testGetStatistics() {
                 TASK_EXECUTOR_CONNECTION.getInstanceID(),
                 defaultSlotResource,
                 SlotState.ALLOCATED);
-        taskManagerTracker.addPendingTaskManager(
-                new PendingTaskManager(ResourceProfile.fromResources(4, 200), 1));
 
-        assertThat(taskManagerTracker.getFreeResource(), is(ResourceProfile.fromResources(6, 700)));
-        assertThat(taskManagerTracker.getRegisteredResource(), is(totalResource));
-        assertThat(taskManagerTracker.getNumberRegisteredSlots(), is(10));
-        assertThat(taskManagerTracker.getNumberFreeSlots(), is(8));
+        PendingTaskManager pendingTaskManager =
+                new PendingTaskManager(ResourceProfile.fromResources(4, 200), 1);
+        taskManagerTracker.allocateTaskManagersAccordingTo(
+                new ResourceAllocationResult.Builder()
+                        .addPendingTaskManagerAllocate(pendingTaskManager)
+                        .addAllocationOnPendingResource(
+                                jobId,
+                                pendingTaskManager.getPendingTaskManagerId(),
+                                ResourceProfile.fromResources(4, 200))
+                        .build());
+
+        assertThat(taskManagerTracker.getFreeResource())
+                .isEqualTo(ResourceProfile.fromResources(6, 700));
+        assertThat(taskManagerTracker.getRegisteredResource()).isEqualTo(totalResource);
+        assertThat(taskManagerTracker.getNumberRegisteredSlots()).isEqualTo(10);
+        assertThat(taskManagerTracker.getNumberFreeSlots()).isEqualTo(8);
+        assertThat(taskManagerTracker.getPendingResource())
+                .isEqualTo(ResourceProfile.fromResources(4, 200));
+    }
+
+    @Test
+    void testTimeoutForUnusedTaskManager() throws Exception {
+        final Time taskManagerTimeout = Time.milliseconds(50L);
+
+        final CompletableFuture<InstanceID> releaseResourceFuture = new CompletableFuture<>();
+
+        ResourceAllocator resourceAllocator =
+                new TestingResourceAllocatorBuilder()
+                        .setDeclareResourceNeededConsumer(
+                                (resourceDeclarations) -> {
+                                    assertThat(resourceDeclarations.size()).isEqualTo(1);
+                                    ResourceDeclaration resourceDeclaration =
+                                            resourceDeclarations.iterator().next();
+                                    assertThat(resourceDeclaration.getNumNeeded()).isEqualTo(0);
+                                    assertThat(resourceDeclaration.getUnwantedWorkers().size())
+                                            .isEqualTo(1);
+
+                                    releaseResourceFuture.complete(
+                                            resourceDeclaration
+                                                    .getUnwantedWorkers()
+                                                    .iterator()
+                                                    .next());
+                                })
+                        .build();
+
+        FineGrainedTaskManagerTracker taskManagerTracker =
+                new FineGrainedTaskManagerTrackerBuilder(
+                                new ScheduledExecutorServiceAdapter(
+                                        EXECUTOR_RESOURCE.getExecutor()))
+                        .setTaskManagerTimeout(taskManagerTimeout)
+                        .build();
+        taskManagerTracker.initialize(resourceAllocator, EXECUTOR_RESOURCE.getExecutor());
+
+        taskManagerTracker.registerTaskManager(
+                TASK_EXECUTOR_CONNECTION,
+                DEFAULT_TOTAL_RESOURCE_PROFILE,
+                DEFAULT_SLOT_RESOURCE_PROFILE,
+                null);
+
+        AllocationID allocationID = new AllocationID();
+        JobID jobID = new JobID();
+        taskManagerTracker.notifySlotStatus(
+                allocationID,
+                jobID,
+                TASK_EXECUTOR_CONNECTION.getInstanceID(),
+                ResourceProfile.fromResources(3, 200),
+                SlotState.ALLOCATED);
+
+        assertThat(
+                        taskManagerTracker.getRegisteredTaskManager(
+                                TASK_EXECUTOR_CONNECTION.getInstanceID()))
+                .hasValueSatisfying(
+                        taskManagerInfo ->
+                                assertThat(taskManagerInfo.getIdleSince())
+                                        .isEqualTo(Long.MAX_VALUE));
+
+        taskManagerTracker.notifySlotStatus(
+                allocationID,
+                jobID,
+                TASK_EXECUTOR_CONNECTION.getInstanceID(),
+                ResourceProfile.fromResources(3, 200),
+                SlotState.FREE);
+
         assertThat(
-                taskManagerTracker.getPendingResource(), is(ResourceProfile.fromResources(4, 200)));
+                        taskManagerTracker.getRegisteredTaskManager(
+                                TASK_EXECUTOR_CONNECTION.getInstanceID()))
+                .hasValueSatisfying(
+                        taskManagerInfo ->
+                                assertThat(taskManagerInfo.getIdleSince())
+                                        .isNotEqualTo(Long.MAX_VALUE));
+
+        assertThat(releaseResourceFuture.get(1, TimeUnit.SECONDS))
+                .isEqualTo(TASK_EXECUTOR_CONNECTION.getInstanceID());
+        assertThat(taskManagerTracker.getUnWantedTaskManager())

Review Comment:
   IMO, we should try to keep our tests black-box: if there is another approach that can verify the expected behavior, try not to introduce `VisibleForTesting`.
   
   In this case, the following assertion is enough to check that the unwanted task manager is the expected one, right?
   
   ```
   assertThat(releaseResourceFuture.get(1, TimeUnit.SECONDS))
        .isEqualTo(TASK_EXECUTOR_CONNECTION.getInstanceID());
   ```
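   
   A minimal JDK-only sketch of the black-box style described above. `SketchIdleReleaser` and `checkIdle` are hypothetical stand-ins, not Flink classes: the test observes the release through the callback it injected, so no accessor for internal state is needed.
   
   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;
   import java.util.function.Consumer;

   // Hypothetical component: reports the worker it wants to release via an injected callback.
   final class SketchIdleReleaser {
       private final Consumer<String> releaseCallback;

       SketchIdleReleaser(Consumer<String> releaseCallback) {
           this.releaseCallback = releaseCallback;
       }

       // Pretend the idle timeout has elapsed for the given worker.
       void checkIdle(String workerId) {
           releaseCallback.accept(workerId);
       }
   }

   final class BlackBoxReleaseExample {
       // Returns the worker id observed at the component boundary.
       static String observeReleasedWorker(String workerId) {
           CompletableFuture<String> released = new CompletableFuture<>();
           SketchIdleReleaser releaser = new SketchIdleReleaser(released::complete);
           releaser.checkIdle(workerId);
           try {
               // Only the externally visible effect is checked, not internal collections.
               return released.get(1, TimeUnit.SECONDS);
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new IllegalStateException(e);
           } catch (ExecutionException | TimeoutException e) {
               throw new IllegalStateException(e);
           }
       }
   }
   ```
   
   The same future-based check keeps working if the component later changes how it stores unwanted workers internally.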



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] huwh commented on a diff in pull request #22196: [FLINK-31445][runtime]Split resource allocate/release related logic from FineGrainedSlotManager to TaskManagerTracker

Posted by "huwh (via GitHub)" <gi...@apache.org>.
huwh commented on code in PR #22196:
URL: https://github.com/apache/flink/pull/22196#discussion_r1143240702


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceDeclaration.java:
##########
@@ -51,7 +51,7 @@ public int getNumNeeded() {
         return numNeeded;
     }
 
-    public Collection<InstanceID> getUnwantedWorkers() {
+    public Set<InstanceID> getUnwantedWorkers() {

Review Comment:
   `Collections.unmodifiableCollection` does not implement `equals`, which makes it hard for unit tests to check whether two `ResourceDeclaration` instances hold the same value.
   
   I have also updated the commit message.
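   
   A small JDK-only sketch of the equality difference, for illustration. The class name and the `"tm-1"` value are made up for the example; only `java.util.Collections` behavior is shown.
   
   ```java
   import java.util.Arrays;
   import java.util.Collection;
   import java.util.Collections;
   import java.util.HashSet;
   import java.util.Set;

   final class UnmodifiableEqualsExample {
       // Two distinct unmodifiableCollection views over equal sets.
       static boolean collectionViewsEqual() {
           Collection<String> a =
                   Collections.unmodifiableCollection(new HashSet<>(Arrays.asList("tm-1")));
           Collection<String> b =
                   Collections.unmodifiableCollection(new HashSet<>(Arrays.asList("tm-1")));
           // The wrapper inherits Object.equals, so this is an identity comparison.
           return a.equals(b);
       }

       // Two distinct unmodifiableSet views over equal sets.
       static boolean setViewsEqual() {
           Set<String> a = Collections.unmodifiableSet(new HashSet<>(Arrays.asList("tm-1")));
           Set<String> b = Collections.unmodifiableSet(new HashSet<>(Arrays.asList("tm-1")));
           // The set wrapper delegates equals to the backing set.
           return a.equals(b);
       }
   }
   ```
   
   Here `collectionViewsEqual()` returns `false` and `setViewsEqual()` returns `true`, which is why a `Set` return type makes value comparison in tests straightforward.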





[GitHub] [flink] huwh commented on a diff in pull request #22196: [FLINK-31445][runtime]Split resource allocate/release related logic from FineGrainedSlotManager to TaskManagerTracker

Posted by "huwh (via GitHub)" <gi...@apache.org>.
huwh commented on code in PR #22196:
URL: https://github.com/apache/flink/pull/22196#discussion_r1142891394


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java:
##########
@@ -83,7 +83,13 @@ private static SlotManager createSlotManager(
                     slotManagerConfiguration,
                     slotManagerMetricGroup,
                     new DefaultResourceTracker(),
-                    new FineGrainedTaskManagerTracker(),
+                    new FineGrainedTaskManagerTracker(
+                            slotManagerConfiguration.getMaxTotalCpu(),
+                            slotManagerConfiguration.getMaxTotalMem(),
+                            slotManagerConfiguration.isWaitResultConsumedBeforeRelease(),
+                            slotManagerConfiguration.getTaskManagerTimeout(),
+                            slotManagerConfiguration.getDeclareNeededResourceDelay(),

Review Comment:
   Good idea, but these configuration options are used by both `DeclarativeSlotManager` and `FineGrainedSlotManager`. How about creating a follow-up issue to do this after we remove `DeclarativeSlotManager`?





[GitHub] [flink] reswqa commented on a diff in pull request #22196: [FLINK-31445][runtime]Split resource allocate/release related logic from FineGrainedSlotManager to TaskManagerTracker

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22196:
URL: https://github.com/apache/flink/pull/22196#discussion_r1141829794


##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerTrackerTest.java:
##########
@@ -18,144 +18,191 @@
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
 import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
 import org.apache.flink.runtime.util.ResourceCounter;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
+import java.util.Collection;
 import java.util.Collections;
-import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 
-import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 /** Tests for the {@link FineGrainedTaskManagerTracker}. */
-public class FineGrainedTaskManagerTrackerTest extends TestLogger {
+class FineGrainedTaskManagerTrackerTest {
     private static final TaskExecutorConnection TASK_EXECUTOR_CONNECTION =
             new TaskExecutorConnection(
                     ResourceID.generate(),
                     new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway());
 
+    static final WorkerResourceSpec DEFAULT_WORKER_RESOURCE_SPEC =
+            new WorkerResourceSpec.Builder()
+                    .setCpuCores(10.0)
+                    .setTaskHeapMemoryMB(1000)
+                    .setTaskOffHeapMemoryMB(1000)
+                    .setNetworkMemoryMB(1000)
+                    .setManagedMemoryMB(1000)
+                    .build();
+    static final int DEFAULT_NUM_SLOTS_PER_WORKER = 2;
+    static final ResourceProfile DEFAULT_TOTAL_RESOURCE_PROFILE =
+            SlotManagerUtils.generateTaskManagerTotalResourceProfile(DEFAULT_WORKER_RESOURCE_SPEC);
+    static final ResourceProfile DEFAULT_SLOT_RESOURCE_PROFILE =
+            SlotManagerUtils.generateDefaultSlotResourceProfile(
+                    DEFAULT_WORKER_RESOURCE_SPEC, DEFAULT_NUM_SLOTS_PER_WORKER);
+
+    @RegisterExtension
+    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
+
     @Test
-    public void testInitState() {
-        final FineGrainedTaskManagerTracker taskManagerTracker =
-                new FineGrainedTaskManagerTracker();
-        assertThat(taskManagerTracker.getPendingTaskManagers(), is(empty()));
-        assertThat(taskManagerTracker.getRegisteredTaskManagers(), is(empty()));
+    void testInitState() {
+        final FineGrainedTaskManagerTracker taskManagerTracker = createAndStartTaskManagerTracker();
+        assertThat(taskManagerTracker.getPendingTaskManagers()).isEmpty();
+        assertThat(taskManagerTracker.getRegisteredTaskManagers()).isEmpty();
     }
 
     @Test
-    public void testAddAndRemoveTaskManager() {
-        final FineGrainedTaskManagerTracker taskManagerTracker =
-                new FineGrainedTaskManagerTracker();
+    void testAllocateTaskManagersAccordingToResultWithNoResourceAllocator() {
+        final FineGrainedTaskManagerTracker taskManagerTracker = createTaskManagerTracker();
 
-        // Add task manager
-        taskManagerTracker.addTaskManager(
-                TASK_EXECUTOR_CONNECTION, ResourceProfile.ANY, ResourceProfile.ANY);
-        assertThat(taskManagerTracker.getRegisteredTaskManagers().size(), is(1));
-        assertTrue(
-                taskManagerTracker
-                        .getRegisteredTaskManager(TASK_EXECUTOR_CONNECTION.getInstanceID())
-                        .isPresent());
+        taskManagerTracker.initialize(
+                NonSupportedResourceAllocatorImpl.INSTANCE, EXECUTOR_RESOURCE.getExecutor());
 
-        // Remove task manager
-        taskManagerTracker.removeTaskManager(TASK_EXECUTOR_CONNECTION.getInstanceID());
-        assertThat(taskManagerTracker.getRegisteredTaskManagers().size(), is(0));
+        PendingTaskManager pendingTaskManager =
+                new PendingTaskManager(DEFAULT_TOTAL_RESOURCE_PROFILE, 1);
+        ResourceAllocationResult result =
+                new ResourceAllocationResult.Builder()
+                        .addPendingTaskManagerAllocate(pendingTaskManager)
+                        .addAllocationOnPendingResource(
+                                new JobID(),
+                                pendingTaskManager.getPendingTaskManagerId(),
+                                DEFAULT_SLOT_RESOURCE_PROFILE)
+                        .build();
+
+        Set<PendingTaskManagerId> failedAllocations =
+                taskManagerTracker.allocateTaskManagersAccordingTo(result);
+        assertThat(failedAllocations).containsExactly(pendingTaskManager.getPendingTaskManagerId());
     }
 
-    @Test(expected = NullPointerException.class)
-    public void testRemoveUnknownTaskManager() {
-        final FineGrainedTaskManagerTracker taskManagerTracker =
-                new FineGrainedTaskManagerTracker();
+    @Test
+    void testAllocateTaskManagersAccordingToResult() {
+        final FineGrainedTaskManagerTracker taskManagerTracker = createTaskManagerTracker();
+        CompletableFuture<ResourceDeclaration> declarationFuture = new CompletableFuture<>();
+        Consumer<Collection<ResourceDeclaration>> declareResourceNeededConsumer =
+                resourceDeclarations -> {
+                    assertThat(resourceDeclarations).hasSize(1);
+                    declarationFuture.complete(resourceDeclarations.stream().findFirst().get());
+                };
+
+        PendingTaskManager pendingTaskManager =
+                new PendingTaskManager(DEFAULT_TOTAL_RESOURCE_PROFILE, 1);
+        taskManagerTracker.initialize(
+                new TestingResourceAllocatorBuilder()
+                        .setDeclareResourceNeededConsumer(declareResourceNeededConsumer)
+                        .build(),
+                EXECUTOR_RESOURCE.getExecutor());
+        ResourceAllocationResult result =
+                new ResourceAllocationResult.Builder()
+                        .addPendingTaskManagerAllocate(pendingTaskManager)
+                        .addAllocationOnPendingResource(
+                                new JobID(),
+                                pendingTaskManager.getPendingTaskManagerId(),
+                                DEFAULT_SLOT_RESOURCE_PROFILE)
+                        .build();
+
+        Set<PendingTaskManagerId> failedAllocations =
+                taskManagerTracker.allocateTaskManagersAccordingTo(result);
+        assertThat(failedAllocations).isEmpty();
+        assertThat(declarationFuture)
+                .isCompletedWithValue(
+                        new ResourceDeclaration(
+                                DEFAULT_WORKER_RESOURCE_SPEC, 1, Collections.emptySet()));
+        assertThat(taskManagerTracker.getPendingTaskManagers()).hasSize(1);
 
-        taskManagerTracker.removeTaskManager(new InstanceID());
+        boolean registered =
+                taskManagerTracker.registerTaskManager(
+                        TASK_EXECUTOR_CONNECTION,
+                        DEFAULT_TOTAL_RESOURCE_PROFILE,
+                        DEFAULT_SLOT_RESOURCE_PROFILE,
+                        pendingTaskManager.getPendingTaskManagerId());
+        assertThat(registered).isTrue();
+        assertThat(taskManagerTracker.getPendingTaskManagers()).hasSize(0);
     }
 
     @Test
-    public void testAddAndRemovePendingTaskManager() {
-        final PendingTaskManager pendingTaskManager =
-                new PendingTaskManager(ResourceProfile.ANY, 1);
-        final FineGrainedTaskManagerTracker taskManagerTracker =
-                new FineGrainedTaskManagerTracker();
-        final JobID jobId = new JobID();
-        final ResourceCounter resourceCounter =
-                ResourceCounter.withResource(ResourceProfile.ANY, 1);
-
-        // Add pending task manager
-        taskManagerTracker.addPendingTaskManager(pendingTaskManager);
-        taskManagerTracker.replaceAllPendingAllocations(
-                Collections.singletonMap(
-                        pendingTaskManager.getPendingTaskManagerId(),
-                        Collections.singletonMap(jobId, resourceCounter)));
-        assertThat(taskManagerTracker.getPendingTaskManagers().size(), is(1));
-        assertThat(
-                taskManagerTracker
-                        .getPendingTaskManagersByTotalAndDefaultSlotResourceProfile(
-                                ResourceProfile.ANY, ResourceProfile.ANY)
-                        .size(),
-                is(1));
-
-        // Remove pending task manager
-        final Map<JobID, ResourceCounter> records =
-                taskManagerTracker.removePendingTaskManager(
-                        pendingTaskManager.getPendingTaskManagerId());
-        assertThat(taskManagerTracker.getPendingTaskManagers(), is(empty()));
-        assertThat(
-                taskManagerTracker
-                        .getPendingAllocationsOfPendingTaskManager(
-                                pendingTaskManager.getPendingTaskManagerId())
-                        .size(),
-                is(0));
+    void testRegisterAndUnregisterTaskManager() {
+        final FineGrainedTaskManagerTracker taskManagerTracker = createAndStartTaskManagerTracker();
+
+        // Add task manager
+        taskManagerTracker.registerTaskManager(
+                TASK_EXECUTOR_CONNECTION,
+                DEFAULT_TOTAL_RESOURCE_PROFILE,
+                DEFAULT_SLOT_RESOURCE_PROFILE,
+                null);
+        assertThat(taskManagerTracker.getRegisteredTaskManagers()).hasSize(1);
         assertThat(
-                taskManagerTracker
-                        .getPendingTaskManagersByTotalAndDefaultSlotResourceProfile(
-                                ResourceProfile.ANY, ResourceProfile.ANY)
-                        .size(),
-                is(0));
-        assertTrue(records.containsKey(jobId));
-        assertThat(records.get(jobId).getResourceCount(ResourceProfile.ANY), is(1));
+                        taskManagerTracker.getRegisteredTaskManager(
+                                TASK_EXECUTOR_CONNECTION.getInstanceID()))
+                .isPresent();
+
+        // Remove task manager
+        taskManagerTracker.unregisterTaskManager(TASK_EXECUTOR_CONNECTION.getInstanceID());
+        assertThat(taskManagerTracker.getRegisteredTaskManagers()).isEmpty();
     }
 
-    @Test(expected = NullPointerException.class)
-    public void testRemoveUnknownPendingTaskManager() {
-        final FineGrainedTaskManagerTracker taskManagerTracker =
-                new FineGrainedTaskManagerTracker();
+    @Test
+    void testUnregisterUnknownTaskManager() {
+        assertThrows(

Review Comment:
   Please use `Assertions.assertThatThrownBy` from AssertJ.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncerTest.java:
##########
@@ -33,43 +33,39 @@
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
 import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
 import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
 
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.util.Arrays;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeoutException;
 
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.core.Is.is;
-import static org.hamcrest.core.IsEqual.equalTo;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;

Review Comment:
   ```suggestion
   
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerTracker.java:
##########
@@ -21,63 +21,62 @@
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
 import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
-import org.apache.flink.runtime.util.ResourceCounter;
 
-import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
 
 /** Tracks TaskManager's resource and slot status. */
 interface TaskManagerTracker
         extends TaskManagerResourceInfoProvider, ClusterResourceStatisticsProvider {
 
     // ---------------------------------------------------------------------------------------------
-    // Add / Remove (pending) Resource
+    // initialize
     // ---------------------------------------------------------------------------------------------
 
     /**
-     * Register a new task manager.
+     * Initialize the TaskManagerTracker.
      *
-     * @param taskExecutorConnection of the new task manager
-     * @param totalResourceProfile of the new task manager
-     * @param defaultSlotResourceProfile of the new task manager
+     * @param resourceAllocator to use for resource (de-)allocations
+     * @param mainThreadExecutor to use to run code in the ResourceManager's main thread
      */
-    void addTaskManager(
-            TaskExecutorConnection taskExecutorConnection,
-            ResourceProfile totalResourceProfile,
-            ResourceProfile defaultSlotResourceProfile);
+    void initialize(ResourceAllocator resourceAllocator, Executor mainThreadExecutor);
 
-    /**
-     * Unregister a task manager with the given instance id.
-     *
-     * @param instanceId of the task manager
-     */
-    void removeTaskManager(InstanceID instanceId);
+    /** Removes all state from the tracker. */
+    void close();

Review Comment:
   Why does this method sit under the `initialize` section comment?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java:
##########
@@ -83,7 +83,13 @@ private static SlotManager createSlotManager(
                     slotManagerConfiguration,
                     slotManagerMetricGroup,
                     new DefaultResourceTracker(),
-                    new FineGrainedTaskManagerTracker(),
+                    new FineGrainedTaskManagerTracker(
+                            slotManagerConfiguration.getMaxTotalCpu(),
+                            slotManagerConfiguration.getMaxTotalMem(),
+                            slotManagerConfiguration.isWaitResultConsumedBeforeRelease(),
+                            slotManagerConfiguration.getTaskManagerTimeout(),
+                            slotManagerConfiguration.getDeclareNeededResourceDelay(),

Review Comment:
   How about introducing `TaskManagerTrackerConfiguration` and deriving it from `SlotManagerConfiguration`?
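   
   A rough sketch of what such a derived configuration could look like. Both classes below are hypothetical, simplified stand-ins (plain `double`, `long` and `java.time.Duration` fields instead of Flink's own types), and the factory method name is an assumption rather than an existing API.
   
   ```java
   import java.time.Duration;

   // Hypothetical, simplified stand-in for the slot manager's configuration.
   final class SketchSlotManagerConfiguration {
       final double maxTotalCpu;
       final long maxTotalMemBytes;
       final boolean waitResultConsumedBeforeRelease;
       final Duration taskManagerTimeout;
       final Duration declareNeededResourceDelay;

       SketchSlotManagerConfiguration(
               double maxTotalCpu,
               long maxTotalMemBytes,
               boolean waitResultConsumedBeforeRelease,
               Duration taskManagerTimeout,
               Duration declareNeededResourceDelay) {
           this.maxTotalCpu = maxTotalCpu;
           this.maxTotalMemBytes = maxTotalMemBytes;
           this.waitResultConsumedBeforeRelease = waitResultConsumedBeforeRelease;
           this.taskManagerTimeout = taskManagerTimeout;
           this.declareNeededResourceDelay = declareNeededResourceDelay;
       }
   }

   // Hypothetical holder for just the settings the tracker needs.
   final class SketchTaskManagerTrackerConfiguration {
       final double maxTotalCpu;
       final long maxTotalMemBytes;
       final boolean waitResultConsumedBeforeRelease;
       final Duration taskManagerTimeout;
       final Duration declareNeededResourceDelay;

       private SketchTaskManagerTrackerConfiguration(SketchSlotManagerConfiguration source) {
           this.maxTotalCpu = source.maxTotalCpu;
           this.maxTotalMemBytes = source.maxTotalMemBytes;
           this.waitResultConsumedBeforeRelease = source.waitResultConsumedBeforeRelease;
           this.taskManagerTimeout = source.taskManagerTimeout;
           this.declareNeededResourceDelay = source.declareNeededResourceDelay;
       }

       // Single place where tracker settings are derived from the slot manager's settings.
       static SketchTaskManagerTrackerConfiguration fromSlotManagerConfiguration(
               SketchSlotManagerConfiguration source) {
           return new SketchTaskManagerTrackerConfiguration(source);
       }
   }
   ```
   
   With something like this, the tracker's constructor would take one configuration object instead of five separate arguments.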



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java:
##########
@@ -212,21 +171,13 @@ public void start(
 
         resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
         mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
-        resourceAllocator = Preconditions.checkNotNull(newResourceAllocator);
         resourceEventListener = Preconditions.checkNotNull(newResourceEventListener);
         slotStatusSyncer.initialize(
                 taskManagerTracker, resourceTracker, resourceManagerId, mainThreadExecutor);
         blockedTaskManagerChecker = Preconditions.checkNotNull(newBlockedTaskManagerChecker);
+        taskManagerTracker.initialize(newResourceAllocator, mainThreadExecutor);

Review Comment:
   ```suggestion
           taskManagerTracker.initialize(Preconditions.checkNotNull(newResourceAllocator), mainThreadExecutor);       
   ```
   We'd better do this check before passing the parameter; otherwise we have to rely on every implementation to do it.
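   
   To illustrate the point with a JDK-only sketch: `Objects.requireNonNull` stands in for Flink's `Preconditions.checkNotNull`, and `SketchTracker`, `LenientSketchTracker` and `SketchSlotManager` are hypothetical names invented for the example.
   
   ```java
   import java.util.Objects;

   // Hypothetical tracker interface; implementations may or may not validate arguments.
   interface SketchTracker {
       void initialize(Object resourceAllocator);
   }

   // An implementation that forgets the null check and silently stores null.
   final class LenientSketchTracker implements SketchTracker {
       Object allocator;

       @Override
       public void initialize(Object resourceAllocator) {
           this.allocator = resourceAllocator;
       }
   }

   final class SketchSlotManager {
       private final SketchTracker tracker;

       SketchSlotManager(SketchTracker tracker) {
           this.tracker = tracker;
       }

       void start(Object newResourceAllocator) {
           // Fail fast at the call site, regardless of what the implementation does.
           tracker.initialize(
                   Objects.requireNonNull(newResourceAllocator, "resourceAllocator"));
       }
   }
   ```
   
   Calling `start(null)` throws a `NullPointerException` before the lenient implementation ever sees the argument.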



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java:
##########
@@ -129,19 +129,20 @@ static <T> T assertFutureCompleteAndReturn(CompletableFuture<T> completableFutur
     }
 
     static void assertFutureNotComplete(CompletableFuture<?> completableFuture) throws Exception {
-        assertThatThrownBy(
-                        () ->
-                                completableFuture.get(
-                                        FUTURE_EXPECT_TIMEOUT_MS, TimeUnit.MILLISECONDS),
-                        "Expected to fail with a timeout.")
-                .isInstanceOf(TimeoutException.class);
+        assertThrows(

Review Comment:
   Why did you make this change? The original usage is correct.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncerTest.java:
##########
@@ -281,23 +263,34 @@ public void testSlotStatusProcessing() {
         // allocationId1 should still be allocated; allocationId2 should be freed; allocationId3
         // should continue to be in a pending state;
         slotStatusSyncer.reportSlotStatus(taskExecutorConnection.getInstanceID(), slotReport2);
+        assertThat(resourceTracker.getAcquiredResources(jobId))
+                .contains(ResourceRequirement.create(resource, 2));
         assertThat(
-                resourceTracker.getAcquiredResources(jobId),
-                contains(ResourceRequirement.create(resource, 2)));
-        assertThat(
-                taskManagerTracker
-                        .getRegisteredTaskManager(taskExecutorConnection.getInstanceID())
-                        .get()
-                        .getAvailableResource(),
-                equalTo(ResourceProfile.fromResources(3, 12)));
-        assertTrue(taskManagerTracker.getAllocatedOrPendingSlot(allocationId1).isPresent());
-        assertFalse(taskManagerTracker.getAllocatedOrPendingSlot(allocationId2).isPresent());
-        assertTrue(taskManagerTracker.getAllocatedOrPendingSlot(allocationId3).isPresent());
-        assertThat(
-                taskManagerTracker.getAllocatedOrPendingSlot(allocationId1).get().getState(),
-                is(SlotState.ALLOCATED));
-        assertThat(
-                taskManagerTracker.getAllocatedOrPendingSlot(allocationId3).get().getState(),
-                is(SlotState.PENDING));
+                        taskManagerTracker.getRegisteredTaskManager(
+                                taskExecutorConnection.getInstanceID()))
+                .hasValueSatisfying(
+                        taskManagerInfo ->
+                                assertThat(taskManagerInfo.getAvailableResource())
+                                        .isEqualTo(ResourceProfile.fromResources(3, 12)));
+        assertThat(taskManagerTracker.getAllocatedOrPendingSlot(allocationId1)).isPresent();
+        assertThat(taskManagerTracker.getAllocatedOrPendingSlot(allocationId2)).isNotPresent();
+        assertThat(taskManagerTracker.getAllocatedOrPendingSlot(allocationId3)).isPresent();
+        assertThat(taskManagerTracker.getAllocatedOrPendingSlot(allocationId1))
+                .hasValueSatisfying(
+                        slot -> assertThat(slot.getState()).isEqualTo(SlotState.ALLOCATED));

Review Comment:
   `hasValueSatisfying` already ensures that the `Optional` holds a value, so the separate `assertThat(taskManagerTracker.getAllocatedOrPendingSlot(allocationId1)).isPresent()` check can be safely removed.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerTracker.java:
##########
@@ -21,63 +21,62 @@
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
 import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
-import org.apache.flink.runtime.util.ResourceCounter;
 
-import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
 
 /** Tracks TaskManager's resource and slot status. */

Review Comment:
   We need to describe this class in more detail, similar to the `TaskExecutorManager`.


