You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gu...@apache.org on 2021/12/21 02:02:58 UTC
[flink] branch master updated: [FLINK-25268][yarn] Support task manager node-label in Yarn deployment
This is an automated email from the ASF dual-hosted git repository.
guoyangze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 71ed78d [FLINK-25268][yarn] Support task manager node-label in Yarn deployment
71ed78d is described below
commit 71ed78d8d72a76818c8c621eaf728aafda8f28f8
Author: Junfan Zhang <ju...@outlook.com>
AuthorDate: Mon Dec 13 13:53:10 2021 +0800
[FLINK-25268][yarn] Support task manager node-label in Yarn deployment
This closes #18087.
---
.../generated/yarn_config_configuration.html | 6 +
.../flink/yarn/ContainerRequestReflector.java | 110 +++++++++++++++
.../flink/yarn/YarnResourceManagerDriver.java | 17 ++-
.../yarn/configuration/YarnConfigOptions.java | 8 ++
.../flink/yarn/ContainerRequestReflectorTest.java | 155 +++++++++++++++++++++
.../flink/yarn/YarnResourceManagerDriverTest.java | 5 +-
6 files changed, 290 insertions(+), 11 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/yarn_config_configuration.html b/docs/layouts/shortcodes/generated/yarn_config_configuration.html
index 1d2f0e6..929089d 100644
--- a/docs/layouts/shortcodes/generated/yarn_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/yarn_config_configuration.html
@@ -176,5 +176,11 @@
<td>String</td>
<td>A comma-separated list of tags to apply to the Flink YARN application.</td>
</tr>
+ <tr>
+ <td><h5>yarn.taskmanager.node-label</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Specify YARN node label for the Flink TaskManagers, it will override the yarn.application.node-label for TaskManagers if both are set.</td>
+ </tr>
</tbody>
</table>
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/ContainerRequestReflector.java b/flink-yarn/src/main/java/org/apache/flink/yarn/ContainerRequestReflector.java
new file mode 100644
index 0000000..faa7f59
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/ContainerRequestReflector.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * Use reflection to determine whether the Hadoop supports node-label, depending on the Hadoop
+ * version, may or may not be supported. If not, nothing happened.
+ *
+ * <p>The node label mechanism is supported by Hadoop version greater than 2.6.0
+ */
+class ContainerRequestReflector {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ContainerRequestReflector.class);
+
+ static final ContainerRequestReflector INSTANCE = new ContainerRequestReflector();
+
+ @Nullable private Constructor<? extends AMRMClient.ContainerRequest> defaultConstructor;
+
+ private ContainerRequestReflector() {
+ this(AMRMClient.ContainerRequest.class);
+ }
+
+ @VisibleForTesting
+ ContainerRequestReflector(Class<? extends AMRMClient.ContainerRequest> containerRequestClass) {
+ Class<? extends AMRMClient.ContainerRequest> requestCls = containerRequestClass;
+ try {
+ /**
+ * To support node-label, using the below constructor. Please refer to
+ * https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java#L287
+ *
+ * <p>Instantiates a {@link ContainerRequest} with the given constraints.
+ *
+ * @param capability The {@link Resource} to be requested for each container.
+ * @param nodes Any hosts to request that the containers are placed on.
+ * @param racks Any racks to request that the containers are placed on. The racks
+ * corresponding to any hosts requested will be automatically added to this list.
+ * @param priority The priority at which to request the containers. Higher priorities
+ * have lower numerical values.
+ * @param allocationRequestId The allocationRequestId of the request. To be used as a
+ * tracking id to match Containers allocated against this request. Will default to 0
+ * if not specified.
+ * @param relaxLocality If true, containers for this request may be assigned on hosts
+ * and racks other than the ones explicitly requested.
+ * @param nodeLabelsExpression Set node labels to allocate resource, now we only support
+ * asking for only a single node label
+ */
+ defaultConstructor =
+ requestCls.getDeclaredConstructor(
+ Resource.class,
+ String[].class,
+ String[].class,
+ Priority.class,
+ boolean.class,
+ String.class);
+ } catch (NoSuchMethodException exception) {
+ LOG.debug(
+ "The node-label mechanism of Yarn don't be supported in this Hadoop version.");
+ }
+ }
+
+ public AMRMClient.ContainerRequest getContainerRequest(
+ Resource containerResource, Priority priority, String nodeLabel) {
+ if (StringUtils.isNullOrWhitespaceOnly(nodeLabel) || defaultConstructor == null) {
+ return new AMRMClient.ContainerRequest(containerResource, null, null, priority);
+ }
+
+ try {
+ /**
+ * Set the param of relaxLocality to true, which tells the Yarn ResourceManager if the
+ * application wants locality to be loose (i.e. allows fall-through to rack or any)
+ */
+ return defaultConstructor.newInstance(
+ containerResource, null, null, priority, true, nodeLabel);
+ } catch (InvocationTargetException | InstantiationException | IllegalAccessException e) {
+ LOG.warn("Errors on creating Container Request.", e);
+ }
+
+ return new AMRMClient.ContainerRequest(containerResource, null, null, priority);
+ }
+}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
index 6dd0d65..ddbd7ad 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
@@ -56,7 +56,6 @@ import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
-import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
@@ -117,6 +116,8 @@ public class YarnResourceManagerDriver extends AbstractResourceManagerDriver<Yar
private TaskExecutorProcessSpecContainerResourcePriorityAdapter
taskExecutorProcessSpecContainerResourcePriorityAdapter;
+ private String taskManagerNodeLabel;
+
public YarnResourceManagerDriver(
Configuration flinkConfig,
YarnResourceManagerDriverConfiguration configuration,
@@ -148,6 +149,9 @@ public class YarnResourceManagerDriver extends AbstractResourceManagerDriver<Yar
flinkConfig.getInteger(
YarnConfigOptions.CONTAINER_REQUEST_HEARTBEAT_INTERVAL_MILLISECONDS);
+ this.taskManagerNodeLabel =
+ flinkConfig.getString(YarnConfigOptions.TASK_MANAGER_NODE_LABEL);
+
this.registerApplicationMasterResponseReflector =
new RegisterApplicationMasterResponseReflector(log);
@@ -259,7 +263,9 @@ public class YarnResourceManagerDriver extends AbstractResourceManagerDriver<Yar
} else {
final Priority priority = priorityAndResourceOpt.get().getPriority();
final Resource resource = priorityAndResourceOpt.get().getResource();
- resourceManagerClient.addContainerRequest(getContainerRequest(resource, priority));
+ resourceManagerClient.addContainerRequest(
+ ContainerRequestReflector.INSTANCE.getContainerRequest(
+ resource, priority, taskManagerNodeLabel));
// make sure we transmit the request fast and receive fast news of granted allocations
resourceManagerClient.setHeartbeatInterval(containerRequestHeartbeatIntervalMillis);
@@ -547,13 +553,6 @@ public class YarnResourceManagerDriver extends AbstractResourceManagerDriver<Yar
}
}
- @Nonnull
- @VisibleForTesting
- static AMRMClient.ContainerRequest getContainerRequest(
- Resource containerResource, Priority priority) {
- return new AMRMClient.ContainerRequest(containerResource, null, null, priority);
- }
-
@VisibleForTesting
private static ResourceID getContainerResourceId(Container container) {
return new ResourceID(container.getId().toString(), container.getNodeId().toString());
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
index 8533e63..c98e106 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
@@ -296,6 +296,14 @@ public class YarnConfigOptions {
.noDefaultValue()
.withDescription("Specify YARN node label for the YARN application.");
+    /** YARN node label attached to TaskManager container requests; has no default value. */
+    public static final ConfigOption<String> TASK_MANAGER_NODE_LABEL =
+            key("yarn.taskmanager.node-label")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Specify YARN node label for the Flink TaskManagers, it will "
+                                    + "override the yarn.application.node-label for TaskManagers if both are set.");
+
public static final ConfigOption<Boolean> SHIP_LOCAL_KEYTAB =
key("yarn.security.kerberos.ship-local-keytab")
.booleanType()
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/ContainerRequestReflectorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/ContainerRequestReflectorTest.java
new file mode 100644
index 0000000..7fdb45b
--- /dev/null
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/ContainerRequestReflectorTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.junit.Test;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+
+/** Tests for {@link ContainerRequestReflector}. */
+public class ContainerRequestReflectorTest extends TestLogger {
+
+    @Test
+    public void testGetContainerRequestIfConstructorPresent() {
+        final ContainerRequestReflector containerRequestReflector =
+                new ContainerRequestReflector(ContainerRequestWithConstructor.class);
+        Resource resource = Resource.newInstance(100, 1);
+        Priority priority = Priority.newInstance(1);
+
+        AMRMClient.ContainerRequest containerRequest =
+                containerRequestReflector.getContainerRequest(resource, priority, "GPU");
+        assertTrue(containerRequest instanceof ContainerRequestWithConstructor);
+        ContainerRequestWithConstructor containerRequestWithConstructor =
+                (ContainerRequestWithConstructor) containerRequest;
+        assertEquals("GPU", containerRequestWithConstructor.getNodeLabelsExpression());
+
+        containerRequest = containerRequestReflector.getContainerRequest(resource, priority, null);
+        assertFalse(containerRequest instanceof ContainerRequestWithConstructor);
+
+        containerRequest = containerRequestReflector.getContainerRequest(resource, priority, "");
+        assertFalse(containerRequest instanceof ContainerRequestWithConstructor);
+    }
+
+    @Test
+    public void testGetContainerRequestIfConstructorAbsent() {
+        final ContainerRequestReflector containerRequestReflector =
+                new ContainerRequestReflector(ContainerRequestWithoutConstructor.class);
+        Resource resource = Resource.newInstance(100, 1);
+        Priority priority = Priority.newInstance(1);
+
+        for (String nodeLabel : new String[] {"GPU", null, ""}) {
+            AMRMClient.ContainerRequest containerRequest =
+                    containerRequestReflector.getContainerRequest(resource, priority, nodeLabel);
+            assertEquals(AMRMClient.ContainerRequest.class, containerRequest.getClass());
+        }
+    }
+
+    @Test
+    public void testGetContainerRequestWithoutYarnSupport() {
+        assumeTrue(HadoopUtils.isMaxHadoopVersion(2, 6));
+
+        Resource resource = Resource.newInstance(100, 1);
+        Priority priority = Priority.newInstance(1);
+
+        // Without node-label support every request falls back to a plain ContainerRequest.
+        for (String nodeLabel : new String[] {"GPU", null, ""}) {
+            AMRMClient.ContainerRequest containerRequest =
+                    ContainerRequestReflector.INSTANCE.getContainerRequest(
+                            resource, priority, nodeLabel);
+            assertEquals(AMRMClient.ContainerRequest.class, containerRequest.getClass());
+        }
+    }
+
+    @Test
+    public void testGetContainerRequestWithYarnSupport()
+            throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+        assumeTrue(HadoopUtils.isMinHadoopVersion(2, 6));
+
+        Resource resource = Resource.newInstance(100, 1);
+        Priority priority = Priority.newInstance(1);
+
+        AMRMClient.ContainerRequest containerRequest =
+                ContainerRequestReflector.INSTANCE.getContainerRequest(resource, priority, "GPU");
+        assertEquals("GPU", getNodeLabelExpressionWithReflector(containerRequest));
+
+        containerRequest =
+                ContainerRequestReflector.INSTANCE.getContainerRequest(resource, priority, null);
+        assertNull(getNodeLabelExpressionWithReflector(containerRequest));
+
+        containerRequest =
+                ContainerRequestReflector.INSTANCE.getContainerRequest(resource, priority, "");
+        assertNull(getNodeLabelExpressionWithReflector(containerRequest));
+    }
+
+    private String getNodeLabelExpressionWithReflector(AMRMClient.ContainerRequest containerRequest)
+            throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+        Method method = containerRequest.getClass().getMethod("getNodeLabelExpression");
+        return (String) method.invoke(containerRequest);
+    }
+
+    /** Class which does not have required constructor. */
+    private static class ContainerRequestWithoutConstructor extends AMRMClient.ContainerRequest {
+
+        public ContainerRequestWithoutConstructor(
+                Resource capability, String[] nodes, String[] racks, Priority priority) {
+            super(capability, nodes, racks, priority);
+        }
+    }
+
+    /**
+     * Class which has constructor with the same signature as {@link
+     * org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest} in Hadoop 2.6+.
+     */
+    private static class ContainerRequestWithConstructor extends AMRMClient.ContainerRequest {
+        private String nodeLabelsExpression;
+
+        public ContainerRequestWithConstructor(
+                Resource capability, String[] nodes, String[] racks, Priority priority) {
+            super(capability, nodes, racks, priority);
+        }
+
+        public ContainerRequestWithConstructor(
+                Resource capability,
+                String[] nodes,
+                String[] racks,
+                Priority priority,
+                boolean relaxLocality,
+                String nodeLabelsExpression) {
+            super(capability, nodes, racks, priority);
+            this.nodeLabelsExpression = nodeLabelsExpression;
+        }
+
+        public String getNodeLabelsExpression() {
+            return nodeLabelsExpression;
+        }
+    }
+}
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerDriverTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerDriverTest.java
index 3de3655..3b823d8 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerDriverTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerDriverTest.java
@@ -515,10 +515,11 @@ public class YarnResourceManagerDriverTest extends ResourceManagerDriverTestBase
ignored ->
Collections.singletonList(
Collections.singletonList(
- YarnResourceManagerDriver
+ ContainerRequestReflector.INSTANCE
.getContainerRequest(
testingResource,
- Priority.UNDEFINED))))
+ Priority.UNDEFINED,
+ null))))
.setRemoveContainerRequestConsumer(
(request, handler) -> removeContainerRequestFuture.complete(null))
.setReleaseAssignedContainerConsumer(