Posted to issues@flink.apache.org by "xintongsong (via GitHub)" <gi...@apache.org> on 2023/04/12 07:31:52 UTC

[GitHub] [flink] xintongsong commented on a diff in pull request #22305: [FLINK-31443][runtime] Maintain redundant taskmanagers to speed up failover

xintongsong commented on code in PR #22305:
URL: https://github.com/apache/flink/pull/22305#discussion_r1163716537


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/RequiredRedundantResource.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.resources.CPUResource;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.util.Preconditions;
+
+/** Immutable profile of the required redundant resources. */
+public final class RequiredRedundantResource {

Review Comment:
   It's probably not necessary to introduce a new class for redundant resources. We already have quite a few classes that represent different combinations of resources (`ResourceProfile`, `ResourceSpec`, `WorkerResourceSpec`, `JobManager/TaskExecutorProcessSpec`, etc.), and it's already hard to understand the differences between them. Wouldn't it be good enough to pass `redundantTaskManagerNum` into `DefaultResourceAllocationStrategy` and let the strategy make the decision?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java:
##########
@@ -91,6 +92,12 @@ public SlotManagerConfiguration(
         this.maxTotalMem = Preconditions.checkNotNull(maxTotalMem);
         Preconditions.checkState(redundantTaskManagerNum >= 0);
         this.redundantTaskManagerNum = redundantTaskManagerNum;
+        this.requiredRedundantResource =
+                new RequiredRedundantResource(
+                        defaultWorkerResourceSpec.getCpuCores().multiply(redundantTaskManagerNum),
+                        defaultWorkerResourceSpec
+                                .getTotalMemSize()
+                                .multiply(redundantTaskManagerNum));

Review Comment:
   I'd suggest keeping all calculations and derivations in `fromConfiguration`. Maybe also fix this for `slotMatchingStrategy` in a hotfix commit.
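
   A minimal sketch of that split, assuming simplified stand-in types: the class `SlotManagerConfigSketch`, its fields, and the plain `double`/`long` parameters are hypothetical and are not Flink's `SlotManagerConfiguration`, `CPUResource`, or `MemorySize`. The constructor only validates and assigns; everything derived from other settings is computed in the static factory.

```java
/** Hypothetical, simplified stand-in for a slot manager configuration. */
final class SlotManagerConfigSketch {
    final int redundantTaskManagerNum;
    final double redundantCpuCores;
    final long redundantMemoryBytes;

    /** The constructor only validates and assigns already-derived values. */
    SlotManagerConfigSketch(
            int redundantTaskManagerNum, double redundantCpuCores, long redundantMemoryBytes) {
        if (redundantTaskManagerNum < 0) {
            throw new IllegalStateException("redundantTaskManagerNum must be >= 0");
        }
        this.redundantTaskManagerNum = redundantTaskManagerNum;
        this.redundantCpuCores = redundantCpuCores;
        this.redundantMemoryBytes = redundantMemoryBytes;
    }

    /** All calculations live in the factory, so the constructor stays trivial. */
    static SlotManagerConfigSketch fromConfiguration(
            int redundantTaskManagerNum, double cpuCoresPerWorker, long memoryBytesPerWorker) {
        double redundantCpu = cpuCoresPerWorker * redundantTaskManagerNum;
        long redundantMemory = Math.multiplyExact(memoryBytesPerWorker, (long) redundantTaskManagerNum);
        return new SlotManagerConfigSketch(redundantTaskManagerNum, redundantCpu, redundantMemory);
    }
}
```

   With this shape, tests can build the configuration directly from known values, while production code goes through the factory.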



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocationStrategy.java:
##########
@@ -46,4 +46,14 @@ ResourceAllocationResult tryFulfillRequirements(
             Map<JobID, Collection<ResourceRequirement>> missingResources,
             TaskManagerResourceInfoProvider taskManagerResourceInfoProvider,
             BlockedTaskManagerChecker blockedTaskManagerChecker);
+
+    /**
+     * Try to fulfill the missing redundant requirements.
+     *
+     * @param missingRedundantResource redundant resources that are not yet fulfilled
+     * @return the new pending task managers that need to allocated to fulfill redundant
+     *     requirements.
+     */
+    Collection<PendingTaskManager> tryFulfillRedundantRequirements(
+            RequiredRedundantResource missingRedundantResource);

Review Comment:
   I wonder if it makes sense to let the strategy decide whether it should allocate redundant workers. To be specific, we construct `DefaultResourceAllocationStrategy` with an argument `redundantTaskManagerNum`. Then the `ResourceAllocationResult` returned from the strategy should already include the redundant workers.
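
   One way to picture this, as a rough sketch under strong simplifications: the class `RedundancyAwareStrategySketch` and its method are hypothetical, and workers are modeled as interchangeable units with a fixed slot count rather than with Flink's resource profiles. The point is only that the redundancy count is a constructor argument and the allocation result already accounts for it.

```java
/** Hypothetical, simplified stand-in; not Flink's DefaultResourceAllocationStrategy. */
final class RedundancyAwareStrategySketch {
    private final int redundantTaskManagerNum;

    RedundancyAwareStrategySketch(int redundantTaskManagerNum) {
        if (redundantTaskManagerNum < 0) {
            throw new IllegalArgumentException("redundantTaskManagerNum must be >= 0");
        }
        this.redundantTaskManagerNum = redundantTaskManagerNum;
    }

    /**
     * Returns how many new workers to request so that all required slots are covered
     * and redundantTaskManagerNum spare workers remain on top of that.
     *
     * @param requiredSlots total slots required by all jobs
     * @param slotsPerWorker slots offered by each worker (assumed identical for all workers)
     * @param existingWorkers registered plus already pending workers
     */
    int workersToAllocate(int requiredSlots, int slotsPerWorker, int existingWorkers) {
        // Ceiling division: workers needed to host the required slots.
        int workersForJobs = (requiredSlots + slotsPerWorker - 1) / slotsPerWorker;
        int target = workersForJobs + redundantTaskManagerNum;
        return Math.max(0, target - existingWorkers);
    }
}
```

   The caller no longer needs a separate call for redundant requirements; it just applies whatever the strategy returns.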



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/RequiredRedundantResource.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.resources.CPUResource;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.util.Preconditions;
+
+/** Immutable profile of the required redundant resources. */
+public final class RequiredRedundantResource {
+    private final CPUResource cpuResource;
+    private final MemorySize memorySize;
+
+    public RequiredRedundantResource(CPUResource cpuResource, MemorySize memorySize) {
+        this.cpuResource = Preconditions.checkNotNull(cpuResource);
+        this.memorySize = Preconditions.checkNotNull(memorySize);

Review Comment:
   I don't think there will be a big difference in behavior between using only the total CPU/memory and using a detailed resource profile. In that sense, maybe we should go with the resource profile approach, given that it simplifies the code.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocationStrategy.java:
##########
@@ -46,4 +46,14 @@ ResourceAllocationResult tryFulfillRequirements(
             Map<JobID, Collection<ResourceRequirement>> missingResources,
             TaskManagerResourceInfoProvider taskManagerResourceInfoProvider,
             BlockedTaskManagerChecker blockedTaskManagerChecker);
+
+    /**
+     * Try to fulfill the missing redundant requirements.
+     *
+     * @param missingRedundantResource redundant resources that are not yet fulfilled
+     * @return the new pending task managers that need to allocated to fulfill redundant
+     *     requirements.
+     */
+    Collection<PendingTaskManager> tryFulfillRedundantRequirements(
+            RequiredRedundantResource missingRedundantResource);

Review Comment:
   That leads to another question: how do we check for redundant workers when releasing idle workers? Maybe we should also let the strategy decide whether an idle worker should be released.
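
   A minimal sketch of such a release check, assuming that every idle worker counts toward the redundancy target: the class `IdleWorkerReleaseSketch` and both methods are hypothetical and do not exist in Flink. An idle worker may be released only while enough idle workers remain to satisfy `redundantTaskManagerNum`.

```java
/** Hypothetical release policy; the names are illustrative, not part of Flink. */
final class IdleWorkerReleaseSketch {
    private final int redundantTaskManagerNum;

    IdleWorkerReleaseSketch(int redundantTaskManagerNum) {
        if (redundantTaskManagerNum < 0) {
            throw new IllegalArgumentException("redundantTaskManagerNum must be >= 0");
        }
        this.redundantTaskManagerNum = redundantTaskManagerNum;
    }

    /** True if releasing one idle worker still leaves enough idle workers for redundancy. */
    boolean mayRelease(int idleWorkers) {
        return idleWorkers > redundantTaskManagerNum;
    }

    /** Number of idle workers that can be released without dropping below the target. */
    int releasableIdleWorkers(int idleWorkers) {
        return Math.max(0, idleWorkers - redundantTaskManagerNum);
    }
}
```

   Keeping this check next to the allocation logic means both decisions use the same definition of a redundant worker.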


