You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2018/08/25 15:49:46 UTC
[29/50] [abbrv] hadoop git commit: YARN-7871. Node attributes
reporting from NM to RM. Contributed by Weiwei Yang.
YARN-7871. Node attributes reporting from NM to RM. Contributed by Weiwei Yang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2b4c0901
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2b4c0901
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2b4c0901
Branch: refs/heads/YARN-3409
Commit: 2b4c09017fdda6c58b845b2059bc404fa3621198
Parents: 0d7a75e
Author: Naganarasimha <na...@apache.org>
Authored: Mon Mar 12 08:05:53 2018 +0800
Committer: Sunil G <su...@apache.org>
Committed: Sat Aug 25 21:10:56 2018 +0530
----------------------------------------------------------------------
.../hadoop/yarn/conf/YarnConfiguration.java | 10 +-
.../yarn/nodelabels/NodeAttributesManager.java | 17 +-
.../hadoop/yarn/nodelabels/NodeLabelUtil.java | 19 ++
.../src/main/resources/yarn-default.xml | 24 +++
.../yarn/server/nodemanager/NodeManager.java | 70 +++++--
.../server/nodemanager/NodeStatusUpdater.java | 14 ++
.../nodemanager/NodeStatusUpdaterImpl.java | 70 ++++++-
.../ConfigurationNodeAttributesProvider.java | 90 +++++++++
.../server/nodemanager/TestNodeManager.java | 2 +-
.../TestNodeStatusUpdaterForLabels.java | 10 +-
...TestConfigurationNodeAttributesProvider.java | 185 +++++++++++++++++++
.../resourcemanager/ResourceTrackerService.java | 30 +++
.../nodelabels/NodeAttributesManagerImpl.java | 52 ++++--
.../TestResourceTrackerService.java | 78 ++++++++
.../nodelabels/TestNodeAttributesManager.java | 99 ++++++++++
15 files changed, 718 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b4c0901/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 15f8b3b..b331381 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -3531,9 +3531,12 @@ public class YarnConfiguration extends Configuration {
public static final String NM_NODE_LABELS_PROVIDER_CONFIG =
NM_NODE_LABELS_PREFIX + "provider";
+ public static final String NM_NODE_ATTRIBUTES_PROVIDER_CONFIG =
+ NM_NODE_ATTRIBUTES_PREFIX + "provider";
+
// whitelist names for the yarn.nodemanager.node-labels.provider
- public static final String CONFIG_NODE_LABELS_PROVIDER = "config";
- public static final String SCRIPT_NODE_LABELS_PROVIDER = "script";
+ public static final String CONFIG_NODE_DESCRIPTOR_PROVIDER = "config";
+ public static final String SCRIPT_NODE_DESCRIPTOR_PROVIDER = "script";
private static final String NM_NODE_LABELS_PROVIDER_PREFIX =
NM_NODE_LABELS_PREFIX + "provider.";
@@ -3565,6 +3568,9 @@ public class YarnConfiguration extends Configuration {
public static final String NM_PROVIDER_CONFIGURED_NODE_PARTITION =
NM_NODE_LABELS_PROVIDER_PREFIX + "configured-node-partition";
+ public static final String NM_PROVIDER_CONFIGURED_NODE_ATTRIBUTES =
+ NM_NODE_ATTRIBUTES_PROVIDER_PREFIX + "configured-node-attributes";
+
private static final String RM_NODE_LABELS_PREFIX = RM_PREFIX
+ "node-labels.";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b4c0901/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributesManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributesManager.java
index effda9b..ffa33cf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributesManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributesManager.java
@@ -35,15 +35,18 @@ public abstract class NodeAttributesManager extends AbstractService {
/**
* To completely replace the mappings for a given node with the new Set of
- * Attributes. If the mapping contains an attribute whose type does not match
- * a previously existing Attribute under the same prefix (name space) then
- * exception is thrown. Key would be name of the node and value would be set
- * of Attributes to be mapped.
+ * Attributes which are under a given prefix. If the mapping contains an
+ * attribute whose type does not match a previously existing Attribute
+ * under the same prefix (name space) then exception is thrown.
+ * Key would be name of the node and value would be set of Attributes to
+ * be mapped. If the prefix is null, then all node attributes will be
+ * replaced regardless of what prefix they have.
*
- * @param nodeAttributeMapping
- * @throws IOException
+ * @param prefix node attribute prefix
+ * @param nodeAttributeMapping host name to a set of node attributes mapping
+ * @throws IOException if failed to replace attributes
*/
- public abstract void replaceNodeAttributes(
+ public abstract void replaceNodeAttributes(String prefix,
Map<String, Set<NodeAttribute>> nodeAttributeMapping) throws IOException;
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b4c0901/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeLabelUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeLabelUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeLabelUtil.java
index fdfd0ce..93a27a9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeLabelUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeLabelUtil.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.api.records.NodeAttribute;
import java.io.IOException;
import java.util.Set;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
/**
* Utility class for all NodeLabel and NodeAttribute operations.
@@ -125,4 +126,22 @@ public final class NodeLabelUtil {
}
}
}
+
+ /**
+ * Filter a set of node attributes by a given prefix. Returns a filtered
+ * set of node attributes whose prefix equals the given prefix.
+ * If the prefix is null or empty, then the original set is returned.
+ * @param attributeSet node attribute set
+ * @param prefix node attribute prefix
+ * @return a filtered set of node attributes
+ */
+ public static Set<NodeAttribute> filterAttributesByPrefix(
+ Set<NodeAttribute> attributeSet, String prefix) {
+ if (Strings.isNullOrEmpty(prefix)) {
+ return attributeSet;
+ }
+ return attributeSet.stream().filter(
+ nodeAttribute -> prefix.equals(nodeAttribute.getAttributePrefix()))
+ .collect(Collectors.toSet());
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b4c0901/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 7278273..e6d708f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2897,6 +2897,20 @@
<!-- Distributed Node Attributes Configuration -->
<property>
<description>
+ This property determines which provider will be plugged by the
+ node manager to collect node-attributes. Administrators can
+ configure "config", "script" or the class name of the provider.
+ Configured class needs to extend
+ org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeAttributesProvider.
+ If "config" is configured, then "ConfigurationNodeAttributesProvider" and if
+ "script" is configured, then "ScriptBasedNodeAttributesProvider"
+ will be used.
+ </description>
+ <name>yarn.nodemanager.node-attributes.provider</name>
+ </property>
+
+ <property>
+ <description>
The node attribute script NM runs to collect node attributes.
Script output Line starting with "NODE_ATTRIBUTE:" will be
considered as a record of node attribute, attribute name, type
@@ -2934,6 +2948,16 @@
<property>
<description>
+ When "yarn.nodemanager.node-attributes.provider" is configured with
+ "config" then ConfigurationNodeAttributesProvider fetches node attributes
+ from this parameter.
+ </description>
+ <name>yarn.nodemanager.node-attributes.provider.configured-node-attributes</name>
+ <value></value>
+ </property>
+
+ <property>
+ <description>
Timeout in seconds for YARN node graceful decommission.
This is the maximal time to wait for running containers and applications to complete
before transition a DECOMMISSIONING node into DECOMMISSIONED.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b4c0901/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index b54a6b7..6eda4a8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -66,6 +66,9 @@ import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeLabelsProvider;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ScriptBasedNodeLabelsProvider;
+import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ScriptBasedNodeAttributesProvider;
+import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeAttributesProvider;
+import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeAttributesProvider;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
@@ -123,6 +126,7 @@ public class NodeManager extends CompositeService
private ApplicationACLsManager aclsManager;
private NodeHealthCheckerService nodeHealthChecker;
private NodeLabelsProvider nodeLabelsProvider;
+ private NodeAttributesProvider nodeAttributesProvider;
private LocalDirsHandlerService dirsHandler;
private Context context;
private AsyncDispatcher dispatcher;
@@ -162,14 +166,45 @@ public class NodeManager extends CompositeService
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
- metrics, nodeLabelsProvider);
+ metrics);
}
- protected NodeStatusUpdater createNodeStatusUpdater(Context context,
- Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
- NodeLabelsProvider nodeLabelsProvider) {
- return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
- metrics, nodeLabelsProvider);
+ protected NodeAttributesProvider createNodeAttributesProvider(
+ Configuration conf) throws IOException {
+ NodeAttributesProvider attributesProvider = null;
+ String providerString =
+ conf.get(YarnConfiguration.NM_NODE_ATTRIBUTES_PROVIDER_CONFIG, null);
+ if (providerString == null || providerString.trim().length() == 0) {
+ return attributesProvider;
+ }
+ switch (providerString.trim().toLowerCase()) {
+ case YarnConfiguration.CONFIG_NODE_DESCRIPTOR_PROVIDER:
+ attributesProvider = new ConfigurationNodeAttributesProvider();
+ break;
+ case YarnConfiguration.SCRIPT_NODE_DESCRIPTOR_PROVIDER:
+ attributesProvider = new ScriptBasedNodeAttributesProvider();
+ break;
+ default:
+ try {
+ Class<? extends NodeAttributesProvider> labelsProviderClass =
+ conf.getClass(YarnConfiguration.NM_NODE_ATTRIBUTES_PROVIDER_CONFIG,
+ null, NodeAttributesProvider.class);
+ attributesProvider = labelsProviderClass.newInstance();
+ } catch (InstantiationException | IllegalAccessException
+ | RuntimeException e) {
+ LOG.error("Failed to create NodeAttributesProvider"
+ + " based on Configuration", e);
+ throw new IOException(
+ "Failed to create NodeAttributesProvider : "
+ + e.getMessage(), e);
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Distributed Node Attributes is enabled"
+ + " with provider class as : "
+ + attributesProvider.getClass().toString());
+ }
+ return attributesProvider;
}
protected NodeLabelsProvider createNodeLabelsProvider(Configuration conf)
@@ -182,10 +217,10 @@ public class NodeManager extends CompositeService
return provider;
}
switch (providerString.trim().toLowerCase()) {
- case YarnConfiguration.CONFIG_NODE_LABELS_PROVIDER:
+ case YarnConfiguration.CONFIG_NODE_DESCRIPTOR_PROVIDER:
provider = new ConfigurationNodeLabelsProvider();
break;
- case YarnConfiguration.SCRIPT_NODE_LABELS_PROVIDER:
+ case YarnConfiguration.SCRIPT_NODE_DESCRIPTOR_PROVIDER:
provider = new ScriptBasedNodeLabelsProvider();
break;
default:
@@ -407,16 +442,19 @@ public class NodeManager extends CompositeService
((NMContext)context).setContainerExecutor(exec);
((NMContext)context).setDeletionService(del);
- nodeLabelsProvider = createNodeLabelsProvider(conf);
+ nodeStatusUpdater =
+ createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
- if (null == nodeLabelsProvider) {
- nodeStatusUpdater =
- createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
- } else {
+ nodeLabelsProvider = createNodeLabelsProvider(conf);
+ if (nodeLabelsProvider != null) {
addIfService(nodeLabelsProvider);
- nodeStatusUpdater =
- createNodeStatusUpdater(context, dispatcher, nodeHealthChecker,
- nodeLabelsProvider);
+ nodeStatusUpdater.setNodeLabelsProvider(nodeLabelsProvider);
+ }
+
+ nodeAttributesProvider = createNodeAttributesProvider(conf);
+ if (nodeAttributesProvider != null) {
+ addIfService(nodeAttributesProvider);
+ nodeStatusUpdater.setNodeAttributesProvider(nodeAttributesProvider);
}
nodeResourceMonitor = createNodeResourceMonitor();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b4c0901/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java
index 08892d2..142cbbc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.nodemanager;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeAttributesProvider;
+import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
public interface NodeStatusUpdater extends Service {
@@ -59,4 +61,16 @@ public interface NodeStatusUpdater extends Service {
* @param ex exception that makes the node unhealthy
*/
void reportException(Exception ex);
+
+ /**
+ * Sets the node attributes provider to be used by this node status updater.
+ * @param provider the node attributes provider to set
+ */
+ void setNodeAttributesProvider(NodeAttributesProvider provider);
+
+ /**
+ * Sets the node labels provider to be used by this node status updater.
+ * @param provider the node labels provider to set
+ */
+ void setNodeLabelsProvider(NodeLabelsProvider provider);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b4c0901/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 7be9ef7..df76ed7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -85,6 +86,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.Contai
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeAttributesProvider;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
@@ -152,21 +154,16 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
Set<ContainerId> pendingContainersToRemove = new HashSet<ContainerId>();
private NMNodeLabelsHandler nodeLabelsHandler;
- private final NodeLabelsProvider nodeLabelsProvider;
+ private NMNodeAttributesHandler nodeAttributesHandler;
+ private NodeLabelsProvider nodeLabelsProvider;
+ private NodeAttributesProvider nodeAttributesProvider;
public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
- this(context, dispatcher, healthChecker, metrics, null);
- }
-
- public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
- NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics,
- NodeLabelsProvider nodeLabelsProvider) {
super(NodeStatusUpdaterImpl.class.getName());
this.healthChecker = healthChecker;
this.context = context;
this.dispatcher = dispatcher;
- this.nodeLabelsProvider = nodeLabelsProvider;
this.metrics = metrics;
this.recentlyStoppedContainers = new LinkedHashMap<ContainerId, Long>();
this.pendingCompletedContainers =
@@ -176,6 +173,16 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
}
@Override
+ public void setNodeAttributesProvider(NodeAttributesProvider provider) {
+ this.nodeAttributesProvider = provider;
+ }
+
+ @Override
+ public void setNodeLabelsProvider(NodeLabelsProvider provider) {
+ this.nodeLabelsProvider = provider;
+ }
+
+ @Override
protected void serviceInit(Configuration conf) throws Exception {
this.totalResource = NodeManagerHardwareUtils.getNodeResources(conf);
long memoryMb = totalResource.getMemorySize();
@@ -214,7 +221,11 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
YarnConfiguration.NM_RESOURCEMANAGER_MINIMUM_VERSION,
YarnConfiguration.DEFAULT_NM_RESOURCEMANAGER_MINIMUM_VERSION);
- nodeLabelsHandler = createNMNodeLabelsHandler(nodeLabelsProvider);
+ nodeLabelsHandler =
+ createNMNodeLabelsHandler(nodeLabelsProvider);
+ nodeAttributesHandler =
+ createNMNodeAttributesHandler(nodeAttributesProvider);
+
// Default duration to track stopped containers on nodemanager is 10Min.
// This should not be assigned very large value as it will remember all the
// containers stopped during that time.
@@ -856,6 +867,43 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
}
}
+ /**
+ * Creates the heartbeat handler for the given node attributes provider.
+ * Returns null if no provider is configured.
+ * @param provider node attributes provider to wrap, may be null
+ * @return attributes handler backed by the given provider, or null
+ */
+ private NMNodeAttributesHandler createNMNodeAttributesHandler(
+ NodeAttributesProvider provider) {
+ return provider == null ? null :
+ new NMDistributedNodeAttributesHandler(provider);
+ }
+
+ private interface NMNodeAttributesHandler {
+
+ /**
+ * @return the node attributes of this node manager.
+ */
+ Set<NodeAttribute> getNodeAttributesForHeartbeat();
+ }
+
+ private static class NMDistributedNodeAttributesHandler
+ implements NMNodeAttributesHandler {
+
+ private final NodeAttributesProvider attributesProvider;
+
+ protected NMDistributedNodeAttributesHandler(
+ NodeAttributesProvider provider) {
+ this.attributesProvider = provider;
+ }
+
+ @Override
+ public Set<NodeAttribute> getNodeAttributesForHeartbeat() {
+ return attributesProvider.getDescriptors();
+ }
+ }
+
+
private static interface NMNodeLabelsHandler {
/**
* validates nodeLabels From Provider and returns it to the caller. Also
@@ -1071,6 +1119,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
NodeHeartbeatResponse response = null;
Set<NodeLabel> nodeLabelsForHeartbeat =
nodeLabelsHandler.getNodeLabelsForHeartbeat();
+ Set<NodeAttribute> nodeAttributesForHeartbeat =
+ nodeAttributesHandler == null ? null :
+ nodeAttributesHandler.getNodeAttributesForHeartbeat();
NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID);
NodeHeartbeatRequest request =
NodeHeartbeatRequest.newInstance(nodeStatus,
@@ -1079,6 +1130,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
NodeStatusUpdaterImpl.this.context
.getNMTokenSecretManager().getCurrentKey(),
nodeLabelsForHeartbeat,
+ nodeAttributesForHeartbeat,
NodeStatusUpdaterImpl.this.context
.getRegisteringCollectors());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b4c0901/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeAttributesProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeAttributesProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeAttributesProvider.java
new file mode 100644
index 0000000..74341eb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeAttributesProvider.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.nodelabels;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.TimerTask;
+import java.util.Set;
+
+/**
+ * Configuration based node attributes provider.
+ */
+public class ConfigurationNodeAttributesProvider
+ extends NodeAttributesProvider {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ConfigurationNodeAttributesProvider.class);
+
+ public ConfigurationNodeAttributesProvider() {
+ super("Configuration Based Node Attributes Provider");
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ long taskInterval = conf.getLong(YarnConfiguration
+ .NM_NODE_ATTRIBUTES_PROVIDER_FETCH_INTERVAL_MS,
+ YarnConfiguration
+ .DEFAULT_NM_NODE_ATTRIBUTES_PROVIDER_FETCH_INTERVAL_MS);
+ this.setIntervalTime(taskInterval);
+ super.serviceInit(conf);
+ }
+
+ private void updateNodeAttributesFromConfig(Configuration conf)
+ throws IOException {
+ String configuredNodeAttributes = conf.get(
+ YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_ATTRIBUTES, null);
+ setDescriptors(parseAttributes(configuredNodeAttributes));
+ }
+
+ // TODO parse attributes from configuration
+ @VisibleForTesting
+ public Set<NodeAttribute> parseAttributes(String config)
+ throws IOException {
+ return new HashSet<>();
+ }
+
+ private class ConfigurationMonitorTimerTask extends TimerTask {
+ @Override
+ public void run() {
+ try {
+ updateNodeAttributesFromConfig(new YarnConfiguration());
+ } catch (Exception e) {
+ LOG.error("Failed to update node attributes from "
+ + YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_ATTRIBUTES, e);
+ }
+ }
+ }
+
+ @Override
+ protected void cleanUp() throws Exception {
+ // Nothing to cleanup
+ }
+
+ @Override
+ public TimerTask createTimerTask() {
+ return new ConfigurationMonitorTimerTask();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b4c0901/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java
index b31215b..b2c2f6e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java
@@ -160,7 +160,7 @@ public class TestNodeManager {
// With valid whitelisted configurations
conf.set(YarnConfiguration.NM_NODE_LABELS_PROVIDER_CONFIG,
- YarnConfiguration.CONFIG_NODE_LABELS_PROVIDER);
+ YarnConfiguration.CONFIG_NODE_DESCRIPTOR_PROVIDER);
labelsProviderService = nodeManager.createNodeLabelsProvider(conf);
Assert.assertNotNull("LabelsProviderService should be initialized When "
+ "node labels provider class is configured", labelsProviderService);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b4c0901/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java
index 7ef23cb..3e2d963 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java
@@ -225,11 +225,10 @@ public class TestNodeStatusUpdaterForLabels extends NodeLabelTestBase {
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
- Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
- NodeLabelsProvider labelsProvider) {
+ Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
- metrics, labelsProvider) {
+ metrics) {
@Override
protected ResourceTracker getRMClient() {
return resourceTracker;
@@ -325,11 +324,10 @@ public class TestNodeStatusUpdaterForLabels extends NodeLabelTestBase {
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
- Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
- NodeLabelsProvider labelsProvider) {
+ Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
- metrics, labelsProvider) {
+ metrics) {
@Override
protected ResourceTracker getRMClient() {
return resourceTracker;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b4c0901/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/TestConfigurationNodeAttributesProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/TestConfigurationNodeAttributesProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/TestConfigurationNodeAttributesProvider.java
new file mode 100644
index 0000000..54cc8f0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/TestConfigurationNodeAttributesProvider.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.nodelabels;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
+import org.apache.hadoop.yarn.api.records.NodeAttributeType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.BeforeClass;
+import org.junit.Before;
+import org.junit.AfterClass;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.Assert;
+
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.ArrayList;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Test class for the configuration-based node attributes provider.
+ */
+public class TestConfigurationNodeAttributesProvider {
+
+ private static File testRootDir = new File("target",
+ TestConfigurationNodeAttributesProvider.class.getName() + "-localDir")
+ .getAbsoluteFile();
+
+ private ConfigurationNodeAttributesProvider nodeAttributesProvider;
+
+ @BeforeClass
+ public static void create() {
+ testRootDir.mkdirs();
+ }
+
+ @Before
+ public void setup() {
+ nodeAttributesProvider = new ConfigurationNodeAttributesProvider();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (nodeAttributesProvider != null) {
+ nodeAttributesProvider.close();
+ nodeAttributesProvider.stop();
+ }
+ }
+
+ @AfterClass
+ public static void remove() throws Exception {
+ if (testRootDir.exists()) {
+ FileContext.getLocalFSFileContext()
+ .delete(new Path(testRootDir.getAbsolutePath()), true);
+ }
+ }
+
+ @Test(timeout=30000L)
+ public void testNodeAttributesFetchInterval()
+ throws IOException, InterruptedException {
+ Set<NodeAttribute> expectedAttributes1 = new HashSet<>();
+ expectedAttributes1.add(NodeAttribute
+ .newInstance("test.io", "host",
+ NodeAttributeType.STRING, "host1"));
+
+ Configuration conf = new Configuration();
+ // Set fetch interval to 1s for testing
+ conf.setLong(
+ YarnConfiguration.NM_NODE_ATTRIBUTES_PROVIDER_FETCH_INTERVAL_MS, 1000);
+ ConfigurationNodeAttributesProvider spyProvider =
+ Mockito.spy(nodeAttributesProvider);
+ Mockito.when(spyProvider.parseAttributes(Mockito.anyString()))
+ .thenReturn(expectedAttributes1);
+
+ spyProvider.init(conf);
+ spyProvider.start();
+
+ // Verify init value is honored.
+ Assert.assertEquals(expectedAttributes1, spyProvider.getDescriptors());
+
+ // Configuration provider provides a different set of attributes.
+ Set<NodeAttribute> expectedAttributes2 = new HashSet<>();
+ expectedAttributes2.add(NodeAttribute
+ .newInstance("test.io", "os",
+ NodeAttributeType.STRING, "windows"));
+ Mockito.when(spyProvider.parseAttributes(Mockito.anyString()))
+ .thenReturn(expectedAttributes2);
+
+ // Since the fetch interval is set to 1s, the provider needs about 1s
+ // to pick up the changed attributes. So we expect to keep seeing the
+ // old values for a short window before the new ones appear.
+ ArrayList<String> keysMet = new ArrayList<>();
+ int numOfOldValue = 0;
+ int numOfNewValue = 0;
+ // Poll 5 times at 500ms intervals
+ int times=5;
+ while(times>0) {
+ Set<NodeAttribute> current = spyProvider.getDescriptors();
+ Assert.assertEquals(1, current.size());
+ String attributeName = current.iterator().next().getAttributeName();
+ if ("host".equals(attributeName)){
+ numOfOldValue++;
+ } else if ("os".equals(attributeName)) {
+ numOfNewValue++;
+ }
+ Thread.sleep(500);
+ times--;
+ }
+ // We should either see the old value or the new value.
+ Assert.assertEquals(5, numOfNewValue + numOfOldValue);
+ // Both values should be more than 0.
+ Assert.assertTrue(numOfOldValue > 0);
+ Assert.assertTrue(numOfNewValue > 0);
+ }
+
+ @Test
+ public void testDisableFetchNodeAttributes() throws IOException,
+ InterruptedException {
+ Set<NodeAttribute> expectedAttributes1 = new HashSet<>();
+ expectedAttributes1.add(NodeAttribute
+ .newInstance("test.io", "host",
+ NodeAttributeType.STRING, "host1"));
+
+ Configuration conf = new Configuration();
+ // Set fetch interval to -1 to disable refresh.
+ conf.setLong(
+ YarnConfiguration.NM_NODE_ATTRIBUTES_PROVIDER_FETCH_INTERVAL_MS, -1);
+ ConfigurationNodeAttributesProvider spyProvider =
+ Mockito.spy(nodeAttributesProvider);
+ Mockito.when(spyProvider.parseAttributes(Mockito.anyString()))
+ .thenReturn(expectedAttributes1);
+ spyProvider.init(conf);
+ spyProvider.start();
+
+ Assert.assertEquals(expectedAttributes1,
+ spyProvider.getDescriptors());
+
+ // The configuration now provides a different attribute, but
+ // since periodic fetching is disabled, the new value must not be
+ // picked up by the provider.
+ Set<NodeAttribute> expectedAttributes2 = new HashSet<>();
+ expectedAttributes2.add(NodeAttribute
+ .newInstance("test.io", "os",
+ NodeAttributeType.STRING, "windows"));
+ Mockito.when(spyProvider.parseAttributes(Mockito.anyString()))
+ .thenReturn(expectedAttributes2);
+
+ // Wait up to 1s for the value to update; the wait is expected to time out.
+ try {
+ GenericTestUtils.waitFor(() -> {
+ Set<NodeAttribute> attributes = spyProvider.getDescriptors();
+ return "os".equalsIgnoreCase(attributes
+ .iterator().next().getAttributeName());
+ }, 500, 1000);
+ } catch (Exception e) {
+ // Make sure we get the timeout exception.
+ Assert.assertTrue(e instanceof TimeoutException);
+ return;
+ }
+
+ Assert.fail("Expecting a failure in previous check!");
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b4c0901/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index e997192..8a1a9a7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -33,6 +33,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.collections.CollectionUtils;
+import com.google.common.collect.ImmutableMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -51,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -646,6 +648,34 @@ public class ResourceTrackerService extends AbstractService implements
this.rmContext.getNodeManagerQueueLimitCalculator()
.createContainerQueuingLimit());
}
+
+ // 8. Get node's attributes and update node-to-attributes mapping
+ // in RMNodeAttributeManager.
+ Set<NodeAttribute> nodeAttributes = request.getNodeAttributes();
+ if (nodeAttributes != null && !nodeAttributes.isEmpty()) {
+ nodeAttributes.forEach(nodeAttribute ->
+ LOG.debug(nodeId.toString() + " ATTRIBUTE : "
+ + nodeAttribute.toString()));
+
+ // Validate attributes
+ if (!nodeAttributes.stream().allMatch(
+ nodeAttribute -> NodeAttribute.PREFIX_DISTRIBUTED
+ .equals(nodeAttribute.getAttributePrefix()))) {
+ // All attributes must have the same prefix: nm.yarn.io.
+ // The NM already checks that attributes reported in the HB carry
+ // the correct prefix, so this branch should not be reached.
+ LOG.warn("Reject invalid node attributes from host: "
+ + nodeId.toString() + ", attributes in HB must have prefix "
+ + NodeAttribute.PREFIX_DISTRIBUTED);
+ } else {
+ // Replace all distributed node attributes associated with this host
+ // with the new reported attributes in node attribute manager.
+ this.rmContext.getNodeAttributesManager()
+ .replaceNodeAttributes(NodeAttribute.PREFIX_DISTRIBUTED,
+ ImmutableMap.of(nodeId.getHost(), nodeAttributes));
+ }
+ }
+
return nodeHeartBeatResponse;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b4c0901/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java
index a902ac6..04d74a8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java
@@ -33,6 +33,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import com.google.common.base.Strings;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -126,7 +127,8 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
private void internalUpdateAttributesOnNodes(
Map<String, Map<NodeAttribute, AttributeValue>> nodeAttributeMapping,
AttributeMappingOperationType op,
- Map<NodeAttribute, RMNodeAttribute> newAttributesToBeAdded) {
+ Map<NodeAttribute, RMNodeAttribute> newAttributesToBeAdded,
+ String attributePrefix) {
try {
writeLock.lock();
@@ -156,8 +158,9 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
break;
case REPLACE:
clusterAttributes.putAll(newAttributesToBeAdded);
- replaceNodeToAttribute(nodeHost, node.getAttributes(), attributes);
- node.replaceAttributes(attributes);
+ replaceNodeToAttribute(nodeHost, attributePrefix,
+ node.getAttributes(), attributes);
+ node.replaceAttributes(attributes, attributePrefix);
break;
default:
break;
@@ -199,15 +202,23 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
private void addNodeToAttribute(String nodeHost,
Map<NodeAttribute, AttributeValue> attributeMappings) {
for (NodeAttribute attribute : attributeMappings.keySet()) {
- clusterAttributes.get(attribute).addNode(nodeHost);
+ RMNodeAttribute rmNodeAttribute = clusterAttributes.get(attribute);
+ if (rmNodeAttribute != null) {
+ rmNodeAttribute.addNode(nodeHost);
+ } else {
+ clusterAttributes.put(attribute, new RMNodeAttribute(attribute));
+ }
}
}
- private void replaceNodeToAttribute(String nodeHost,
+ private void replaceNodeToAttribute(String nodeHost, String prefix,
Map<NodeAttribute, AttributeValue> oldAttributeMappings,
Map<NodeAttribute, AttributeValue> newAttributeMappings) {
if (oldAttributeMappings != null) {
- removeNodeFromAttributes(nodeHost, oldAttributeMappings.keySet());
+ Set<NodeAttribute> toRemoveAttributes =
+ NodeLabelUtil.filterAttributesByPrefix(
+ oldAttributeMappings.keySet(), prefix);
+ removeNodeFromAttributes(nodeHost, toRemoveAttributes);
}
addNodeToAttribute(nodeHost, newAttributeMappings);
}
@@ -432,8 +443,19 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
}
public void replaceAttributes(
- Map<NodeAttribute, AttributeValue> attributesMapping) {
- this.attributes.clear();
+ Map<NodeAttribute, AttributeValue> attributesMapping, String prefix) {
+ if (Strings.isNullOrEmpty(prefix)) {
+ this.attributes.clear();
+ } else {
+ Iterator<Entry<NodeAttribute, AttributeValue>> it =
+ this.attributes.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<NodeAttribute, AttributeValue> current = it.next();
+ if (prefix.equals(current.getKey().getAttributePrefix())) {
+ it.remove();
+ }
+ }
+ }
this.attributes.putAll(attributesMapping);
}
@@ -506,9 +528,10 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
}
@Override
- public void replaceNodeAttributes(
+ public void replaceNodeAttributes(String prefix,
Map<String, Set<NodeAttribute>> nodeAttributeMapping) throws IOException {
- processMapping(nodeAttributeMapping, AttributeMappingOperationType.REPLACE);
+ processMapping(nodeAttributeMapping,
+ AttributeMappingOperationType.REPLACE, prefix);
}
@Override
@@ -526,12 +549,19 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
private void processMapping(
Map<String, Set<NodeAttribute>> nodeAttributeMapping,
AttributeMappingOperationType mappingType) throws IOException {
+ processMapping(nodeAttributeMapping, mappingType, null);
+ }
+
+ private void processMapping(
+ Map<String, Set<NodeAttribute>> nodeAttributeMapping,
+ AttributeMappingOperationType mappingType, String attributePrefix)
+ throws IOException {
Map<NodeAttribute, RMNodeAttribute> newAttributesToBeAdded =
new HashMap<>();
Map<String, Map<NodeAttribute, AttributeValue>> validMapping =
validate(nodeAttributeMapping, newAttributesToBeAdded, false);
internalUpdateAttributesOnNodes(validMapping, mappingType,
- newAttributesToBeAdded);
+ newAttributesToBeAdded, attributePrefix);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b4c0901/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index fa0f5fd..a29e8a2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -37,6 +37,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.HashSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.xml.parsers.DocumentBuilderFactory;
@@ -64,12 +65,16 @@ import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
+import org.apache.hadoop.yarn.api.records.NodeAttributeType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.nodelabels.AttributeValue;
+import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
@@ -818,6 +823,79 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
}
@Test
+ public void testNodeHeartbeatWithNodeAttributes() throws Exception {
+ writeToHostsFile("host2");
+ Configuration conf = new Configuration();
+ conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
+ hostFile.getAbsolutePath());
+ rm = new MockRM(conf);
+ rm.start();
+
+ // Register to RM
+ ResourceTrackerService resourceTrackerService =
+ rm.getResourceTrackerService();
+ RegisterNodeManagerRequest registerReq =
+ Records.newRecord(RegisterNodeManagerRequest.class);
+ NodeId nodeId = NodeId.newInstance("host2", 1234);
+ Resource capability = BuilderUtils.newResource(1024, 1);
+ registerReq.setResource(capability);
+ registerReq.setNodeId(nodeId);
+ registerReq.setHttpPort(1234);
+ registerReq.setNMVersion(YarnVersionInfo.getVersion());
+ RegisterNodeManagerResponse registerResponse =
+ resourceTrackerService.registerNodeManager(registerReq);
+
+ Set<NodeAttribute> nodeAttributes = new HashSet<>();
+ nodeAttributes.add(NodeAttribute.newInstance(
+ NodeAttribute.PREFIX_DISTRIBUTED, "host",
+ NodeAttributeType.STRING, "host2"));
+
+ // Set node attributes in HB.
+ NodeHeartbeatRequest heartbeatReq =
+ Records.newRecord(NodeHeartbeatRequest.class);
+ NodeStatus nodeStatusObject = getNodeStatusObject(nodeId);
+ int responseId = nodeStatusObject.getResponseId();
+ heartbeatReq.setNodeStatus(nodeStatusObject);
+ heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse
+ .getNMTokenMasterKey());
+ heartbeatReq.setLastKnownContainerTokenMasterKey(registerResponse
+ .getContainerTokenMasterKey());
+ heartbeatReq.setNodeAttributes(nodeAttributes);
+ resourceTrackerService.nodeHeartbeat(heartbeatReq);
+
+ // Ensure RM gets correct node attributes update.
+ NodeAttributesManager attributeManager =
+ rm.getRMContext().getNodeAttributesManager();
+ Map<NodeAttribute, AttributeValue> attrs = attributeManager
+ .getAttributesForNode(nodeId.getHost());
+ Assert.assertEquals(1, attrs.size());
+ NodeAttribute na = attrs.keySet().iterator().next();
+ Assert.assertEquals("host", na.getAttributeName());
+ Assert.assertEquals("host2", na.getAttributeValue());
+ Assert.assertEquals(NodeAttributeType.STRING, na.getAttributeType());
+
+
+ // Send another HB to RM with updated node attribute
+ nodeAttributes.clear();
+ nodeAttributes.add(NodeAttribute.newInstance(
+ NodeAttribute.PREFIX_DISTRIBUTED, "host",
+ NodeAttributeType.STRING, "host3"));
+ nodeStatusObject = getNodeStatusObject(nodeId);
+ nodeStatusObject.setResponseId(++responseId);
+ heartbeatReq.setNodeStatus(nodeStatusObject);
+ heartbeatReq.setNodeAttributes(nodeAttributes);
+ resourceTrackerService.nodeHeartbeat(heartbeatReq);
+
+ // Make sure RM gets the updated attribute
+ attrs = attributeManager.getAttributesForNode(nodeId.getHost());
+ Assert.assertEquals(1, attrs.size());
+ na = attrs.keySet().iterator().next();
+ Assert.assertEquals("host", na.getAttributeName());
+ Assert.assertEquals("host3", na.getAttributeValue());
+ Assert.assertEquals(NodeAttributeType.STRING, na.getAttributeType());
+ }
+
+ @Test
public void testNodeHeartBeatWithInvalidLabels() throws Exception {
writeToHostsFile("host2");
Configuration conf = new Configuration();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b4c0901/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestNodeAttributesManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestNodeAttributesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestNodeAttributesManager.java
index b639a74..07968d4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestNodeAttributesManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestNodeAttributesManager.java
@@ -18,12 +18,14 @@
package org.apache.hadoop.yarn.server.resourcemanager.nodelabels;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeAttributeType;
import org.apache.hadoop.yarn.nodelabels.AttributeValue;
import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
+import org.apache.hadoop.yarn.nodelabels.NodeLabelUtil;
import org.junit.Test;
import org.junit.Before;
import org.junit.After;
@@ -255,4 +257,101 @@ public class TestNodeAttributesManager {
.getClusterNodeAttributes(Sets.newHashSet(PREFIXES[0]));
Assert.assertEquals(2, allAttributesPerPrefix.size());
}
+
+ @Test
+ public void testReplaceNodeAttributes() throws IOException {
+ Map<String, Set<NodeAttribute>> toAddAttributes = new HashMap<>();
+ Map<String, Set<NodeAttribute>> toReplaceMap = new HashMap<>();
+ Map<NodeAttribute, AttributeValue> nodeAttributes;
+ Set<NodeAttribute> filteredAttributes;
+ Set<NodeAttribute> clusterAttributes;
+
+ // Add 3 attributes to host1
+ // yarn.test1.io/A1=host1_v1_1
+ // yarn.test1.io/A2=host1_v1_2
+ // yarn.test1.io/A3=host1_v1_3
+ toAddAttributes.put(HOSTNAMES[0],
+ createAttributesForTest(PREFIXES[0], 3, "A", "host1_v1"));
+
+ attributesManager.addNodeAttributes(toAddAttributes);
+ nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[0]);
+ Assert.assertEquals(3, nodeAttributes.size());
+
+ // Add 10 distributed node attributes to host1
+ // nm.yarn.io/dist-node-attribute1=dist_v1_1
+ // nm.yarn.io/dist-node-attribute2=dist_v1_2
+ // ...
+ // nm.yarn.io/dist-node-attribute10=dist_v1_10
+ toAddAttributes.clear();
+ toAddAttributes.put(HOSTNAMES[0],
+ createAttributesForTest(NodeAttribute.PREFIX_DISTRIBUTED,
+ 10, "dist-node-attribute", "dist_v1"));
+ attributesManager.addNodeAttributes(toAddAttributes);
+ nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[0]);
+ Assert.assertEquals(13, nodeAttributes.size());
+ clusterAttributes = attributesManager.getClusterNodeAttributes(
+ Sets.newHashSet(NodeAttribute.PREFIX_DISTRIBUTED, PREFIXES[0]));
+ Assert.assertEquals(13, clusterAttributes.size());
+
+ // Replace by prefix
+ // Same distributed attributes names, but different values.
+ Set<NodeAttribute> toReplaceAttributes =
+ createAttributesForTest(NodeAttribute.PREFIX_DISTRIBUTED, 5,
+ "dist-node-attribute", "dist_v2");
+
+ attributesManager.replaceNodeAttributes(NodeAttribute.PREFIX_DISTRIBUTED,
+ ImmutableMap.of(HOSTNAMES[0], toReplaceAttributes));
+ nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[0]);
+ Assert.assertEquals(8, nodeAttributes.size());
+ clusterAttributes = attributesManager.getClusterNodeAttributes(
+ Sets.newHashSet(NodeAttribute.PREFIX_DISTRIBUTED, PREFIXES[0]));
+ Assert.assertEquals(8, clusterAttributes.size());
+
+ // Now we have 5 distributed attributes
+ filteredAttributes = NodeLabelUtil.filterAttributesByPrefix(
+ nodeAttributes.keySet(), NodeAttribute.PREFIX_DISTRIBUTED);
+ Assert.assertEquals(5, filteredAttributes.size());
+ // Values are updated to have prefix dist_v2
+ Assert.assertTrue(filteredAttributes.stream().allMatch(
+ nodeAttribute ->
+ nodeAttribute.getAttributeValue().startsWith("dist_v2")));
+
+ // We still have 3 yarn.test1.io attributes
+ filteredAttributes = NodeLabelUtil.filterAttributesByPrefix(
+ nodeAttributes.keySet(), PREFIXES[0]);
+ Assert.assertEquals(3, filteredAttributes.size());
+
+ // Replace with prefix
+ // Different attribute names
+ toReplaceAttributes =
+ createAttributesForTest(NodeAttribute.PREFIX_DISTRIBUTED, 1,
+ "dist-node-attribute-v2", "dist_v3");
+ attributesManager.replaceNodeAttributes(NodeAttribute.PREFIX_DISTRIBUTED,
+ ImmutableMap.of(HOSTNAMES[0], toReplaceAttributes));
+ nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[0]);
+ Assert.assertEquals(4, nodeAttributes.size());
+ clusterAttributes = attributesManager.getClusterNodeAttributes(
+ Sets.newHashSet(NodeAttribute.PREFIX_DISTRIBUTED));
+ Assert.assertEquals(1, clusterAttributes.size());
+ NodeAttribute att = clusterAttributes.iterator().next();
+ Assert.assertEquals("dist-node-attribute-v2_0", att.getAttributeName());
+ Assert.assertEquals(NodeAttribute.PREFIX_DISTRIBUTED,
+ att.getAttributePrefix());
+ Assert.assertEquals("dist_v3_0", att.getAttributeValue());
+
+ // Replace all attributes
+ toReplaceMap.put(HOSTNAMES[0],
+ createAttributesForTest(PREFIXES[1], 2, "B", "B_v1"));
+ attributesManager.replaceNodeAttributes(null, toReplaceMap);
+
+ nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[0]);
+ Assert.assertEquals(2, nodeAttributes.size());
+ clusterAttributes = attributesManager
+ .getClusterNodeAttributes(Sets.newHashSet(PREFIXES[1]));
+ Assert.assertEquals(2, clusterAttributes.size());
+ clusterAttributes = attributesManager
+ .getClusterNodeAttributes(Sets.newHashSet(
+ NodeAttribute.PREFIX_DISTRIBUTED));
+ Assert.assertEquals(0, clusterAttributes.size());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org