You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gu...@apache.org on 2021/12/21 02:02:58 UTC

[flink] branch master updated: [FLINK-25268][yarn] Support task manager node-label in Yarn deployment

This is an automated email from the ASF dual-hosted git repository.

guoyangze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 71ed78d  [FLINK-25268][yarn] Support task manager node-label in Yarn deployment
71ed78d is described below

commit 71ed78d8d72a76818c8c621eaf728aafda8f28f8
Author: Junfan Zhang <ju...@outlook.com>
AuthorDate: Mon Dec 13 13:53:10 2021 +0800

    [FLINK-25268][yarn] Support task manager node-label in Yarn deployment
    
    This closes #18087.
---
 .../generated/yarn_config_configuration.html       |   6 +
 .../flink/yarn/ContainerRequestReflector.java      | 110 +++++++++++++++
 .../flink/yarn/YarnResourceManagerDriver.java      |  17 ++-
 .../yarn/configuration/YarnConfigOptions.java      |   8 ++
 .../flink/yarn/ContainerRequestReflectorTest.java  | 155 +++++++++++++++++++++
 .../flink/yarn/YarnResourceManagerDriverTest.java  |   5 +-
 6 files changed, 290 insertions(+), 11 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/yarn_config_configuration.html b/docs/layouts/shortcodes/generated/yarn_config_configuration.html
index 1d2f0e6..929089d 100644
--- a/docs/layouts/shortcodes/generated/yarn_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/yarn_config_configuration.html
@@ -176,5 +176,11 @@
             <td>String</td>
             <td>A comma-separated list of tags to apply to the Flink YARN application.</td>
         </tr>
+        <tr>
+            <td><h5>yarn.taskmanager.node-label</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Specify YARN node label for the Flink TaskManagers, it will override the yarn.application.node-label for TaskManagers if both are set.</td>
+        </tr>
     </tbody>
 </table>
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/ContainerRequestReflector.java b/flink-yarn/src/main/java/org/apache/flink/yarn/ContainerRequestReflector.java
new file mode 100644
index 0000000..faa7f59
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/ContainerRequestReflector.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * Creates {@link AMRMClient.ContainerRequest}s with a YARN node label via reflection, since the
+ * required constructor only exists in some Hadoop versions; otherwise the label is omitted.
+ *
+ * <p>The node-label constructor of {@code ContainerRequest} is available since Hadoop 2.6.0.
+ */
+class ContainerRequestReflector {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ContainerRequestReflector.class);
+
+    static final ContainerRequestReflector INSTANCE = new ContainerRequestReflector();
+
+    /** The node-label constructor, or {@code null} if the request class does not declare it. */
+    @Nullable private final Constructor<? extends AMRMClient.ContainerRequest> defaultConstructor;
+
+    private ContainerRequestReflector() {
+        this(AMRMClient.ContainerRequest.class);
+    }
+
+    @VisibleForTesting
+    ContainerRequestReflector(Class<? extends AMRMClient.ContainerRequest> containerRequestClass) {
+        Constructor<? extends AMRMClient.ContainerRequest> constructor = null;
+        try {
+            /*
+             * Look up ContainerRequest(Resource capability, String[] nodes, String[] racks,
+             * Priority priority, boolean relaxLocality, String nodeLabelsExpression), see
+             * https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
+             *
+             * capability: the resource to be requested for each container.
+             * nodes / racks: hosts and racks the containers should be placed on.
+             * priority: the request priority; higher priorities have lower numerical values.
+             * relaxLocality: whether containers may be placed outside the requested locations.
+             * nodeLabelsExpression: the node label to allocate from (only a single label).
+             */
+            constructor =
+                    containerRequestClass.getDeclaredConstructor(
+                            Resource.class,
+                            String[].class,
+                            String[].class,
+                            Priority.class,
+                            boolean.class,
+                            String.class);
+        } catch (NoSuchMethodException exception) {
+            LOG.debug("The node-label mechanism of Yarn is not supported in this Hadoop version.");
+        }
+        this.defaultConstructor = constructor;
+    }
+
+    /**
+     * Creates a container request, attaching {@code nodeLabel} if it is non-blank and supported.
+     *
+     * @param containerResource the resource to request for the container
+     * @param priority the priority of the request
+     * @param nodeLabel the node label expression; {@code null} or blank means no node label
+     * @return the request; a request without node label if the label cannot be applied
+     */
+    public AMRMClient.ContainerRequest getContainerRequest(
+            Resource containerResource, Priority priority, String nodeLabel) {
+        if (StringUtils.isNullOrWhitespaceOnly(nodeLabel) || defaultConstructor == null) {
+            return plainRequest(containerResource, priority);
+        }
+        try {
+            // relaxLocality = true allows YARN to fall through to rack-local or any placement.
+            return defaultConstructor.newInstance(
+                    containerResource, null, null, priority, true, nodeLabel);
+        } catch (InvocationTargetException | InstantiationException | IllegalAccessException e) {
+            LOG.warn("Errors on creating Container Request.", e);
+            return plainRequest(containerResource, priority);
+        }
+    }
+
+    /** Creates a request without a node label, as done when labels are unset or unsupported. */
+    private static AMRMClient.ContainerRequest plainRequest(Resource resource, Priority priority) {
+        return new AMRMClient.ContainerRequest(resource, null, null, priority);
+    }
+}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
index 6dd0d65..ddbd7ad 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
@@ -56,7 +56,6 @@ import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 
-import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
@@ -117,6 +116,8 @@ public class YarnResourceManagerDriver extends AbstractResourceManagerDriver<Yar
     private TaskExecutorProcessSpecContainerResourcePriorityAdapter
             taskExecutorProcessSpecContainerResourcePriorityAdapter;
 
