Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/12/14 09:07:02 UTC

[GitHub] [flink] xintongsong commented on a diff in pull request #21496: [FLINK-29869][ResourceManager] make ResourceAllocator declarative.

xintongsong commented on code in PR #21496:
URL: https://github.com/apache/flink/pull/21496#discussion_r1048173833


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerTracker.java:
##########
@@ -68,6 +68,31 @@ void addTaskManager(
      */
     Map<JobID, ResourceCounter> removePendingTaskManager(PendingTaskManagerId pendingTaskManagerId);
 
+    /**
+     * Add an unwanted task manager.
+     *
+     * @param instanceId identifier of task manager.
+     * @param cause the reason of mark this task manager as unwanted.
+     */
+    void addUnWantedTaskManager(InstanceID instanceId, Exception cause);
+
+    /**
+     * Add an unwanted task manager with resource profile.
+     *
+     * @param instanceId identifier of task manager.
+     * @param cause the reason of mark this task manager as unwanted.
+     * @param totalResourceProfile the total resource profile of task manager.
+     * @param defaultSlotResourceProfile the default resource profile of task manager.
+     */
+    void addUnWantedTaskManagerWithResource(
+            InstanceID instanceId,
+            Exception cause,
+            ResourceProfile totalResourceProfile,
+            ResourceProfile defaultSlotResourceProfile);

Review Comment:
   Why do we need 2 methods for this interface?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -206,15 +220,66 @@ protected Optional<WorkerType> getWorkerNodeIfAcceptRegistration(ResourceID reso
         return Optional.ofNullable(workerNodeMap.get(resourceID));
     }
 
+    @VisibleForTesting
+    public void declareResourceNeeded(Collection<ResourceDeclaration> resourceDeclarations) {
+        for (ResourceDeclaration resourceDeclaration : resourceDeclarations) {
+            WorkerResourceSpec workerResourceSpec = resourceDeclaration.getSpec();
+            int declaredWorkerNumber = resourceDeclaration.getNumNeeded();
+
+            final int releaseOrRequestWorkerNumber =
+                    allocatedWorkerCounter.getNum(workerResourceSpec)
+                            + pendingRequestCounter.getNum(workerResourceSpec)
+                            - declaredWorkerNumber;
+
+            if (releaseOrRequestWorkerNumber > 0) {
+                log.debug(
+                        "need release {} workers, current pending requested worker number {}, allocated worker number {}, declared worker number {}",
+                        releaseOrRequestWorkerNumber,
+                        pendingRequestCounter.getNum(workerResourceSpec),
+                        allocatedWorkerCounter.getNum(workerResourceSpec),
+                        declaredWorkerNumber);
+
+                // release unwanted workers.
+                int remainingReleasingWorkerNumber =
+                        releaseUnWantedResources(
+                                resourceDeclaration.getUnwanted(), releaseOrRequestWorkerNumber);
+
+                // TODO, release pending/starting/running workers to exceed declared worker number.
+                if (remainingReleasingWorkerNumber > 0) {
+                    log.debug(
+                            "need release {} workers after release unwanted workers.",
+                            remainingReleasingWorkerNumber);
+                }
+            } else if (releaseOrRequestWorkerNumber < 0) {
+                int requestWorkerNumber = -releaseOrRequestWorkerNumber;
+                log.debug(
+                        "need request {} new workers, current pending requested worker number {}, allocated worker number {}, declared worker number {}",
+                        requestWorkerNumber,
+                        pendingRequestCounter.getNum(workerResourceSpec),
+                        allocatedWorkerCounter.getNum(workerResourceSpec),
+                        declaredWorkerNumber);
+                for (int i = 0; i < requestWorkerNumber; i++) {
+                    requestNewWorker(workerResourceSpec);
+                }
+            } else {
+                log.debug(
+                        "current pending worker {} and allocated worker {} meets the declared worker {}",
+                        pendingRequestCounter.getNum(workerResourceSpec),
+                        allocatedWorkerCounter.getNum(workerResourceSpec),
+                        declaredWorkerNumber);
+            }
+        }
+    }
+
     @Override
     protected void onWorkerRegistered(WorkerType worker) {
         final ResourceID resourceId = worker.getResourceID();
         log.info("Worker {} is registered.", resourceId.getStringWithMetadata());
 
-        final WorkerResourceSpec workerResourceSpec =
-                currentAttemptUnregisteredWorkers.remove(resourceId);
         tryRemovePreviousPendingRecoveryTaskManager(resourceId);
-        if (workerResourceSpec != null) {
+        if (currentAttemptUnregisteredWorkers.remove(resourceId)) {
+            final WorkerResourceSpec workerResourceSpec =
+                    checkNotNull(currentAttemptWorkerSpec.get(resourceId));

Review Comment:
   How do we deal with the recovered workers? For a recovered worker, its `WorkerResourceSpec` is unknown until registration.
   
   I think the current implementation will lead to an inconsistency between the slot manager and the active resource manager: the slot manager knows the resources of the recovered and registered workers, while the active resource manager doesn't.
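
A minimal sketch of one way to keep both sides consistent, assuming the spec of a recovered worker can be derived from what it reports at registration. All names here (`WorkerSpecBookkeepingSketch`, the string-typed ids and specs) are hypothetical stand-ins and are not taken from the PR:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical bookkeeping sketch; worker ids and specs are modeled as plain strings. */
final class WorkerSpecBookkeepingSketch {
    // Spec of every worker whose spec is known, keyed by worker id.
    private final Map<String, String> specByWorker = new HashMap<>();
    // Workers recovered from a previous attempt; their spec is unknown until registration.
    private final Set<String> recoveredWithoutSpec = new HashSet<>();
    // Number of registered workers per spec.
    private final Map<String, Integer> allocatedPerSpec = new HashMap<>();

    /** A worker requested in the current attempt: its spec is known up front. */
    void onWorkerRequested(String workerId, String spec) {
        specByWorker.put(workerId, spec);
    }

    /** A worker recovered from a previous attempt: only its id is known. */
    void onWorkerRecovered(String workerId) {
        recoveredWithoutSpec.add(workerId);
    }

    /** On registration, a recovered worker's spec is filled in before counting it. */
    void onWorkerRegistered(String workerId, String specSeenAtRegistration) {
        if (recoveredWithoutSpec.remove(workerId)) {
            specByWorker.put(workerId, specSeenAtRegistration);
        }
        String spec = specByWorker.get(workerId);
        if (spec == null) {
            throw new IllegalStateException("Unknown worker " + workerId);
        }
        allocatedPerSpec.merge(spec, 1, Integer::sum);
    }

    int getAllocated(String spec) {
        return allocatedPerSpec.getOrDefault(spec, 0);
    }
}
```

With this shape, a recovered worker is counted under its spec as soon as it registers, so the per-spec count would match what the slot manager sees.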



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceDeclaration.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
+
+import java.util.Collection;
+
+/** ResourceDeclaration for {@link ResourceAllocator}. */
+public class ResourceDeclaration {
+    private final WorkerResourceSpec spec;
+    private final int numNeeded;
+    private final Collection<UnwantedWorker> unwanted;

Review Comment:
   We should explain the definition of `unwanted` in JavaDocs.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/UnwantedWorkerWithResourceProfile.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+
+/** Unwanted workers with specific resource profile. */
+public class UnwantedWorkerWithResourceProfile {
+    private final UnwantedWorker unwantedWorker;
+    private final ResourceProfile totalResourceProfile;
+    private final ResourceProfile defaultSlotResourceProfile;
+    private final int numSlots;

Review Comment:
   Moreover, we may consider making this an inner class of `TaskManagerTracker`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/UnwantedWorkerWithResourceProfile.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+
+/** Unwanted workers with specific resource profile. */
+public class UnwantedWorkerWithResourceProfile {
+    private final UnwantedWorker unwantedWorker;
+    private final ResourceProfile totalResourceProfile;
+    private final ResourceProfile defaultSlotResourceProfile;
+    private final int numSlots;

Review Comment:
   It seems these are only used for generating `WorkerResourceSpec`. Then why not just store the `WorkerResourceSpec`?
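
A rough sketch of the alternative, assuming the spec can be derived once when the unwanted worker is recorded. `WorkerSpecSketch` is a hypothetical, heavily simplified stand-in for `WorkerResourceSpec` (CPU and memory only), and the other names are illustrative as well:

```java
/** Hypothetical, simplified stand-in for WorkerResourceSpec. */
final class WorkerSpecSketch {
    final double cpuCores;
    final int memoryMB;
    final int numSlots;

    WorkerSpecSketch(double cpuCores, int memoryMB, int numSlots) {
        this.cpuCores = cpuCores;
        this.memoryMB = memoryMB;
        this.numSlots = numSlots;
    }

    /** Derives the spec once from a worker's total resources (assumed inputs). */
    static WorkerSpecSketch fromTotalResources(double totalCpuCores, int totalMemoryMB, int numSlots) {
        return new WorkerSpecSketch(totalCpuCores, totalMemoryMB, numSlots);
    }
}

/** Hypothetical holder that stores only the derived spec instead of two profiles and a slot count. */
final class UnwantedWorkerWithSpecSketch {
    final String instanceId;
    final WorkerSpecSketch spec;

    UnwantedWorkerWithSpecSketch(String instanceId, WorkerSpecSketch spec) {
        this.instanceId = instanceId;
        this.spec = spec;
    }
}
```

The conversion then happens in one place, and consumers read the spec directly instead of rebuilding it from the profiles.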



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -89,8 +91,17 @@
     /** Number of requested and not registered workers per worker resource spec. */
     private final WorkerCounter pendingWorkerCounter;
 
-    /** Identifiers and worker resource spec of requested not registered workers. */
-    private final Map<ResourceID, WorkerResourceSpec> currentAttemptUnregisteredWorkers;
+    /** Number of requested but not allocated workers per worker resource spec. */
+    private final WorkerCounter pendingRequestCounter;

Review Comment:
   Could you explain how `pendingRequestCounter` differs from `pendingWorkerCounter`, and why we need it?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/UnwantedWorker.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.instance.InstanceID;
+
+import java.util.Objects;
+
+/** Unwanted workers of {@link SlotManager}. */
+public class UnwantedWorker {
+    private final InstanceID id;
+    private final Exception exception;

Review Comment:
   I'm not sure about adding an exception for each unwanted worker in the declaration. It seems to assume that the unwanted workers will be closed with the given exception, which goes against the idea that the unwanted workers should only be a hint.
   
   Currently, there are only two possible reasons for the exception here: reaching the max resource limit, and TM idle timeout. I think for these two, we can simply close the TM connection with a reason such as "slot manager has determined that the resource is no longer needed", and log on the slot manager side why that determination was made.
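
The split described above could look roughly like the following. This is only a sketch under assumptions: every class and method name is hypothetical, ids are plain strings, and logging is modeled as an in-memory list rather than a real logger:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Hypothetical hint: identifies an unwanted worker by id only, with no exception attached. */
final class UnwantedWorkerHint {
    final String instanceId;

    UnwantedWorkerHint(String instanceId) {
        this.instanceId = instanceId;
    }
}

/** Hypothetical slot-manager side: records why a worker became unwanted. */
final class SlotManagerSideSketch {
    final List<String> log = new ArrayList<>();

    UnwantedWorkerHint markUnwanted(String instanceId, String why) {
        // The specific cause stays on the slot manager side.
        log.add("Worker " + instanceId + " is unwanted: " + why);
        return new UnwantedWorkerHint(instanceId);
    }
}

/** Hypothetical resource-manager side: treats hints as optional and uses one generic reason. */
final class ResourceManagerSideSketch {
    static final String CLOSE_REASON =
            "slot manager has determined that the resource is no longer needed";

    final List<String> closed = new ArrayList<>();

    /** Releases at most numToRelease hinted workers; returns how many are still to be released. */
    int release(Collection<UnwantedWorkerHint> hints, int numToRelease) {
        int released = 0;
        for (UnwantedWorkerHint hint : hints) {
            if (released >= numToRelease) {
                break;
            }
            closed.add(hint.instanceId + ": " + CLOSE_REASON);
            released++;
        }
        return numToRelease - released;
    }
}
```

Because the hint carries no exception, the resource manager remains free to ignore it, and the detailed cause is visible only in the slot manager's log.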



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java:
##########
@@ -384,6 +396,82 @@ && isMaxTotalResourceExceededAfterAdding(totalResourceProfile)) {
         }
     }
 
+    private void declareNeededResourcesWithDelay() {
+        if (declareNeededResourceDelay.toMillis() <= 0) {

Review Comment:
   It might be better to check that the allocator is supported. IIUC, that's expected whenever this method is called.
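
A minimal sketch of such a guard, assuming the allocator exposes a boolean "supported" check. The types below are hypothetical stand-ins, and the scheduling branch is reduced to a counter since the rest of the method is not shown in the hunk:

```java
import java.time.Duration;

/** Hypothetical stand-in for the allocator; counts how often resources were declared. */
final class CountingAllocatorSketch {
    private final boolean supported;
    int declareCalls = 0;

    CountingAllocatorSketch(boolean supported) {
        this.supported = supported;
    }

    boolean isSupported() {
        return supported;
    }

    void declareResourceNeeded() {
        declareCalls++;
    }
}

/** Hypothetical sketch of the delayed declaration with a precondition on the allocator. */
final class DeclareWithDelaySketch {
    private final CountingAllocatorSketch allocator;
    private final Duration declareNeededResourceDelay;
    int scheduledDeclarations = 0;

    DeclareWithDelaySketch(CountingAllocatorSketch allocator, Duration delay) {
        this.allocator = allocator;
        this.declareNeededResourceDelay = delay;
    }

    void declareNeededResourcesWithDelay() {
        // Fail fast: this method is only expected to run when the allocator is supported.
        if (!allocator.isSupported()) {
            throw new IllegalStateException("Resource allocator is not supported.");
        }
        if (declareNeededResourceDelay.toMillis() <= 0) {
            allocator.declareResourceNeeded();
        } else {
            // Assumption: a real implementation would schedule the declaration here.
            scheduledDeclarations++;
        }
    }
}
```

Failing fast makes a violated expectation visible at the call site instead of surfacing later as a missing declaration.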



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
