Posted to issues@flink.apache.org by "huwh (via GitHub)" <gi...@apache.org> on 2023/03/30 05:13:32 UTC

[GitHub] [flink] huwh opened a new pull request, #22305: [FLINK-31443][runtime] Maintain redundant taskmanagers to speed up failover

huwh opened a new pull request, #22305:
URL: https://github.com/apache/flink/pull/22305

   ## What is the purpose of the change
   Implementation of [FLINK-18625](https://issues.apache.org/jira/browse/FLINK-18625) in `FineGrainedSlotManager`.
   
   
   ## Brief change log
     - *Periodically check that the redundancy requirements are fulfilled*
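The periodic check in the change log can be pictured as a task that is rescheduled at a fixed delay. The following is a minimal, hypothetical sketch: `PeriodicRedundancyCheck` is an illustrative name, and a plain `ScheduledExecutorService` stands in for whatever scheduler the slot manager actually uses.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: run a redundancy check repeatedly until closed. */
final class PeriodicRedundancyCheck implements AutoCloseable {
    // Stand-in scheduler; a real slot manager would use its own main-thread executor.
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private ScheduledFuture<?> task;

    /** Runs {@code check} immediately, then again {@code periodMillis} after each run ends. */
    void start(Runnable check, long periodMillis) {
        // Fixed delay (not fixed rate): the next run is timed from the end of the previous one.
        task = scheduler.scheduleWithFixedDelay(check, 0L, periodMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        if (task != null) {
            task.cancel(false);
        }
        scheduler.shutdownNow();
    }
}
```

In the slot manager, `check` would compare the available resources against the redundant requirement and request new workers when it is not met.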
   
   
   ## Verifying this change
     - *Added unit test: FineGrainedSlotManagerTest#testRedundantTaskManagers*
     - *Manually verified the change by running a session cluster on Kubernetes. Initially no redundant task manager is requested; after a job is submitted, one redundant task manager is requested.*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] huwh commented on pull request #22305: [FLINK-31443][runtime] Maintain redundant taskmanagers to speed up failover

Posted by "huwh (via GitHub)" <gi...@apache.org>.
huwh commented on PR #22305:
URL: https://github.com/apache/flink/pull/22305#issuecomment-1507864516

   Thanks @xintongsong and @reswqa for the review.
   I have updated this PR as suggested.




[GitHub] [flink] huwh commented on a diff in pull request #22305: [FLINK-31443][runtime] Maintain redundant taskmanagers to speed up failover

Posted by "huwh (via GitHub)" <gi...@apache.org>.
huwh commented on code in PR #22305:
URL: https://github.com/apache/flink/pull/22305#discussion_r1159241282


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/RequiredRedundantResource.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.resources.CPUResource;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.util.Preconditions;
+
+/** Immutable profile of the required redundant resources. */
+public final class RequiredRedundantResource {
+    private final CPUResource cpuResource;
+    private final MemorySize memorySize;
+
+    public RequiredRedundantResource(CPUResource cpuResource, MemorySize memorySize) {
+        this.cpuResource = Preconditions.checkNotNull(cpuResource);
+        this.memorySize = Preconditions.checkNotNull(memorySize);

Review Comment:
   Good suggestion.
   
   From a semantic point of view, it makes more sense to consider the subdivided resource dimensions rather than only total CPU and memory.
   
   Then I don't need to introduce a new `RequiredRedundantResource` class to manage redundant resources; using `ResourceProfile` is sufficient.
   
   Looking forward to your opinion as well, @xintongsong.
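As a minimal sketch of the `ResourceProfile`-based approach, the redundant requirement is the default worker profile multiplied by the number of redundant workers, and it counts as fulfilled only when every dimension is covered. The `Profile` class below is a hypothetical, heavily simplified stand-in, not Flink's `ResourceProfile`.

```java
/** Hypothetical stand-in for a multi-dimensional profile (CPU millicores, memory in MiB). */
final class Profile {
    final long cpuMillis;
    final long taskHeapMb;
    final long managedMb;

    Profile(long cpuMillis, long taskHeapMb, long managedMb) {
        this.cpuMillis = cpuMillis;
        this.taskHeapMb = taskHeapMb;
        this.managedMb = managedMb;
    }

    Profile multiply(int n) {
        return new Profile(cpuMillis * n, taskHeapMb * n, managedMb * n);
    }

    Profile merge(Profile other) {
        return new Profile(
                cpuMillis + other.cpuMillis,
                taskHeapMb + other.taskHeapMb,
                managedMb + other.managedMb);
    }

    /** True only if every dimension is at least that of {@code required}. */
    boolean covers(Profile required) {
        return cpuMillis >= required.cpuMillis
                && taskHeapMb >= required.taskHeapMb
                && managedMb >= required.managedMb;
    }
}

final class RedundancyRequirement {
    /** Redundant requirement = default worker profile times the number of redundant workers. */
    static Profile required(Profile defaultWorker, int redundantTaskManagerNum) {
        return defaultWorker.multiply(redundantTaskManagerNum);
    }
}
```

Unlike a CPU/memory-only total, the per-dimension check rejects a pool that has enough CPU and heap but too little managed memory.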





[GitHub] [flink] huwh commented on pull request #22305: [FLINK-31443][runtime] Maintain redundant taskmanagers to speed up failover

Posted by "huwh (via GitHub)" <gi...@apache.org>.
huwh commented on PR #22305:
URL: https://github.com/apache/flink/pull/22305#issuecomment-1535778467

   @xintongsong Thanks for the review; comments addressed. PTAL.




Re: [PR] [FLINK-31443][runtime] Maintain redundant taskmanagers to speed up failover [flink]

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #22305:
URL: https://github.com/apache/flink/pull/22305#discussion_r1514700523


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java:
##########
@@ -623,6 +622,7 @@ private void checkResourceRequirements() {
             if (resourceAllocator.isSupported()
                     && !taskManagerTracker.getPendingTaskManagers().isEmpty()) {
                 taskManagerTracker.replaceAllPendingAllocations(Collections.emptyMap());
+                checkTaskManagerReleasable();

Review Comment:
   I created FLINK-34588 to cover this issue.





[GitHub] [flink] xintongsong closed pull request #22305: [FLINK-31443][runtime] Maintain redundant taskmanagers to speed up failover

Posted by "xintongsong (via GitHub)" <gi...@apache.org>.
xintongsong closed pull request #22305: [FLINK-31443][runtime] Maintain redundant taskmanagers to speed up failover
URL: https://github.com/apache/flink/pull/22305




Re: [PR] [FLINK-31443][runtime] Maintain redundant taskmanagers to speed up failover [flink]

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #22305:
URL: https://github.com/apache/flink/pull/22305#discussion_r1514669497


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java:
##########
@@ -623,6 +622,7 @@ private void checkResourceRequirements() {
             if (resourceAllocator.isSupported()
                     && !taskManagerTracker.getPendingTaskManagers().isEmpty()) {
                 taskManagerTracker.replaceAllPendingAllocations(Collections.emptyMap());
+                checkTaskManagerReleasable();

Review Comment:
   @huwh `checkTaskManagerReleasable()` returns a boolean value that is never checked. Is this intentional? The same applies to [line 680](https://github.com/apache/flink/pull/22305/files#diff-76ee49bc1a8829c2944084c64b738ed35e2db217f0981de435fef04816952be6R680).





[GitHub] [flink] xintongsong commented on a diff in pull request #22305: [FLINK-31443][runtime] Maintain redundant taskmanagers to speed up failover

Posted by "xintongsong (via GitHub)" <gi...@apache.org>.
xintongsong commented on code in PR #22305:
URL: https://github.com/apache/flink/pull/22305#discussion_r1168233660


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java:
##########
@@ -315,6 +318,7 @@ public void processResourceRequirements(ResourceRequirements resourceRequirement
             jobMasterTargetAddresses.remove(resourceRequirements.getJobId());
             if (resourceAllocator.isSupported()) {
                 taskManagerTracker.clearPendingAllocationsOfJob(resourceRequirements.getJobId());
+                checkTaskManagerReleasable();

Review Comment:
   This is probably not necessary, because `checkResourceRequirementsWithDelay` will be called anyway by the end of this method.
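The argument is that a delayed check already does this work, so an extra direct call only duplicates it. A minimal sketch, assuming the delayed check coalesces repeated requests into a single run and that it includes the releasability check; `DelayedCheck` and the queue-based scheduler are hypothetical stand-ins for a single-threaded main-thread executor.

```java
import java.util.Queue;

/** Hypothetical sketch: repeated checkWithDelay() calls collapse into one scheduled run. */
final class DelayedCheck {
    private final Runnable check;
    // Stand-in for a single-threaded scheduler; draining it simulates the delay elapsing.
    private final Queue<Runnable> scheduled;
    private boolean pending;

    DelayedCheck(Runnable check, Queue<Runnable> scheduled) {
        this.check = check;
        this.scheduled = scheduled;
    }

    void checkWithDelay() {
        if (pending) {
            return; // a run is already scheduled and will observe the latest state
        }
        pending = true;
        scheduled.add(
                () -> {
                    pending = false;
                    check.run();
                });
    }

    /** Runs every queued task in order. */
    static void drain(Queue<Runnable> queue) {
        Runnable next;
        while ((next = queue.poll()) != null) {
            next.run();
        }
    }
}
```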



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##########
@@ -236,6 +323,52 @@ && canFulfillRequirement(effectiveProfile, remainResource)) {
         }
     }
 
+    private void tryFulFillRedundantResources(
+            ResourceProfile requiredRedundantResource,
+            List<InternalResourceInfo> availableRegisteredResources,
+            List<InternalResourceInfo> availablePendingResources,
+            ResourceAllocationResult.Builder resultBuilder) {
+        ResourceProfile totalAvailableResources =
+                Stream.concat(
+                                availableRegisteredResources.stream(),
+                                availablePendingResources.stream())
+                        .map(internalResourceInfo -> internalResourceInfo.availableProfile)
+                        .reduce(ResourceProfile.ZERO, ResourceProfile::merge);
+
+        while (!canFulfillRequirement(requiredRedundantResource, totalAvailableResources)) {
+            PendingTaskManager pendingTaskManager =
+                    new PendingTaskManager(totalResourceProfile, numSlotsPerWorker);
+            resultBuilder.addPendingTaskManagerAllocate(pendingTaskManager);
+            totalAvailableResources = totalAvailableResources.merge(totalResourceProfile);
+        }
+    }
+
+    private ResourceProfile getAvailableResourceOfTaskManagers(List<TaskManagerInfo> taskManagers) {
+        return taskManagers.stream()
+                .map(TaskManagerInfo::getAvailableResource)
+                .reduce(ResourceProfile.ZERO, ResourceProfile::merge);
+    }
+
+    private ResourceProfile getAvailableResourceOfPendingTaskManagers(
+            List<PendingTaskManager> pendingTaskManagers,
+            TaskManagerResourceInfoProvider taskManagerResourceInfoProvider) {
+        return pendingTaskManagers.stream()
+                .map(
+                        pendingTaskManager -> {
+                            ResourceProfile usedResource =
+                                    taskManagerResourceInfoProvider
+                                            .getPendingAllocationsOfPendingTaskManager(
+                                                    pendingTaskManager.getPendingTaskManagerId())
+                                            .values().stream()
+                                            .map(ResourceCounter::getTotalResource)
+                                            .reduce(ResourceProfile.ZERO, ResourceProfile::merge);

Review Comment:
   This is expensive. We probably should maintain the available resources, or total allocated resources, in `PendingTaskManager`, like what we do with `FineGrainedTaskManagerRegistration`.
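A minimal sketch of the suggested bookkeeping, with resources reduced to a single scalar for brevity. `PendingTaskManagerSketch` and its methods are hypothetical and do not reflect the real `PendingTaskManager` API; the idea is to update a running total whenever allocations change, so reading the available resource is O(1) instead of a stream over all pending allocations.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: keep allocated resources as a running sum. */
final class PendingTaskManagerSketch {
    private final long totalResource;
    private final Map<String, Long> allocationsByJob = new HashMap<>();
    private long allocatedResource; // always equals the sum of allocationsByJob values

    PendingTaskManagerSketch(long totalResource) {
        this.totalResource = totalResource;
    }

    /** Replaces the allocation of {@code jobId}; an amount of 0 removes it. */
    void replaceAllocations(String jobId, long amount) {
        if (amount < 0) {
            throw new IllegalArgumentException("amount must be >= 0");
        }
        Long previous =
                amount == 0 ? allocationsByJob.remove(jobId) : allocationsByJob.put(jobId, amount);
        // Adjust the running sum by the delta instead of re-summing every allocation.
        allocatedResource += amount - (previous == null ? 0L : previous);
    }

    void clearAllocationsOfJob(String jobId) {
        replaceAllocations(jobId, 0L);
    }

    long getAvailableResource() {
        return totalResource - allocatedResource;
    }
}
```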



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocationStrategy.java:
##########
@@ -46,4 +46,15 @@ ResourceAllocationResult tryFulfillRequirements(
             Map<JobID, Collection<ResourceRequirement>> missingResources,
             TaskManagerResourceInfoProvider taskManagerResourceInfoProvider,
             BlockedTaskManagerChecker blockedTaskManagerChecker);
+
+    /**
+     * Try to make a release decision to release useless PendingTaskManagers and TaskManagers.
+     *
+     * @param taskManagerResourceInfoProvider provide the registered/pending resources of the
+     *     current cluster
+     * @return a {@link ResourceReleaseResult} based on the current status, which contains the
+     *     actions to take
+     */
+    ResourceReleaseResult tryReleaseUselessResources(

Review Comment:
   I think we should explain in the JavaDoc how this differs from `tryFulfillRequirements`, i.e., it only considers empty registered / pending workers, assumes all requirements are fulfilled by registered / pending workers, is more lightweight, etc.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocationStrategy.java:
##########
@@ -46,4 +46,15 @@ ResourceAllocationResult tryFulfillRequirements(
             Map<JobID, Collection<ResourceRequirement>> missingResources,
             TaskManagerResourceInfoProvider taskManagerResourceInfoProvider,
             BlockedTaskManagerChecker blockedTaskManagerChecker);
+
+    /**
+     * Try to make a release decision to release useless PendingTaskManagers and TaskManagers.
+     *
+     * @param taskManagerResourceInfoProvider provide the registered/pending resources of the
+     *     current cluster
+     * @return a {@link ResourceReleaseResult} based on the current status, which contains the
+     *     actions to take
+     */
+    ResourceReleaseResult tryReleaseUselessResources(

Review Comment:
   ```suggestion
       ResourceReleaseResult tryReleaseUnusedResources(
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java:
##########
@@ -288,6 +290,7 @@ public void clearResourceRequirements(JobID jobId) {
         resourceTracker.notifyResourceRequirements(jobId, Collections.emptyList());
         if (resourceAllocator.isSupported()) {
             taskManagerTracker.clearPendingAllocationsOfJob(jobId);
+            checkTaskManagerReleasable();

Review Comment:
   It's a bit unclear to me what the relationship is between `checkTaskManagerReleasable` and `declareNeededResourcesWithDelay`.
   - In 3 places, `declareNeededResourcesWithDelay` is called right after `checkTaskManagerReleasable`.
   - `checkTaskManagerReleasable` itself also calls `declareNeededResourcesWithDelay`, both in the if branch and in `releaseIdleTaskExecutor`.
   
   Would it be better if `checkTaskManagerReleasable` only marked releasable workers as unwanted, and the declaration were made afterwards? That would make `releaseIdleTaskExecutorIfPossible` the only exception where we need a declaration after the `canBeReleased` future completes.
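A minimal sketch of the proposed split, where marking never triggers a declaration and the caller declares exactly once afterwards. `ReleaseFlowSketch` and its methods are hypothetical names, not the slot manager's actual methods.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch: separate "mark unwanted" from "declare needed resources". */
final class ReleaseFlowSketch {
    final Set<String> unwantedWorkers = new LinkedHashSet<>();
    int declarations;

    /** Only records which idle workers are no longer wanted; never declares. */
    void markReleasableWorkers(List<String> idleWorkers) {
        unwantedWorkers.addAll(idleWorkers);
    }

    /** One declaration reflecting everything marked so far. */
    void declareNeededResources() {
        declarations++;
    }

    /** Caller pattern: mark first, then declare exactly once. */
    void onRequirementsChanged(List<String> idleWorkers) {
        markReleasableWorkers(idleWorkers);
        declareNeededResources();
    }
}
```

The asynchronous release path would remain the exception: there the declaration has to wait until the release check completes.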



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##########
@@ -110,9 +122,84 @@ public ResourceAllocationResult tryFulfillRequirements(
                         jobId, unfulfilledJobRequirements, pendingResources, resultBuilder);
             }
         }
+
+        tryFulFillRedundantResources(
+                totalResourceProfile.multiply(redundantTaskManagerNum),
+                registeredResources,
+                pendingResources,
+                resultBuilder);
+
         return resultBuilder.build();
     }
 
+    @Override
+    public ResourceReleaseResult tryReleaseUselessResources(
+            TaskManagerResourceInfoProvider taskManagerResourceInfoProvider) {
+        List<TaskManagerInfo> taskManagersIdleTimeout = new ArrayList<>();
+        List<TaskManagerInfo> taskManagersNonTimeout = new ArrayList<>();
+        long currentTime = System.currentTimeMillis();
+        taskManagerResourceInfoProvider
+                .getRegisteredTaskManagers()
+                .forEach(
+                        taskManagerInfo -> {
+                            if (taskManagerInfo.isIdle()
+                                    && currentTime - taskManagerInfo.getIdleSince()
+                                            >= taskManagerTimeout.toMilliseconds()) {
+                                taskManagersIdleTimeout.add(taskManagerInfo);
+                            } else {
+                                taskManagersNonTimeout.add(taskManagerInfo);
+                            }
+                        });
+
+        List<PendingTaskManager> pendingTaskManagersNonUse = new ArrayList<>();
+        List<PendingTaskManager> pendingTaskManagersInuse = new ArrayList<>();
+        taskManagerResourceInfoProvider
+                .getPendingTaskManagers()
+                .forEach(
+                        pendingTaskManager -> {
+                            if (taskManagerResourceInfoProvider
+                                    .getPendingAllocationsOfPendingTaskManager(
+                                            pendingTaskManager.getPendingTaskManagerId())
+                                    .isEmpty()) {
+                                pendingTaskManagersNonUse.add(pendingTaskManager);
+                            } else {
+                                pendingTaskManagersInuse.add(pendingTaskManager);
+                            }
+                        });
+
+        // summary total available resources of using (pending) task managers
+        ResourceProfile resourcesToKeep = ResourceProfile.ZERO;
+        ResourceProfile availableResourcesOfNonIdle =
+                getAvailableResourceOfTaskManagers(taskManagersNonTimeout);
+        resourcesToKeep = resourcesToKeep.merge(availableResourcesOfNonIdle);
+        ResourceProfile availableResourcesOfNonIdlePendingTaskManager =
+                getAvailableResourceOfPendingTaskManagers(
+                        pendingTaskManagersInuse, taskManagerResourceInfoProvider);
+        resourcesToKeep = resourcesToKeep.merge(availableResourcesOfNonIdlePendingTaskManager);
+

Review Comment:
   We might want to do some pruning here.
   - If the available resources of the non-idle registered workers already meet the redundant requirements, we don't need to calculate the available resources of the pending workers.
   - If there are no idle registered / pending resources, we don't need to release anything.
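A minimal sketch of the two early exits, with resources as scalars and the possibly expensive pending-worker sum passed as a lazily evaluated `LongSupplier`. `ReleasePruning` and `Decision` are hypothetical names.

```java
import java.util.function.LongSupplier;

/** Hypothetical sketch of the two suggested early exits. */
final class ReleasePruning {
    enum Decision {
        NOTHING_TO_RELEASE,
        RELEASE_ALL_IDLE,
        KEEP_SOME_IDLE
    }

    static Decision decide(
            long requiredRedundant,
            long availableOnNonIdleRegistered,
            LongSupplier availableOnInUsePending, // evaluated only if really needed
            boolean hasIdleRegisteredOrPending) {
        // Pruning 1: without idle registered or pending workers there is nothing to release.
        if (!hasIdleRegisteredOrPending) {
            return Decision.NOTHING_TO_RELEASE;
        }
        // Pruning 2: non-idle registered workers alone cover redundancy; skip pending workers.
        if (availableOnNonIdleRegistered >= requiredRedundant) {
            return Decision.RELEASE_ALL_IDLE;
        }
        // Only now pay for the pending-worker computation.
        long available = availableOnNonIdleRegistered + availableOnInUsePending.getAsLong();
        return available >= requiredRedundant ? Decision.RELEASE_ALL_IDLE : Decision.KEEP_SOME_IDLE;
    }
}
```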



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##########
@@ -110,9 +122,84 @@ public ResourceAllocationResult tryFulfillRequirements(
                         jobId, unfulfilledJobRequirements, pendingResources, resultBuilder);
             }
         }
+
+        tryFulFillRedundantResources(
+                totalResourceProfile.multiply(redundantTaskManagerNum),
+                registeredResources,
+                pendingResources,
+                resultBuilder);
+
         return resultBuilder.build();
     }
 
+    @Override
+    public ResourceReleaseResult tryReleaseUselessResources(
+            TaskManagerResourceInfoProvider taskManagerResourceInfoProvider) {
+        List<TaskManagerInfo> taskManagersIdleTimeout = new ArrayList<>();
+        List<TaskManagerInfo> taskManagersNonTimeout = new ArrayList<>();
+        long currentTime = System.currentTimeMillis();
+        taskManagerResourceInfoProvider
+                .getRegisteredTaskManagers()
+                .forEach(
+                        taskManagerInfo -> {
+                            if (taskManagerInfo.isIdle()
+                                    && currentTime - taskManagerInfo.getIdleSince()
+                                            >= taskManagerTimeout.toMilliseconds()) {
+                                taskManagersIdleTimeout.add(taskManagerInfo);
+                            } else {
+                                taskManagersNonTimeout.add(taskManagerInfo);
+                            }
+                        });
+
+        List<PendingTaskManager> pendingTaskManagersNonUse = new ArrayList<>();
+        List<PendingTaskManager> pendingTaskManagersInuse = new ArrayList<>();
+        taskManagerResourceInfoProvider
+                .getPendingTaskManagers()
+                .forEach(
+                        pendingTaskManager -> {
+                            if (taskManagerResourceInfoProvider
+                                    .getPendingAllocationsOfPendingTaskManager(

Review Comment:
   This is expensive. We are querying a map for each pending TM. I think we can modify `TaskManagerResourceInfoProvider` to return a map of pending TMs and their allocations (including empty ones) and iterate over the map entries.
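A minimal sketch of the single-pass alternative, assuming a provider that returns every pending TM mapped to its allocations (with an empty map for unused ones). `PendingPartition` and the string ids are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: partition pending TMs by iterating one map instead of querying per TM. */
final class PendingPartition {
    final List<String> unused = new ArrayList<>();
    final List<String> inUse = new ArrayList<>();

    /**
     * @param allocationsByPendingTm every pending TM id mapped to its allocations
     *     (job id to amount), with an empty map for TMs that have no allocation yet
     */
    static PendingPartition of(Map<String, Map<String, Long>> allocationsByPendingTm) {
        PendingPartition result = new PendingPartition();
        for (Map.Entry<String, Map<String, Long>> entry : allocationsByPendingTm.entrySet()) {
            if (entry.getValue().isEmpty()) {
                result.unused.add(entry.getKey());
            } else {
                result.inUse.add(entry.getKey());
            }
        }
        return result;
    }
}
```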



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##########
@@ -236,6 +323,52 @@ && canFulfillRequirement(effectiveProfile, remainResource)) {
         }
     }
 
+    private void tryFulFillRedundantResources(
+            ResourceProfile requiredRedundantResource,
+            List<InternalResourceInfo> availableRegisteredResources,
+            List<InternalResourceInfo> availablePendingResources,
+            ResourceAllocationResult.Builder resultBuilder) {
+        ResourceProfile totalAvailableResources =
+                Stream.concat(
+                                availableRegisteredResources.stream(),
+                                availablePendingResources.stream())
+                        .map(internalResourceInfo -> internalResourceInfo.availableProfile)
+                        .reduce(ResourceProfile.ZERO, ResourceProfile::merge);
+
+        while (!canFulfillRequirement(requiredRedundantResource, totalAvailableResources)) {
+            PendingTaskManager pendingTaskManager =
+                    new PendingTaskManager(totalResourceProfile, numSlotsPerWorker);
+            resultBuilder.addPendingTaskManagerAllocate(pendingTaskManager);
+            totalAvailableResources = totalAvailableResources.merge(totalResourceProfile);

Review Comment:
   I noticed that `tryFulfillRequirementsForJobWithPendingResources` will add new pending resources to `availableResources`, so that the passed-in `availableResources` always reflect the latest status and can be used across the for-loop in `tryFulfillRequirements`, while `tryFulFillRedundantResources` does not keep `availableResources` up-to-date. This is understandable because we no longer need `availableResources` after calling `tryFulFillRedundantResources` in `tryFulfillRequirements`. However, it might be nicer to add a comment here about this, to make it easier for future developers to understand.
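In scalar form, the loop adds default-sized pending workers until the running total covers the redundant requirement. A minimal, hypothetical sketch (`RedundancyFill` is an illustrative name) that also carries the suggested comment about only updating the local total:

```java
import java.util.List;

/** Hypothetical scalar version of the redundancy loop; not the actual strategy code. */
final class RedundancyFill {
    /** Returns how many default-sized pending workers must be requested for redundancy. */
    static int workersToRequest(
            long requiredRedundant, List<Long> availablePerWorker, long defaultWorkerSize) {
        if (defaultWorkerSize <= 0) {
            // Guard: a non-positive worker size would make the loop below never terminate.
            throw new IllegalArgumentException("defaultWorkerSize must be positive");
        }
        long totalAvailable = 0;
        for (long available : availablePerWorker) {
            totalAvailable += available;
        }
        int newWorkers = 0;
        // Only the local total is updated; availablePerWorker is intentionally left stale
        // because nothing reads it after the redundancy step.
        while (totalAvailable < requiredRedundant) {
            newWorkers++;
            totalAvailable += defaultWorkerSize;
        }
        return newWorkers;
    }
}
```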





[GitHub] [flink] xintongsong commented on a diff in pull request #22305: [FLINK-31443][runtime] Maintain redundant taskmanagers to speed up failover

Posted by "xintongsong (via GitHub)" <gi...@apache.org>.
xintongsong commented on code in PR #22305:
URL: https://github.com/apache/flink/pull/22305#discussion_r1163716537


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/RequiredRedundantResource.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.resources.CPUResource;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.util.Preconditions;
+
+/** Immutable profile of the required redundant resources. */
+public final class RequiredRedundantResource {

Review Comment:
   It's probably not necessary to introduce a new class for redundant resources. We already have quite a few classes that represent different combinations of resources (`ResourceProfile`, `ResourceSpec`, `WorkerResourceSpec`, `JobManager/TaskExecutorProcessSpec`, etc.), and it's already hard to understand the differences between them. Wouldn't it be good enough to pass `redundantTaskManagerNum` into `DefaultResourceAllocationStrategy` and let the strategy make the decision?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java:
##########
@@ -91,6 +92,12 @@ public SlotManagerConfiguration(
         this.maxTotalMem = Preconditions.checkNotNull(maxTotalMem);
         Preconditions.checkState(redundantTaskManagerNum >= 0);
         this.redundantTaskManagerNum = redundantTaskManagerNum;
+        this.requiredRedundantResource =
+                new RequiredRedundantResource(
+                        defaultWorkerResourceSpec.getCpuCores().multiply(redundantTaskManagerNum),
+                        defaultWorkerResourceSpec
+                                .getTotalMemSize()
+                                .multiply(redundantTaskManagerNum));

Review Comment:
   I'd suggest keeping all calculation and derivation in `fromConfiguration`. Maybe also fix this for `slotMatchingStrategy` in a hotfix commit.
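A minimal sketch of that split: the constructor only validates and assigns, while parsing and derived values live in the static factory. `SlotManagerConfigSketch` is hypothetical, and a plain `Map<String, String>` stands in for Flink's `Configuration`; the option key is assumed to be `slotmanager.redundant-taskmanager-num`.

```java
import java.util.Map;

/** Hypothetical sketch: constructor assigns, fromConfiguration derives. */
final class SlotManagerConfigSketch {
    private final int redundantTaskManagerNum;
    private final long requiredRedundantMemoryMb; // derived value, computed by the factory

    SlotManagerConfigSketch(int redundantTaskManagerNum, long requiredRedundantMemoryMb) {
        if (redundantTaskManagerNum < 0) {
            throw new IllegalStateException("redundantTaskManagerNum must be >= 0");
        }
        this.redundantTaskManagerNum = redundantTaskManagerNum;
        this.requiredRedundantMemoryMb = requiredRedundantMemoryMb;
    }

    /** All parsing and derivation happens here, in one place. */
    static SlotManagerConfigSketch fromConfiguration(
            Map<String, String> config, long defaultWorkerMemoryMb) {
        int redundant =
                Integer.parseInt(config.getOrDefault("slotmanager.redundant-taskmanager-num", "0"));
        return new SlotManagerConfigSketch(redundant, defaultWorkerMemoryMb * redundant);
    }

    int getRedundantTaskManagerNum() {
        return redundantTaskManagerNum;
    }

    long getRequiredRedundantMemoryMb() {
        return requiredRedundantMemoryMb;
    }
}
```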



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocationStrategy.java:
##########
@@ -46,4 +46,14 @@ ResourceAllocationResult tryFulfillRequirements(
             Map<JobID, Collection<ResourceRequirement>> missingResources,
             TaskManagerResourceInfoProvider taskManagerResourceInfoProvider,
             BlockedTaskManagerChecker blockedTaskManagerChecker);
+
+    /**
+     * Try to fulfill the missing redundant requirements.
+     *
+     * @param missingRedundantResource redundant resources that are not yet fulfilled
+     * @return the new pending task managers that need to allocated to fulfill redundant
+     *     requirements.
+     */
+    Collection<PendingTaskManager> tryFulfillRedundantRequirements(
+            RequiredRedundantResource missingRedundantResource);

Review Comment:
   I wonder if it makes sense to let the strategy decide whether to allocate redundant workers. To be specific, we construct `DefaultResourceAllocationStrategy` with an argument `redundantTaskManagerNum`. Then the `ResourceAllocationResult` returned from the strategy would already include the redundant workers.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/RequiredRedundantResource.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.resources.CPUResource;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.util.Preconditions;
+
+/** Immutable profile of the required redundant resources. */
+public final class RequiredRedundantResource {
+    private final CPUResource cpuResource;
+    private final MemorySize memorySize;
+
+    public RequiredRedundantResource(CPUResource cpuResource, MemorySize memorySize) {
+        this.cpuResource = Preconditions.checkNotNull(cpuResource);
+        this.memorySize = Preconditions.checkNotNull(memorySize);

Review Comment:
   I don't think using only total CPU/memory versus a detailed resource profile will make a big difference in behavior. In that sense, maybe we should go with the resource profile approach, given that it simplifies the code.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocationStrategy.java:
##########
@@ -46,4 +46,14 @@ ResourceAllocationResult tryFulfillRequirements(
             Map<JobID, Collection<ResourceRequirement>> missingResources,
             TaskManagerResourceInfoProvider taskManagerResourceInfoProvider,
             BlockedTaskManagerChecker blockedTaskManagerChecker);
+
+    /**
+     * Try to fulfill the missing redundant requirements.
+     *
+     * @param missingRedundantResource redundant resources that are not yet fulfilled
+     * @return the new pending task managers that need to be allocated to fulfill redundant
+     *     requirements.
+     */
+    Collection<PendingTaskManager> tryFulfillRedundantRequirements(
+            RequiredRedundantResource missingRedundantResource);

Review Comment:
   That leads to another question: how do we account for redundant workers when releasing idle workers? Maybe we should also let the strategy decide whether an idle worker should be released.
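One possible shape for letting the strategy make the release decision is sketched below, under simplifying assumptions: each worker's spare capacity is reduced to a hypothetical count of free slots, and idle workers are kept, in order, until the free slots of the kept workers cover the redundancy requirement; the remaining idle workers are released. The names `Worker` and `decideRelease` are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

final class IdleReleaseSketch {
    /** Hypothetical worker description: an id plus its number of free slots. */
    static final class Worker {
        final String id;
        final int freeSlots;

        Worker(String id, int freeSlots) {
            this.id = id;
            this.freeSlots = freeSlots;
        }
    }

    /**
     * Returns the ids of idle workers that can be released while still keeping at least
     * {@code requiredRedundantSlots} free slots across the kept workers.
     */
    static List<String> decideRelease(
            int freeSlotsOfBusyWorkers, List<Worker> idleWorkers, int requiredRedundantSlots) {
        int kept = freeSlotsOfBusyWorkers;
        List<String> toRelease = new ArrayList<>();
        for (Worker worker : idleWorkers) {
            if (kept >= requiredRedundantSlots) {
                // Redundancy is already covered, so this idle worker is not needed.
                toRelease.add(worker.id);
            } else {
                // Keep this idle worker as a redundant one and count its free slots.
                kept += worker.freeSlots;
            }
        }
        return toRelease;
    }

    public static void main(String[] args) {
        List<Worker> idle =
                List.of(new Worker("tm-1", 4), new Worker("tm-2", 4), new Worker("tm-3", 4));
        System.out.println(decideRelease(2, idle, 4));
    }
}
```

With 2 free slots on busy workers and a requirement of 4, `tm-1` is kept and `[tm-2, tm-3]` is printed as the release set.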





[GitHub] [flink] flinkbot commented on pull request #22305: [FLINK-31443][runtime] Maintain redundant taskmanagers to speed up failover

Posted by "flinkbot (via GitHub)" <gi...@apache.org>.
flinkbot commented on PR #22305:
URL: https://github.com/apache/flink/pull/22305#issuecomment-1489711115

   ## CI report:
   
   * f84a771b4186acb7a41a82e9554681fe802ed651 UNKNOWN
   
   Bot commands:
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build




[GitHub] [flink] huwh commented on pull request #22305: [FLINK-31443][runtime] Maintain redundant taskmanagers to speed up failover

Posted by "huwh (via GitHub)" <gi...@apache.org>.
huwh commented on PR #22305:
URL: https://github.com/apache/flink/pull/22305#issuecomment-1513206663

   Thanks @xintongsong, I have updated this PR:
   
   - Addressed the comments on `checkTaskManagerReleasable` in: https://github.com/apache/flink/pull/22305/commits/f4efa560447abf9bed8c3c72a3345f5c2e260b04
   - Maintained `pendingSlotAllocationRecords` in `PendingTaskManager` in: https://github.com/apache/flink/pull/22305/commits/7bb6e1bfddf6c16fc123b4182f3c240bac685fdf and https://github.com/apache/flink/pull/22305/commits/b0a51af576b7a65e35f647514a198b54ef6fbb25
   - Addressed the comments on `tryReleaseUnusedResources` and reduced the heavy operations in: https://github.com/apache/flink/pull/22305/commits/f6cea5ec89f7ceb86463b95a436180dbd70f4fde
   




[GitHub] [flink] reswqa commented on a diff in pull request #22305: [FLINK-31443][runtime] Maintain redundant taskmanagers to speed up failover

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22305:
URL: https://github.com/apache/flink/pull/22305#discussion_r1155753707


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java:
##########
@@ -220,12 +220,14 @@ public void start(
 
         started = true;
 
-        taskManagerTimeoutsCheck =
-                scheduledExecutor.scheduleWithFixedDelay(
-                        () -> mainThreadExecutor.execute(this::checkTaskManagerTimeouts),
-                        0L,
-                        taskManagerTimeout.toMilliseconds(),
-                        TimeUnit.MILLISECONDS);
+        if (resourceAllocator.isSupported()) {

Review Comment:
   Why is this optimization not also ported to `TaskExecutorManager`?
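The guard being discussed can be sketched as follows, assuming a hypothetical `Allocator` interface that exposes only `isSupported()`. When workers cannot be allocated or released, the periodic check has nothing to act on, so it is not scheduled at all. For brevity, the check runs directly on the scheduler thread instead of being handed to a main-thread executor as in the quoted diff.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class GuardedPeriodicCheck {
    /** Hypothetical stand-in for the resource allocator; only isSupported() matters here. */
    interface Allocator {
        boolean isSupported();
    }

    /**
     * Schedules the periodic check only when the allocator can actually start or stop workers.
     * Otherwise no task is scheduled and null is returned.
     */
    static ScheduledFuture<?> maybeSchedule(
            Allocator allocator, ScheduledExecutorService executor, Runnable check, long periodMs) {
        if (!allocator.isSupported()) {
            return null;
        }
        return executor.scheduleWithFixedDelay(check, 0L, periodMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            ScheduledFuture<?> none = maybeSchedule(() -> false, executor, () -> {}, 1000L);
            ScheduledFuture<?> some = maybeSchedule(() -> true, executor, () -> {}, 1000L);
            // Prints "true true": nothing scheduled when unsupported, a task when supported.
            System.out.println((none == null) + " " + (some != null));
            if (some != null) {
                some.cancel(false);
            }
        } finally {
            executor.shutdownNow();
        }
    }
}
```

The same guard applies regardless of which component owns the check, which is why it can be shared by both slot manager implementations.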



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/RequiredRedundantResource.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.resources.CPUResource;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.util.Preconditions;
+
+/** Immutable profile of the required redundant resources. */
+public final class RequiredRedundantResource {
+    private final CPUResource cpuResource;
+    private final MemorySize memorySize;
+
+    public RequiredRedundantResource(CPUResource cpuResource, MemorySize memorySize) {
+        this.cpuResource = Preconditions.checkNotNull(cpuResource);
+        this.memorySize = Preconditions.checkNotNull(memorySize);

Review Comment:
   I'm not sure whether considering only the total memory is enough. I am a bit concerned that the resource requirements of certain tasks that need to be redeployed may not be met, even if we ensure that the total redundant memory is sufficient.
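A simplified illustration of this concern, with made-up numbers: two TaskManagers each have 2048 MB spare, so the total redundant memory covers a 4096 MB requirement, yet no single TaskManager can host a 4096 MB slot for a redeployed task. The helper names are hypothetical, and the sketch models memory only.

```java
import java.util.List;

final class FragmentationExample {
    /** Total-only check: sums spare memory across all TaskManagers. */
    static boolean totalMemorySufficient(List<Long> spareMemoryPerTmMb, long requiredMb) {
        long total = 0L;
        for (long spare : spareMemoryPerTmMb) {
            total += spare;
        }
        return total >= requiredMb;
    }

    /** Per-slot check: a slot must fit entirely into a single TaskManager. */
    static boolean someTmCanHostSlot(List<Long> spareMemoryPerTmMb, long slotMb) {
        for (long spare : spareMemoryPerTmMb) {
            if (spare >= slotMb) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Two TaskManagers with 2048 MB spare each; a redeployed task needs a 4096 MB slot.
        List<Long> spare = List.of(2048L, 2048L);
        // Prints "true false": the total looks sufficient, but the slot fits nowhere.
        System.out.println(
                totalMemorySufficient(spare, 4096L) + " " + someTmCanHostSlot(spare, 4096L));
    }
}
```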





[GitHub] [flink] xintongsong commented on a diff in pull request #22305: [FLINK-31443][runtime] Maintain redundant taskmanagers to speed up failover

Posted by "xintongsong (via GitHub)" <gi...@apache.org>.
xintongsong commented on code in PR #22305:
URL: https://github.com/apache/flink/pull/22305#discussion_r1181534754


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##########
@@ -110,9 +121,93 @@ public ResourceAllocationResult tryFulfillRequirements(
                         jobId, unfulfilledJobRequirements, pendingResources, resultBuilder);
             }
         }
+
+        // Unlike tryFulfillRequirementsForJobWithPendingResources, which updates pendingResources
+        // to the latest state after a new PendingTaskManager is created,
+        // tryFulFillRedundantResources will not update pendingResources even after new
+        // PendingTaskManagers are created.
+        // This is because the pendingResources are no longer needed afterwards.
+        tryFulFillRedundantResources(
+                totalResourceProfile.multiply(redundantTaskManagerNum),
+                registeredResources,
+                pendingResources,
+                resultBuilder);
+
         return resultBuilder.build();
     }
 
+    @Override
+    public ResourceReleaseResult tryReleaseUnusedResources(
+            TaskManagerResourceInfoProvider taskManagerResourceInfoProvider) {
+        ResourceProfile requiredRedundantResources =
+                totalResourceProfile.multiply(redundantTaskManagerNum);
+        ResourceReleaseResult.Builder builder = ResourceReleaseResult.builder();
+
+        List<TaskManagerInfo> taskManagersIdleTimeout = new ArrayList<>();
+        List<TaskManagerInfo> taskManagersNonTimeout = new ArrayList<>();
+        long currentTime = System.currentTimeMillis();
+        taskManagerResourceInfoProvider
+                .getRegisteredTaskManagers()
+                .forEach(
+                        taskManagerInfo -> {
+                            if (taskManagerInfo.isIdle()
+                                    && currentTime - taskManagerInfo.getIdleSince()
+                                            >= taskManagerTimeout.toMilliseconds()) {
+                                taskManagersIdleTimeout.add(taskManagerInfo);
+                            } else {
+                                taskManagersNonTimeout.add(taskManagerInfo);
+                            }
+                        });
+
+        List<PendingTaskManager> pendingTaskManagersNonUse = new ArrayList<>();
+        List<PendingTaskManager> pendingTaskManagersInuse = new ArrayList<>();
+        taskManagerResourceInfoProvider
+                .getPendingTaskManagers()
+                .forEach(
+                        pendingTaskManager -> {
+                            if (pendingTaskManager.getPendingSlotAllocationRecords().isEmpty()) {
+                                pendingTaskManagersNonUse.add(pendingTaskManager);
+                            } else {
+                                pendingTaskManagersInuse.add(pendingTaskManager);
+                            }
+                        });
+
+        if (taskManagersIdleTimeout.isEmpty() && pendingTaskManagersNonUse.isEmpty()) {
+            // short-cut for nothing to release
+            return builder.build();
+        }
+
+        ResourceProfile resourcesToKeep = ResourceProfile.ZERO;
+
+        // check whether available resources of used (pending) task manager is enough.
+        ResourceProfile availableResourcesOfNonIdle =
+                getAvailableResourceOfTaskManagers(taskManagersNonTimeout);
+        resourcesToKeep = resourcesToKeep.merge(availableResourcesOfNonIdle);
+        if (!canFulfillRequirement(requiredRedundantResources, resourcesToKeep)) {
+            ResourceProfile availableResourcesOfNonIdlePendingTaskManager =
+                    getAvailableResourceOfPendingTaskManagers(pendingTaskManagersInuse);
+            resourcesToKeep = resourcesToKeep.merge(availableResourcesOfNonIdlePendingTaskManager);
+        }
+
+        // try reserve or release unused (pending) task managers
+        for (TaskManagerInfo taskManagerInfo : taskManagersIdleTimeout) {
+            if (canFulfillRequirement(requiredRedundantResources, resourcesToKeep)) {

Review Comment:
   Minor: we can use a local variable to skip unnecessary `canFulfillRequirement` calls, e.g. `if (redundantFulfilled || canFulfillRequirement(xxx, xxx))`.
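The suggested short-circuit can be sketched like this. `canFulfill` is a hypothetical stand-in for `canFulfillRequirement`, with a counter added only to show how many times it is evaluated. Because released workers do not reduce the kept resources, once the requirement is fulfilled it stays fulfilled, so the remaining checks can be skipped.

```java
import java.util.List;

final class ShortCircuitSketch {
    /** Counts how often the (assumed expensive) fulfillment check is evaluated. */
    static int checkCalls = 0;

    /** Hypothetical stand-in for canFulfillRequirement(required, available). */
    static boolean canFulfill(long required, long available) {
        checkCalls++;
        return available >= required;
    }

    /** Returns how many idle workers are released; kept workers add their spare capacity. */
    static int releaseIdle(List<Long> idleSpare, long required, long initiallyKept) {
        long kept = initiallyKept;
        boolean redundantFulfilled = false;
        int released = 0;
        for (long spare : idleSpare) {
            // Once fulfilled, it stays fulfilled (kept only grows), so the check can be skipped.
            if (redundantFulfilled || canFulfill(required, kept)) {
                redundantFulfilled = true;
                released++;
            } else {
                kept += spare;
            }
        }
        return released;
    }

    public static void main(String[] args) {
        int released = releaseIdle(List.of(4L, 4L, 4L, 4L), 4L, 0L);
        System.out.println(released + " released, " + checkCalls + " checks");
    }
}
```

For four idle workers with 4 units each and a requirement of 4, this prints `3 released, 2 checks`; without the flag, the check would run four times.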



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocationStrategy.java:
##########
@@ -46,4 +46,17 @@ ResourceAllocationResult tryFulfillRequirements(
             Map<JobID, Collection<ResourceRequirement>> missingResources,
             TaskManagerResourceInfoProvider taskManagerResourceInfoProvider,
             BlockedTaskManagerChecker blockedTaskManagerChecker);
+
+    /**
+     * Try to make a release decision to release useless PendingTaskManagers and TaskManagers. This

Review Comment:
   ```suggestion
        * Try to make a release decision to release unused PendingTaskManagers and TaskManagers. This
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java:
##########
@@ -135,19 +135,25 @@ class TaskExecutorManager implements AutoCloseable {
         this.unWantedWorkers = new HashSet<>();
         this.resourceAllocator = Preconditions.checkNotNull(resourceAllocator);
         this.mainThreadExecutor = mainThreadExecutor;
-        taskManagerTimeoutsAndRedundancyCheck =
-                scheduledExecutor.scheduleWithFixedDelay(
-                        () ->
-                                mainThreadExecutor.execute(
-                                        this::checkTaskManagerTimeoutsAndRedundancy),
-                        0L,
-                        taskManagerTimeout.toMilliseconds(),
-                        TimeUnit.MILLISECONDS);
+        if (resourceAllocator.isSupported()) {
+            taskManagerTimeoutsAndRedundancyCheck =
+                    scheduledExecutor.scheduleWithFixedDelay(
+                            () ->
+                                    mainThreadExecutor.execute(
+                                            this::checkTaskManagerTimeoutsAndRedundancy),
+                            0L,
+                            taskManagerTimeout.toMilliseconds(),
+                            TimeUnit.MILLISECONDS);
+        } else {
+            taskManagerTimeoutsAndRedundancyCheck = null;

Review Comment:
   Let's mark `taskManagerTimeoutsAndRedundancyCheck` as `@Nullable`.
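A minimal sketch of a `@Nullable` field together with a null-safe `close()`. Flink code generally uses `javax.annotation.Nullable`; to keep the example free of dependencies, a local annotation with the same name stands in for it. `NullableCheckHolder` is a hypothetical class, and the scheduled task is a no-op.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Local stand-in for javax.annotation.Nullable so the sketch compiles without extra deps. */
@interface Nullable {}

final class NullableCheckHolder implements AutoCloseable {
    /** Null when the allocator is unsupported and no periodic check was scheduled. */
    @Nullable private final ScheduledFuture<?> periodicCheck;

    NullableCheckHolder(boolean allocatorSupported, ScheduledExecutorService executor) {
        if (allocatorSupported) {
            periodicCheck =
                    executor.scheduleWithFixedDelay(() -> {}, 0L, 1000L, TimeUnit.MILLISECONDS);
        } else {
            periodicCheck = null;
        }
    }

    boolean hasPeriodicCheck() {
        return periodicCheck != null;
    }

    /** The @Nullable marker reminds readers (and static analysis) to guard every use. */
    @Override
    public void close() {
        if (periodicCheck != null) {
            periodicCheck.cancel(false);
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try (NullableCheckHolder unsupported = new NullableCheckHolder(false, executor);
                NullableCheckHolder supported = new NullableCheckHolder(true, executor)) {
            // Prints "false true".
            System.out.println(unsupported.hasPeriodicCheck() + " " + supported.hasPeriodicCheck());
        } finally {
            executor.shutdownNow();
        }
    }
}
```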



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocationStrategy.java:
##########
@@ -46,4 +46,17 @@ ResourceAllocationResult tryFulfillRequirements(
             Map<JobID, Collection<ResourceRequirement>> missingResources,
             TaskManagerResourceInfoProvider taskManagerResourceInfoProvider,
             BlockedTaskManagerChecker blockedTaskManagerChecker);
+
+    /**
+     * Try to make a release decision to release useless PendingTaskManagers and TaskManagers. This

Review Comment:
   And there seem to be a few more occurrences of `useless`. Please replace all of them with `unused` or `unwanted`.