+    private String taskManagerNodeLabel;
+
     public YarnResourceManagerDriver(
             Configuration flinkConfig,
             YarnResourceManagerDriverConfiguration configuration,
@@ -148,6 +149,9 @@ public class YarnResourceManagerDriver extends AbstractResourceManagerDriver<Yar
                 flinkConfig.getInteger(
                         YarnConfigOptions.CONTAINER_REQUEST_HEARTBEAT_INTERVAL_MILLISECONDS);
 
+        this.taskManagerNodeLabel =
+                flinkConfig.getString(YarnConfigOptions.TASK_MANAGER_NODE_LABEL);
+
         this.registerApplicationMasterResponseReflector =
                 new RegisterApplicationMasterResponseReflector(log);
 
@@ -259,7 +263,9 @@ public class YarnResourceManagerDriver extends AbstractResourceManagerDriver<Yar
         } else {
             final Priority priority = priorityAndResourceOpt.get().getPriority();
             final Resource resource = priorityAndResourceOpt.get().getResource();
-            resourceManagerClient.addContainerRequest(getContainerRequest(resource, priority));
+            resourceManagerClient.addContainerRequest(
+                    ContainerRequestReflector.INSTANCE.getContainerRequest(
+                            resource, priority, taskManagerNodeLabel));
 
             // make sure we transmit the request fast and receive fast news of granted allocations
             resourceManagerClient.setHeartbeatInterval(containerRequestHeartbeatIntervalMillis);
@@ -547,13 +553,6 @@ public class YarnResourceManagerDriver extends AbstractResourceManagerDriver<Yar
         }
     }
 
