Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/20 07:15:02 UTC

[GitHub] [flink] zhuzhurk commented on a diff in pull request #20218: [FLINK-28145][runtime] Let ResourceManager supports blocklist

zhuzhurk commented on code in PR #20218:
URL: https://github.com/apache/flink/pull/20218#discussion_r924482382


##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategyTest.java:
##########
@@ -66,25 +62,25 @@ public void testFulfillRequirementWithRegisteredResources() {
                 STRATEGY.tryFulfillRequirements(
                         Collections.singletonMap(jobId, requirements),
                         taskManagerResourceInfoProvider);
-        assertThat(result.getUnfulfillableJobs(), is(empty()));
-        assertThat(result.getAllocationsOnPendingResources().keySet(), is(empty()));
-        assertThat(result.getPendingTaskManagersToAllocate(), is(empty()));
+        assertThat(result.getUnfulfillableJobs()).isEmpty();
+        assertThat(result.getAllocationsOnPendingResources().keySet()).isEmpty();

Review Comment:
   nit: -> assertThat(result.getAllocationsOnPendingResources()).isEmpty();
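The nit relies on a map being empty exactly when its key set is empty, so asserting on the map itself (AssertJ's map assertion has its own `isEmpty()`) says the same thing with less noise. A stdlib-only sketch of that equivalence, with plain `Map` checks standing in for the AssertJ assertions; the class name is hypothetical:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical demo: Map.isEmpty() and keySet().isEmpty() always agree. */
class MapEmptinessDemo {

    /** Returns true when both emptiness checks give the same answer for the given map. */
    static boolean emptinessChecksAgree(Map<?, ?> map) {
        return map.isEmpty() == map.keySet().isEmpty();
    }

    public static void main(String[] args) {
        Map<String, Integer> allocations = new HashMap<>();
        System.out.println(emptinessChecksAgree(allocations)); // true (both empty)
        allocations.put("taskManager-1", 3);
        System.out.println(emptinessChecksAgree(allocations)); // true (both non-empty)
        System.out.println(emptinessChecksAgree(Collections.emptyMap())); // true
    }
}
```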



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategyTest.java:
##########
@@ -102,9 +98,9 @@ public void testFulfillRequirementWithPendingResources() {
                 STRATEGY.tryFulfillRequirements(
                         Collections.singletonMap(jobId, requirements),
                         taskManagerResourceInfoProvider);
-        assertThat(result.getUnfulfillableJobs(), is(empty()));
-        assertThat(result.getAllocationsOnRegisteredResources().keySet(), is(empty()));
-        assertThat(result.getPendingTaskManagersToAllocate().size(), is(1));
+        assertThat(result.getUnfulfillableJobs()).isEmpty();
+        assertThat(result.getAllocationsOnRegisteredResources().keySet()).isEmpty();

Review Comment:
   nit: -> assertThat(result.getAllocationsOnRegisteredResources()).isEmpty();



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java:
##########
@@ -204,6 +204,49 @@ private void testRequirementDeclaration(RequirementDeclarationScenario scenario)
         };
     }
 
+    /**
+     * Tests that blocked slots cannot be used to fulfill requirements, will trigger the new
+     * resource allocation.
+     */
+    @Test
+    void testRequirementDeclarationWithBlockedSlotsTriggersWorkerAllocation() throws Exception {
+

Review Comment:
   nit: better to remove this empty line.



##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesTaskManagerParameters.java:
##########
@@ -178,4 +183,12 @@ public String getJvmMemOptsEnv() {
     public ContaineredTaskManagerParameters getContaineredTaskManagerParameters() {
         return containeredTaskManagerParameters;
     }
+
+    public Set<String> getBlockedNodes() {
+        return blockedNodes;
+    }
+
+    public String getNodeNameLabel() {
+        return flinkConfig.get(KubernetesConfigOptions.KUBERNETES_NODE_NAME_LABEL);

Review Comment:
   checkNotNull
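`checkNotNull` here refers to Flink's `Preconditions.checkNotNull`, which fails fast instead of handing a `null` label to callers. The sketch below uses the JDK's `Objects.requireNonNull` as a stand-in and a plain `Map` in place of Flink's `Configuration`; the class name and the `"node-name-label"` key are hypothetical.

```java
import java.util.Map;
import java.util.Objects;

/** Hypothetical, simplified parameters class; a Map stands in for Flink's Configuration. */
class TaskManagerParametersSketch {

    // Hypothetical config key, used only for this illustration.
    static final String NODE_NAME_LABEL_KEY = "node-name-label";

    private final Map<String, String> config;

    TaskManagerParametersSketch(Map<String, String> config) {
        this.config = Objects.requireNonNull(config);
    }

    /** Returns the configured node name label, failing fast if it is missing. */
    String getNodeNameLabel() {
        return Objects.requireNonNull(
                config.get(NODE_NAME_LABEL_KEY), "The node name label must not be null.");
    }
}
```

With `Preconditions.checkNotNull` the shape would be the same: a missing label surfaces immediately as a `NullPointerException` with a clear message, rather than propagating into the node affinity built from it later.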



##########
flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlockedNodeRetriever.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import java.util.Set;
+
+/** This class help to retrieve the blocked nodes. */

Review Comment:
   help -> helps



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocationStrategy.java:
##########
@@ -37,10 +38,32 @@ public interface ResourceAllocationStrategy {
      * @param missingResources resource requirements that are not yet fulfilled, indexed by jobId
      * @param taskManagerResourceInfoProvider provide the registered/pending resources of the
      *     current cluster
+     * @param blockedTaskManagerChecker blocked task manager checker
      * @return a {@link ResourceAllocationResult} based on the current status, which contains
      *     whether the requirements can be fulfilled and the actions to take
      */
     ResourceAllocationResult tryFulfillRequirements(
             Map<JobID, Collection<ResourceRequirement>> missingResources,
-            TaskManagerResourceInfoProvider taskManagerResourceInfoProvider);
+            TaskManagerResourceInfoProvider taskManagerResourceInfoProvider,
+            BlockedTaskManagerChecker blockedTaskManagerChecker);
+
+    /**
+     * Try to make an allocation decision to fulfill the resource requirements. The strategy
+     * generates a series of actions to take, based on the current status.
+     *
+     * <p>Notice: For performance considerations, modifications might be performed directly on the
+     * input arguments. If the arguments are reused elsewhere, please make a deep copy in advance.
+     *
+     * @param missingResources resource requirements that are not yet fulfilled, indexed by jobId
+     * @param taskManagerResourceInfoProvider provide the registered/pending resources of the
+     *     current cluster
+     * @return a {@link ResourceAllocationResult} based on the current status, which contains
+     *     whether the requirements can be fulfilled and the actions to take
+     */
+    default ResourceAllocationResult tryFulfillRequirements(

Review Comment:
   I prefer not to add this default implementation, because it adds complication and is only invoked a few times in tests.
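Without the default overload, tests can pass a checker that never blocks anything. A minimal sketch, assuming a simplified functional interface keyed by a `String` ID (Flink's `BlockedTaskManagerChecker` works on a `ResourceID`); the interface and helper names below are hypothetical:

```java
import java.util.Set;

/** Hypothetical, simplified checker: answers whether a task manager is blocked. */
@FunctionalInterface
interface TaskManagerBlockChecker {
    boolean isBlocked(String taskManagerId);
}

/** Hypothetical test helpers that replace a default interface overload. */
class BlockCheckers {

    /** A checker that blocks nothing, for tests that do not exercise the blocklist. */
    static final TaskManagerBlockChecker NONE_BLOCKED = taskManagerId -> false;

    /** A checker that blocks exactly the given task managers. */
    static TaskManagerBlockChecker blocking(Set<String> blockedIds) {
        return blockedIds::contains;
    }
}
```

Since the new test already passes a method reference as the checker argument, the real tests could get the same effect with a one-line lambda such as `resourceID -> false`, keeping the interface to a single method.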



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##########
@@ -92,10 +95,15 @@ public ResourceAllocationResult tryFulfillRequirements(
         return resultBuilder.build();
     }
 
-    private static List<InternalResourceInfo> getRegisteredResources(
+    private static List<InternalResourceInfo> getAvailableRegisteredResources(

Review Comment:
   nit: maybe `getAvailableResources` for brevity? `available` already implies that the resource is `registered`.
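In this PR, "available" presumably means registered and not blocked. A minimal sketch of that filtering, assuming a hypothetical `RegisteredTaskManager` class and a `Predicate` in place of `BlockedTaskManagerChecker`; it is not the actual `DefaultResourceAllocationStrategy` code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical stand-in for a registered task manager. */
final class RegisteredTaskManager {
    final String id;
    final int freeSlots;

    RegisteredTaskManager(String id, int freeSlots) {
        this.id = id;
        this.freeSlots = freeSlots;
    }
}

/** Hypothetical sketch of narrowing registered resources down to available ones. */
class AvailableResources {

    /** Keeps registered task managers that are not blocked, preserving input order. */
    static List<RegisteredTaskManager> getAvailableResources(
            List<RegisteredTaskManager> registered, Predicate<String> isBlocked) {
        List<RegisteredTaskManager> available = new ArrayList<>();
        for (RegisteredTaskManager taskManager : registered) {
            if (!isBlocked.test(taskManager.id)) {
                available.add(taskManager);
            }
        }
        return available;
    }
}
```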



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java:
##########
@@ -207,6 +208,49 @@ void testRequirementDeclarationWithoutFreeSlotsTriggersWorkerAllocation() throws
         }
     }
 
+    /**
+     * Tests that blocked slots cannot be used to fulfill requirements, will trigger the new
+     * resource allocation.
+     */
+    @Test
+    void testRequirementDeclarationWithBlockedSlotsTriggersWorkerAllocation() throws Exception {
+

Review Comment:
   nit: better to remove this empty line.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java:
##########
@@ -79,11 +80,27 @@ public interface SlotManager extends AutoCloseable {
      * @param newResourceManagerId to use for communication with the task managers
      * @param newMainThreadExecutor to use to run code in the ResourceManager's main thread
      * @param newResourceActions to use for resource (de-)allocations
+     * @param newBlockedTaskManagerChecker to query whether a task manager is blocked
      */
     void start(
             ResourceManagerId newResourceManagerId,
             Executor newMainThreadExecutor,
-            ResourceActions newResourceActions);
+            ResourceActions newResourceActions,
+            BlockedTaskManagerChecker newBlockedTaskManagerChecker);
+
+    /**
+     * Starts the slot manager with the given leader id and resource manager actions.
+     *
+     * @param newResourceManagerId to use for communication with the task managers
+     * @param newMainThreadExecutor to use to run code in the ResourceManager's main thread
+     * @param newResourceActions to use for resource (de-)allocations
+     */
+    default void start(

Review Comment:
   I prefer not to add this default implementation, because it adds complication and is only invoked a few times in tests.



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/KubernetesTaskManagerTestBase.java:
##########
@@ -48,6 +52,9 @@ public class KubernetesTaskManagerTestBase extends KubernetesPodTestBase {
 
     protected KubernetesTaskManagerParameters kubernetesTaskManagerParameters;
 
+    protected static final Set<String> BLOCKED_NODES =
+            new HashSet<>(Arrays.asList("node1", "node2"));

Review Comment:
   maybe -> "blockedNode1", "blockedNode2", so that the tests are easier to understand.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategyTest.java:
##########
@@ -150,4 +150,31 @@ void testUnfulfillableRequirement() {
         assertThat(result.getUnfulfillableJobs()).containsExactly(jobId);
         assertThat(result.getPendingTaskManagersToAllocate()).isEmpty();
     }
+
+    /** Tests that blocked task manager cannot fulfill requirements. */
+    @Test
+    void testBlockedTaskManagerCannotFulfillRequirements() {
+        final TaskManagerInfo taskManager =
+                new TestingTaskManagerInfo(
+                        DEFAULT_SLOT_RESOURCE.multiply(10),
+                        DEFAULT_SLOT_RESOURCE.multiply(10),
+                        DEFAULT_SLOT_RESOURCE);
+        final JobID jobId = new JobID();
+        final List<ResourceRequirement> requirements = new ArrayList<>();
+        final TaskManagerResourceInfoProvider taskManagerResourceInfoProvider =
+                TestingTaskManagerResourceInfoProvider.newBuilder()
+                        .setRegisteredTaskManagersSupplier(() -> Collections.singleton(taskManager))
+                        .build();
+        requirements.add(ResourceRequirement.create(ResourceProfile.UNKNOWN, 10));
+
+        final ResourceAllocationResult result =
+                STRATEGY.tryFulfillRequirements(
+                        Collections.singletonMap(jobId, requirements),
+                        taskManagerResourceInfoProvider,
+                        taskManager.getTaskExecutorConnection().getResourceID()::equals);
+
+        assertThat(result.getUnfulfillableJobs()).isEmpty();
+        assertThat(result.getAllocationsOnRegisteredResources().keySet()).isEmpty();
+        assertThat(result.getPendingTaskManagersToAllocate()).hasSize(2);

Review Comment:
   Why does it need to allocate 2 taskmanagers?
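One possible answer, under an assumption about the test setup that is not visible in this diff: if `STRATEGY` creates pending workers with a fixed number of slots each (say 5), then with the only registered task manager blocked, all 10 requested slots must come from new workers, and ceil(10 / 5) = 2. The hypothetical helper below only does that ceiling division; the slots-per-worker value is an assumption.

```java
/** Hypothetical helper: how many new workers are needed for a number of missing slots. */
class PendingWorkerMath {

    /** Ceiling division: smallest worker count with workers * slotsPerWorker >= missingSlots. */
    static int workersNeeded(int missingSlots, int slotsPerWorker) {
        if (slotsPerWorker <= 0) {
            throw new IllegalArgumentException("slotsPerWorker must be positive");
        }
        return (missingSlots + slotsPerWorker - 1) / slotsPerWorker;
    }
}
```

For example, `workersNeeded(10, 5)` returns 2, while a worker with 10 slots would give 1; if the test intends a specific count, stating the slots-per-worker in the test would make the expected value self-explanatory.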



##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesTaskManagerParameters.java:
##########
@@ -178,4 +183,12 @@ public String getJvmMemOptsEnv() {
     public ContaineredTaskManagerParameters getContaineredTaskManagerParameters() {
         return containeredTaskManagerParameters;
     }
+
+    public Set<String> getBlockedNodes() {
+        return blockedNodes;

Review Comment:
   better to make it unmodifiable.
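Returning an unmodifiable view keeps callers from mutating the parameters object's internal state through the getter. A minimal sketch with a hypothetical holder class, using `Collections.unmodifiableSet`:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical holder showing an unmodifiable view of the blocked nodes. */
class BlockedNodesHolder {

    private final Set<String> blockedNodes;

    BlockedNodesHolder(Set<String> blockedNodes) {
        // Defensive copy so later changes to the caller's set are not visible here.
        this.blockedNodes = new HashSet<>(blockedNodes);
    }

    /** Returns a read-only view; mutation attempts throw UnsupportedOperationException. */
    Set<String> getBlockedNodes() {
        return Collections.unmodifiableSet(blockedNodes);
    }
}
```

An alternative is to wrap once in the constructor and return the field directly; either way the getter can no longer be used to change the blocked nodes.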



##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java:
##########
@@ -115,6 +136,20 @@ public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
                 .build();
     }
 
+    private NodeAffinity generateNodeAffinity(String labelKey, Collection<String> blockedNodes) {

Review Comment:
   Collection -> Set
   
   It makes clear that there are no duplicates.
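The node affinity presumably excludes the blocked nodes with a single `NotIn` match on the node name label, so each blocked node should appear only once among its values. A plain-Java sketch of building those values, with a hypothetical `NotInExpression` class standing in for the Kubernetes client's node selector requirement; it is not the PR's `generateNodeAffinity`:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical stand-in for a Kubernetes node selector requirement using the NotIn operator. */
final class NotInExpression {
    final String key;
    final String operator = "NotIn";
    final List<String> values;

    NotInExpression(String key, List<String> values) {
        this.key = key;
        this.values = Collections.unmodifiableList(values);
    }
}

/** Hypothetical sketch: a Set parameter guarantees each blocked node appears once. */
class NodeAffinitySketch {

    static NotInExpression excludeBlockedNodes(String labelKey, Set<String> blockedNodes) {
        // Sorted for deterministic output; the Set type already rules out duplicates.
        List<String> values = new ArrayList<>(new TreeSet<>(blockedNodes));
        return new NotInExpression(labelKey, values);
    }
}
```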



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org