-    @Nonnull
-    @VisibleForTesting
-    static AMRMClient.ContainerRequest getContainerRequest(
-            Resource containerResource, Priority priority) {
-        return new AMRMClient.ContainerRequest(containerResource, null, null, priority);
-    }
-
     @VisibleForTesting
     private static ResourceID getContainerResourceId(Container container) {
         return new ResourceID(container.getId().toString(), container.getNodeId().toString());
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
index 8533e63..c98e106 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
@@ -296,6 +296,14 @@ public class YarnConfigOptions {
                     .noDefaultValue()
                     .withDescription("Specify YARN node label for the YARN application.");
 
+    public static final ConfigOption<String> TASK_MANAGER_NODE_LABEL =
+            key("yarn.taskmanager.node-label")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Specify YARN node label for the Flink TaskManagers, it will "
+                                    + "override the yarn.application.node-label for TaskManagers if both are set.");
+
     public static final ConfigOption<Boolean> SHIP_LOCAL_KEYTAB =
             key("yarn.security.kerberos.ship-local-keytab")
                     .booleanType()
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/ContainerRequestReflectorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/ContainerRequestReflectorTest.java
new file mode 100644
index 0000000..7fdb45b
--- /dev/null
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/ContainerRequestReflectorTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.junit.Test;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+
+/** Tests for {@link ContainerRequestReflector}. */
+public class ContainerRequestReflectorTest extends TestLogger {
+
+    @Test
+    public void testGetContainerRequestIfConstructorPresent() {
+        final ContainerRequestReflector containerRequestReflector =
+                new ContainerRequestReflector(ContainerRequestWithConstructor.class);
+        Resource resource = Resource.newInstance(100, 1);
+        Priority priority = Priority.newInstance(1);
+
+        AMRMClient.ContainerRequest containerRequest =
+                containerRequestReflector.getContainerRequest(resource, priority, "GPU");
+        assertTrue(containerRequest instanceof ContainerRequestWithConstructor);
+        ContainerRequestWithConstructor containerRequestWithConstructor =
+                (ContainerRequestWithConstructor) containerRequest;
+        assertEquals("GPU", containerRequestWithConstructor.getNodeLabelsExpression());
+
+        containerRequest = containerRequestReflector.getContainerRequest(resource, priority, null);
+        assertFalse(containerRequest instanceof ContainerRequestWithConstructor);
+
+        containerRequest = containerRequestReflector.getContainerRequest(resource, priority, "");
+        assertFalse(containerRequest instanceof ContainerRequestWithConstructor);
+    }
+
+    @Test
+    public void testGetContainerRequestIfConstructorAbsent() {
+        final ContainerRequestReflector containerRequestReflector =
+                new ContainerRequestReflector(ContainerRequestWithoutConstructor.class);
+        Resource resource = Resource.newInstance(100, 1);
+        Priority priority = Priority.newInstance(1);
+        // Without the node-label constructor, only plain ContainerRequests may be created.
+        AMRMClient.ContainerRequest containerRequest =
+                containerRequestReflector.getContainerRequest(resource, priority, "GPU");
+        assertEquals(AMRMClient.ContainerRequest.class, containerRequest.getClass());
+
+        containerRequest = containerRequestReflector.getContainerRequest(resource, priority, null);
+        assertEquals(AMRMClient.ContainerRequest.class, containerRequest.getClass());
+
+        containerRequest = containerRequestReflector.getContainerRequest(resource, priority, "");
+        assertEquals(AMRMClient.ContainerRequest.class, containerRequest.getClass());
+    }
+
+    @Test
+    public void testGetContainerRequestWithoutYarnSupport() {
+        assumeTrue(HadoopUtils.isMaxHadoopVersion(2, 6));
+        final ContainerRequestReflector reflector = ContainerRequestReflector.INSTANCE;
+        final Class<?> plain = AMRMClient.ContainerRequest.class;
+        final Resource resource = Resource.newInstance(100, 1);
+        final Priority priority = Priority.newInstance(1);
+        assertEquals(plain, reflector.getContainerRequest(resource, priority, "GPU").getClass());
+        assertEquals(plain, reflector.getContainerRequest(resource, priority, null).getClass());
+        assertEquals(plain, reflector.getContainerRequest(resource, priority, "").getClass());
+    }
+
+    @Test
+    public void testGetContainerRequestWithYarnSupport()
+            throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+        assumeTrue(HadoopUtils.isMinHadoopVersion(2, 6));
+
+        Resource resource = Resource.newInstance(100, 1);
+        Priority priority = Priority.newInstance(1);
+
+        AMRMClient.ContainerRequest containerRequest =
+                ContainerRequestReflector.INSTANCE.getContainerRequest(resource, priority, "GPU");
+        assertEquals("GPU", getNodeLabelExpressionWithReflector(containerRequest));
+
+        containerRequest =
+                ContainerRequestReflector.INSTANCE.getContainerRequest(resource, priority, null);
+        assertNull(getNodeLabelExpressionWithReflector(containerRequest));
+
+        containerRequest =
+                ContainerRequestReflector.INSTANCE.getContainerRequest(resource, priority, "");
+        assertNull(getNodeLabelExpressionWithReflector(containerRequest));
+    }
+
+    private String getNodeLabelExpressionWithReflector(AMRMClient.ContainerRequest containerRequest)
+            throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+        Method method = containerRequest.getClass().getMethod("getNodeLabelExpression");
+        return (String) method.invoke(containerRequest);
+    }
+
+    /** Class which does not have required constructor. */
+    private static class ContainerRequestWithoutConstructor extends AMRMClient.ContainerRequest {
+
+        public ContainerRequestWithoutConstructor(
+                Resource capability, String[] nodes, String[] racks, Priority priority) {
+            super(capability, nodes, racks, priority);
+        }
+    }
+
+    /**
+     * Class which has constructor with the same signature as {@link
+     * org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest} in Hadoop 2.6+.
+     */
+    private static class ContainerRequestWithConstructor extends AMRMClient.ContainerRequest {
+        private String nodeLabelsExpression;
+
+        public ContainerRequestWithConstructor(
+                Resource capability, String[] nodes, String[] racks, Priority priority) {
+            super(capability, nodes, racks, priority);
+        }
+
+        public ContainerRequestWithConstructor(
+                Resource capability,
+                String[] nodes,
+                String[] racks,
+                Priority priority,
+                boolean relaxLocality,
+                String nodeLabelsExpression) {
+            super(capability, nodes, racks, priority);
+            this.nodeLabelsExpression = nodeLabelsExpression;
+        }
+
+        public String getNodeLabelsExpression() {
+            return nodeLabelsExpression;
+        }
+    }
+}
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerDriverTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerDriverTest.java
index 3de3655..3b823d8 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerDriverTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerDriverTest.java
@@ -515,10 +515,11 @@ public class YarnResourceManagerDriverTest extends ResourceManagerDriverTestBase
                                 ignored ->
                                         Collections.singletonList(
                                                 Collections.singletonList(
-                                                        YarnResourceManagerDriver
+                                                        ContainerRequestReflector.INSTANCE
                                                                 .getContainerRequest(
                                                                         testingResource,
-                                                                        Priority.UNDEFINED))))
+                                                                        Priority.UNDEFINED,
+                                                                        null))))
                         .setRemoveContainerRequestConsumer(
                                 (request, handler) -> removeContainerRequestFuture.complete(null))
                         .setReleaseAssignedContainerConsumer(