You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sj...@apache.org on 2015/09/26 18:05:13 UTC
[06/50] [abbrv] hadoop git commit: YARN-2920. Changed
CapacityScheduler to kill containers on nodes where node labels are changed.
Contributed by Wangda Tan (cherry picked from commit
fdf042dfffa4d2474e3cac86cfb8fe9ee4648beb)
YARN-2920. Changed CapacityScheduler to kill containers on nodes where node labels are changed. Contributed by Wangda Tan
(cherry picked from commit fdf042dfffa4d2474e3cac86cfb8fe9ee4648beb)
(cherry picked from commit 411836b74c6c02c0b5aebbbce29c209d93db1de2)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/88f022da
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/88f022da
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/88f022da
Branch: refs/heads/branch-2.6
Commit: 88f022da245c5d34997494a822124d07f8a5f72f
Parents: 2073fc0
Author: Jian He <ji...@apache.org>
Authored: Mon Dec 22 16:50:15 2014 -0800
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Sat Sep 5 20:54:18 2015 -0700
----------------------------------------------------------------------
.../hadoop/yarn/sls/nodemanager/NodeInfo.java | 3 +-
.../yarn/sls/scheduler/RMNodeWrapper.java | 3 +-
hadoop-yarn-project/CHANGES.txt | 3 +
.../server/resourcemanager/ResourceManager.java | 1 +
.../nodelabels/RMNodeLabelsManager.java | 23 +++
.../resourcemanager/rmnode/RMNodeImpl.java | 6 +-
.../scheduler/SchedulerNode.java | 24 ++-
.../scheduler/capacity/AbstractCSQueue.java | 5 +
.../scheduler/capacity/CSQueue.java | 8 +
.../scheduler/capacity/CapacityScheduler.java | 88 +++++++--
.../scheduler/capacity/LeafQueue.java | 26 +--
.../scheduler/capacity/ParentQueue.java | 24 ++-
.../common/fica/FiCaSchedulerNode.java | 11 +-
.../event/NodeLabelsUpdateSchedulerEvent.java | 37 ++++
.../scheduler/event/SchedulerEventType.java | 1 +
.../yarn/server/resourcemanager/MockNodes.java | 3 +-
.../TestCapacitySchedulerNodeLabelUpdate.java | 193 +++++++++++++++++++
17 files changed, 414 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/88f022da/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
index fdddcf4..ee6eb7b 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode
.UpdatedContainerInfo;
@@ -162,7 +163,7 @@ public class NodeInfo {
@Override
public Set<String> getNodeLabels() {
- return null;
+ return RMNodeLabelsManager.EMPTY_STRING_SET;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/88f022da/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
index 3b185ae..b64be1b 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode
.UpdatedContainerInfo;
@@ -150,6 +151,6 @@ public class RMNodeWrapper implements RMNode {
@Override
public Set<String> getNodeLabels() {
- return null;
+ return RMNodeLabelsManager.EMPTY_STRING_SET;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/88f022da/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 8244d61..fa1e120 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -171,6 +171,9 @@ Release 2.6.1 - UNRELEASED
YARN-3733. Fix DominantRC#compare() does not work as expected if
cluster resource is empty. (Rohith Sharmaks via wangda)
+ YARN-2920. Changed CapacityScheduler to kill containers on nodes where
+ node labels are changed. (Wangda Tan via jianhe)
+
Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/88f022da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index ea762c0..353851c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -430,6 +430,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
rmContext.setAMFinishingMonitor(amFinishingMonitor);
RMNodeLabelsManager nlm = createNodeLabelManager();
+ nlm.setRMContext(rmContext);
addService(nlm);
rmContext.setNodeLabelManager(nlm);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/88f022da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java
index ba1727c..d9828e2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java
@@ -35,7 +35,10 @@ import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.collect.ImmutableSet;
@@ -57,6 +60,8 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
new ConcurrentHashMap<String, Queue>();
protected AccessControlList adminAcl;
+ private RMContext rmContext = null;
+
@Override
protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
@@ -331,6 +336,7 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
return map;
}
+ @SuppressWarnings("unchecked")
private void updateResourceMappings(Map<String, Host> before,
Map<String, Host> after) {
// Get NMs in before only
@@ -341,6 +347,10 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
for (Entry<String, Host> entry : after.entrySet()) {
allNMs.addAll(entry.getValue().nms.keySet());
}
+
+ // Map used to notify RM
+ Map<NodeId, Set<String>> newNodeToLabelsMap =
+ new HashMap<NodeId, Set<String>>();
// traverse all nms
for (NodeId nodeId : allNMs) {
@@ -379,6 +389,9 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
Node newNM;
if ((newNM = getNMInNodeSet(nodeId, after, true)) != null) {
Set<String> newLabels = getLabelsByNode(nodeId, after);
+
+ newNodeToLabelsMap.put(nodeId, ImmutableSet.copyOf(newLabels));
+
// no label in the past
if (newLabels.isEmpty()) {
// update labels
@@ -405,6 +418,12 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
}
}
}
+
+ // Notify RM
+ if (rmContext != null && rmContext.getDispatcher() != null) {
+ rmContext.getDispatcher().getEventHandler().handle(
+ new NodeLabelsUpdateSchedulerEvent(newNodeToLabelsMap));
+ }
}
public Resource getResourceByLabel(String label, Resource clusterResource) {
@@ -452,4 +471,8 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
}
return false;
}
+
+ public void setRMContext(RMContext rmContext) {
+ this.rmContext = rmContext;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/88f022da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index b92f399..b57b7cc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
@@ -860,9 +861,10 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
@Override
public Set<String> getNodeLabels() {
- if (context.getNodeLabelManager() == null) {
+ RMNodeLabelsManager nlm = context.getNodeLabelManager();
+ if (nlm == null || nlm.getLabelsOnNode(nodeId) == null) {
return CommonNodeLabelsManager.EMPTY_STRING_SET;
}
- return context.getNodeLabelManager().getLabelsOnNode(nodeId);
+ return nlm.getLabelsOnNode(nodeId);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/88f022da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index f4d8731..d1922ee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -33,11 +34,14 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.util.resource.Resources;
+import com.google.common.collect.ImmutableSet;
+
/**
* Represents a YARN Cluster Node from the viewpoint of the scheduler.
@@ -61,8 +65,11 @@ public abstract class SchedulerNode {
private final RMNode rmNode;
private final String nodeName;
-
- public SchedulerNode(RMNode node, boolean usePortForNodeName) {
+
+ private volatile Set<String> labels = null;
+
+ public SchedulerNode(RMNode node, boolean usePortForNodeName,
+ Set<String> labels) {
this.rmNode = node;
this.availableResource = Resources.clone(node.getTotalCapability());
this.totalResourceCapability = Resources.clone(node.getTotalCapability());
@@ -71,6 +78,11 @@ public abstract class SchedulerNode {
} else {
nodeName = rmNode.getHostName();
}
+ this.labels = ImmutableSet.copyOf(labels);
+ }
+
+ public SchedulerNode(RMNode node, boolean usePortForNodeName) {
+ this(node, usePortForNodeName, CommonNodeLabelsManager.EMPTY_STRING_SET);
}
public RMNode getRMNode() {
@@ -275,4 +287,12 @@ public abstract class SchedulerNode {
}
allocateContainer(rmContainer);
}
+
+ public Set<String> getLabels() {
+ return labels;
+ }
+
+ public void updateLabels(Set<String> labels) {
+ this.labels = labels;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/88f022da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index fc0fbb4..1f6696d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -447,4 +447,9 @@ public abstract class AbstractCSQueue implements CSQueue {
public Map<QueueACL, AccessControlList> getACLs() {
return acls;
}
+
+ @Private
+ public Resource getUsedResourceByLabel(String nodeLabel) {
+ return usedResourcesByNodeLabels.get(nodeLabel);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/88f022da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index 6438d6c..07a7e0e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -144,6 +144,14 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
public Resource getUsedResources();
/**
+ * Get the currently utilized resources allocated on nodes with the
+ * specified label
+ *
+ * @return resources used by the queue and its children
+ */
+ public Resource getUsedResourceByLabel(String nodeLabel);
+
+ /**
* Get the current run-state of the queue
* @return current run-state
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/88f022da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 042c83c..c49c498 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -25,15 +25,15 @@ import java.util.Collection;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.HashSet;
-import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -48,12 +48,15 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -79,12 +82,15 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueNotFoundException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping.MappingType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -93,6 +99,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptR
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@@ -106,11 +113,6 @@ import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import org.apache.hadoop.yarn.api.records.ReservationId;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
-
@LimitedPrivate("yarn")
@Evolving
@SuppressWarnings("unchecked")
@@ -966,6 +968,51 @@ public class CapacityScheduler extends
updateNodeResource(nm, resourceOption);
root.updateClusterResource(clusterResource);
}
+
+ /**
+ * Process node labels update on a node.
+ *
+ * TODO: Currently the capacity scheduler kills containers on a node when
+ * the labels on the node change. It is a simple solution to ensure guaranteed
+ * capacity on labels of queues. Once YARN-2498 is completed, we can let the
+ * preemption policy decide whether such containers need to be killed or can
+ * keep running.
+ */
+ private synchronized void updateLabelsOnNode(NodeId nodeId,
+ Set<String> newLabels) {
+ FiCaSchedulerNode node = nodes.get(nodeId);
+ if (null == node) {
+ return;
+ }
+
+ // labels are the same, no update needed
+ if (node.getLabels().size() == newLabels.size()
+ && node.getLabels().containsAll(newLabels)) {
+ return;
+ }
+
+ // Kill running containers since the labels have changed
+ for (RMContainer rmContainer : node.getRunningContainers()) {
+ ContainerId containerId = rmContainer.getContainerId();
+ completedContainer(rmContainer,
+ ContainerStatus.newInstance(containerId,
+ ContainerState.COMPLETE,
+ String.format(
+ "Container=%s killed since labels on the node=%s changed",
+ containerId.toString(), nodeId.toString()),
+ ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
+ RMContainerEventType.KILL);
+ }
+
+ // Unreserve container on this node
+ RMContainer reservedContainer = node.getReservedContainer();
+ if (null != reservedContainer) {
+ dropContainerReservation(reservedContainer);
+ }
+
+ // Update node labels after we've done this
+ node.updateLabels(newLabels);
+ }
private synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
if (rmContext.isWorkPreservingRecoveryEnabled()
@@ -1049,6 +1096,19 @@ public class CapacityScheduler extends
nodeResourceUpdatedEvent.getResourceOption());
}
break;
+ case NODE_LABELS_UPDATE:
+ {
+ NodeLabelsUpdateSchedulerEvent labelUpdateEvent =
+ (NodeLabelsUpdateSchedulerEvent) event;
+
+ for (Entry<NodeId, Set<String>> entry : labelUpdateEvent
+ .getUpdatedNodeToLabels().entrySet()) {
+ NodeId id = entry.getKey();
+ Set<String> labels = entry.getValue();
+ updateLabelsOnNode(id, labels);
+ }
+ }
+ break;
case NODE_UPDATE:
{
NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
@@ -1117,14 +1177,8 @@ public class CapacityScheduler extends
}
private synchronized void addNode(RMNode nodeManager) {
- // update this node to node label manager
- if (labelManager != null) {
- labelManager.activateNode(nodeManager.getNodeID(),
- nodeManager.getTotalCapability());
- }
-
this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager,
- usePortForNodeName));
+ usePortForNodeName, nodeManager.getNodeLabels()));
Resources.addTo(clusterResource, nodeManager.getTotalCapability());
root.updateClusterResource(clusterResource);
int numNodes = numNodeManagers.incrementAndGet();
@@ -1135,6 +1189,12 @@ public class CapacityScheduler extends
if (scheduleAsynchronously && numNodes == 1) {
asyncSchedulerThread.beginSchedule();
}
+
+ // update this node to node label manager
+ if (labelManager != null) {
+ labelManager.activateNode(nodeManager.getNodeID(),
+ nodeManager.getTotalCapability());
+ }
}
private synchronized void removeNode(RMNode nodeInfo) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/88f022da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 60b5a59..509b0f2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -809,7 +809,7 @@ public class LeafQueue extends AbstractCSQueue {
// if our queue cannot access this node, just return
if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels,
- labelManager.getLabelsOnNode(node.getNodeID()))) {
+ node.getLabels())) {
return NULL_ASSIGNMENT;
}
@@ -878,7 +878,7 @@ public class LeafQueue extends AbstractCSQueue {
// Check queue max-capacity limit
if (!canAssignToThisQueue(clusterResource, required,
- labelManager.getLabelsOnNode(node.getNodeID()), application, true)) {
+ node.getLabels(), application, true)) {
return NULL_ASSIGNMENT;
}
@@ -911,7 +911,7 @@ public class LeafQueue extends AbstractCSQueue {
// Book-keeping
// Note: Update headroom to account for current allocation too...
allocateResource(clusterResource, application, assigned,
- labelManager.getLabelsOnNode(node.getNodeID()));
+ node.getLabels());
// Don't reset scheduling opportunities for non-local assignments
// otherwise the app will be delayed for each non-local assignment.
@@ -1561,7 +1561,7 @@ public class LeafQueue extends AbstractCSQueue {
// check if the resource request can access the label
if (!SchedulerUtils.checkNodeLabelExpression(
- labelManager.getLabelsOnNode(node.getNodeID()),
+ node.getLabels(),
request.getNodeLabelExpression())) {
// this is a reserved container, but we cannot allocate it now according
// to label not match. This can be caused by node label changed
@@ -1752,8 +1752,7 @@ public class LeafQueue extends AbstractCSQueue {
// Book-keeping
if (removed) {
releaseResource(clusterResource, application,
- container.getResource(),
- labelManager.getLabelsOnNode(node.getNodeID()));
+ container.getResource(), node.getLabels());
LOG.info("completedContainer" +
" container=" + container +
" queue=" + this +
@@ -1950,9 +1949,10 @@ public class LeafQueue extends AbstractCSQueue {
}
// Careful! Locking order is important!
synchronized (this) {
+ FiCaSchedulerNode node =
+ scheduler.getNode(rmContainer.getContainer().getNodeId());
allocateResource(clusterResource, attempt, rmContainer.getContainer()
- .getResource(), labelManager.getLabelsOnNode(rmContainer
- .getContainer().getNodeId()));
+ .getResource(), node.getLabels());
}
getParent().recoverContainer(clusterResource, attempt, rmContainer);
}
@@ -1989,9 +1989,10 @@ public class LeafQueue extends AbstractCSQueue {
public void attachContainer(Resource clusterResource,
FiCaSchedulerApp application, RMContainer rmContainer) {
if (application != null) {
+ FiCaSchedulerNode node =
+ scheduler.getNode(rmContainer.getContainer().getNodeId());
allocateResource(clusterResource, application, rmContainer.getContainer()
- .getResource(), labelManager.getLabelsOnNode(rmContainer
- .getContainer().getNodeId()));
+ .getResource(), node.getLabels());
LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
+ " resource=" + rmContainer.getContainer().getResource()
+ " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity()
@@ -2006,9 +2007,10 @@ public class LeafQueue extends AbstractCSQueue {
public void detachContainer(Resource clusterResource,
FiCaSchedulerApp application, RMContainer rmContainer) {
if (application != null) {
+ FiCaSchedulerNode node =
+ scheduler.getNode(rmContainer.getContainer().getNodeId());
releaseResource(clusterResource, application, rmContainer.getContainer()
- .getResource(), labelManager.getLabelsOnNode(rmContainer.getContainer()
- .getNodeId()));
+ .getResource(), node.getLabels());
LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
+ " resource=" + rmContainer.getContainer().getResource()
+ " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity()
http://git-wip-us.apache.org/repos/asf/hadoop/blob/88f022da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 6ffaf4c..fd598f2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -73,6 +73,7 @@ public class ParentQueue extends AbstractCSQueue {
private final boolean rootQueue;
final Comparator<CSQueue> queueComparator;
volatile int numApplications;
+ private final CapacitySchedulerContext scheduler;
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
@@ -80,7 +81,7 @@ public class ParentQueue extends AbstractCSQueue {
public ParentQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException {
super(cs, queueName, parent, old);
-
+ this.scheduler = cs;
this.queueComparator = cs.getQueueComparator();
this.rootQueue = (parent == null);
@@ -420,10 +421,10 @@ public class ParentQueue extends AbstractCSQueue {
Resource clusterResource, FiCaSchedulerNode node, boolean needToUnreserve) {
CSAssignment assignment =
new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
+ Set<String> nodeLabels = node.getLabels();
// if our queue cannot access this node, just return
- if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels,
- labelManager.getLabelsOnNode(node.getNodeID()))) {
+ if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels, nodeLabels)) {
return assignment;
}
@@ -434,7 +435,6 @@ public class ParentQueue extends AbstractCSQueue {
}
boolean localNeedToUnreserve = false;
- Set<String> nodeLabels = labelManager.getLabelsOnNode(node.getNodeID());
// Are we over maximum-capacity for this queue?
if (!canAssignToThisQueue(clusterResource, nodeLabels)) {
@@ -641,7 +641,7 @@ public class ParentQueue extends AbstractCSQueue {
// Book keeping
synchronized (this) {
super.releaseResource(clusterResource, rmContainer.getContainer()
- .getResource(), labelManager.getLabelsOnNode(node.getNodeID()));
+ .getResource(), node.getLabels());
LOG.info("completedContainer" +
" queue=" + getQueueName() +
@@ -703,9 +703,10 @@ public class ParentQueue extends AbstractCSQueue {
}
// Careful! Locking order is important!
synchronized (this) {
+ FiCaSchedulerNode node =
+ scheduler.getNode(rmContainer.getContainer().getNodeId());
super.allocateResource(clusterResource, rmContainer.getContainer()
- .getResource(), labelManager.getLabelsOnNode(rmContainer
- .getContainer().getNodeId()));
+ .getResource(), node.getLabels());
}
if (parent != null) {
parent.recoverContainer(clusterResource, attempt, rmContainer);
@@ -730,9 +731,10 @@ public class ParentQueue extends AbstractCSQueue {
public void attachContainer(Resource clusterResource,
FiCaSchedulerApp application, RMContainer rmContainer) {
if (application != null) {
+ FiCaSchedulerNode node =
+ scheduler.getNode(rmContainer.getContainer().getNodeId());
super.allocateResource(clusterResource, rmContainer.getContainer()
- .getResource(), labelManager.getLabelsOnNode(rmContainer
- .getContainer().getNodeId()));
+ .getResource(), node.getLabels());
LOG.info("movedContainer" + " queueMoveIn=" + getQueueName()
+ " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
+ getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster="
@@ -748,9 +750,11 @@ public class ParentQueue extends AbstractCSQueue {
public void detachContainer(Resource clusterResource,
FiCaSchedulerApp application, RMContainer rmContainer) {
if (application != null) {
+ FiCaSchedulerNode node =
+ scheduler.getNode(rmContainer.getContainer().getNodeId());
super.releaseResource(clusterResource,
rmContainer.getContainer().getResource(),
- labelManager.getLabelsOnNode(rmContainer.getContainer().getNodeId()));
+ node.getLabels());
LOG.info("movedContainer" + " queueMoveOut=" + getQueueName()
+ " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
+ getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster="
http://git-wip-us.apache.org/repos/asf/hadoop/blob/88f022da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
index 5227aac..fe6db47 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
@@ -19,12 +19,14 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica;
+import java.util.Set;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
@@ -32,9 +34,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
public class FiCaSchedulerNode extends SchedulerNode {
private static final Log LOG = LogFactory.getLog(FiCaSchedulerNode.class);
+
+ /**
+ * Creates a scheduler node with the given label set, which is passed
+ * straight through to the {@link SchedulerNode} constructor.
+ */
+ public FiCaSchedulerNode(RMNode node, boolean usePortForNodeName,
+ Set<String> nodeLabels) {
+ super(node, usePortForNodeName, nodeLabels);
+ }
+ /**
+ * Creates a scheduler node without labels, i.e. with
+ * {@link CommonNodeLabelsManager#EMPTY_STRING_SET} as its label set.
+ */
public FiCaSchedulerNode(RMNode node, boolean usePortForNodeName) {
- super(node, usePortForNodeName);
+ this(node, usePortForNodeName, CommonNodeLabelsManager.EMPTY_STRING_SET);
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/88f022da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeLabelsUpdateSchedulerEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeLabelsUpdateSchedulerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeLabelsUpdateSchedulerEvent.java
new file mode 100644
index 0000000..7723e25
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeLabelsUpdateSchedulerEvent.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+/**
+ * Scheduler event of type {@link SchedulerEventType#NODE_LABELS_UPDATE}
+ * mapping each node whose labels were updated to its label set.
+ * NOTE(review): presumably the node's complete new label set; confirm
+ */
+public class NodeLabelsUpdateSchedulerEvent extends SchedulerEvent {
+  private final Map<NodeId, Set<String>> nodeToLabels;
+
+  /**
+   * @param nodeToLabels updated nodes mapped to their labels; the map is
+   *     stored by reference, not copied
+   */
+  public NodeLabelsUpdateSchedulerEvent(Map<NodeId, Set<String>> nodeToLabels) {
+    super(SchedulerEventType.NODE_LABELS_UPDATE);
+    this.nodeToLabels = nodeToLabels;
+  }
+
+  /**
+   * @return the node-to-labels mapping given to the constructor (the same
+   *     instance, not a copy)
+   */
+  public Map<NodeId, Set<String>> getUpdatedNodeToLabels() {
+    return nodeToLabels;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/88f022da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
index 062f831..13aecb3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
@@ -25,6 +25,7 @@ public enum SchedulerEventType {
NODE_REMOVED,
NODE_UPDATE,
NODE_RESOURCE_UPDATE,
+ NODE_LABELS_UPDATE,
// Source: RMApp
APP_ADDED,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/88f022da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
index 228f200..278c151 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
@@ -206,7 +207,7 @@ public class MockNodes {
+ // Mock nodes report an empty label set instead of null.
+ // NOTE(review): presumably so label-aware scheduler code can use the
+ // result without a null check; confirm against callers.
@Override
public Set<String> getNodeLabels() {
- return null;
+ return RMNodeLabelsManager.EMPTY_STRING_SET;
}
};
http://git-wip-us.apache.org/repos/asf/hadoop/blob/88f022da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
new file mode 100644
index 0000000..261fa01
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.util.ArrayList;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.MemoryRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+/**
+ * Verifies that the CapacityScheduler kills containers running on a node
+ * whose node labels are changed, and that per-label queue usage is updated
+ * accordingly.
+ */
+public class TestCapacitySchedulerNodeLabelUpdate {
+  private static final int GB = 1024;
+
+  private YarnConfiguration conf;
+
+  private RMNodeLabelsManager mgr;
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    mgr = new MemoryRMNodeLabelsManager();
+    mgr.init(conf);
+  }
+
+  /**
+   * Builds a scheduler configuration with a single queue, root.a, that can
+   * access labels x, y and z with 100% capacity on each.
+   */
+  private Configuration getConfigurationWithQueueLabels(Configuration config) {
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration(config);
+
+    // Define top-level queues
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a"});
+    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100);
+    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "z", 100);
+
+    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+    csConf.setCapacity(A, 100);
+    csConf.setAccessibleNodeLabels(A, ImmutableSet.of("x", "y", "z"));
+    csConf.setCapacityByLabel(A, "x", 100);
+    csConf.setCapacityByLabel(A, "y", 100);
+    csConf.setCapacityByLabel(A, "z", 100);
+
+    return csConf;
+  }
+
+  /** Returns a set containing {@code elements}. */
+  private static Set<String> toSet(String... elements) {
+    return Sets.newHashSet(elements);
+  }
+
+  /** Asserts the memory used by {@code queueName} under the NO_LABEL label. */
+  private void checkUsedResource(MockRM rm, String queueName, int memory) {
+    checkUsedResource(rm, queueName, memory, RMNodeLabelsManager.NO_LABEL);
+  }
+
+  /** Asserts the memory used by {@code queueName} under {@code label}. */
+  private void checkUsedResource(MockRM rm, String queueName, int memory,
+      String label) {
+    CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
+    CSQueue queue = scheduler.getQueue(queueName);
+    Assert.assertEquals(memory, queue.getUsedResourceByLabel(label).getMemory());
+  }
+
+  /**
+   * Changing the labels of a node must kill every container on it (including
+   * an AM container) and release the resources charged to the old label.
+   */
+  @Test (timeout = 30000)
+  public void testNodeUpdate() throws Exception {
+    // set node -> label
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y", "z"));
+
+    // set mapping:
+    // h1 -> x
+    // h2 -> y
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h2", 0), toSet("y")));
+
+    // inject node label manager
+    MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    try {
+      rm.getRMContext().setNodeLabelManager(mgr);
+      rm.start();
+      MockNM nm1 = rm.registerNode("h1:1234", 8000);
+      MockNM nm2 = rm.registerNode("h2:1234", 8000);
+      MockNM nm3 = rm.registerNode("h3:1234", 8000);
+
+      ContainerId containerId;
+
+      // launch an app to queue a; its AM runs on h3 (no label), and the
+      // container requested with label x must be allocated on h1
+      RMApp app1 = rm.submitApp(GB, "app", "user", null, "a");
+      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm3);
+
+      // request a container.
+      am1.allocate("*", GB, 1, new ArrayList<ContainerId>(), "x");
+      containerId =
+          ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+      Assert.assertTrue(rm.waitForState(nm1, containerId,
+          RMContainerState.ALLOCATED, 10 * 1000));
+
+      // check used resource:
+      // queue-a used x=1G, ""=1G
+      checkUsedResource(rm, "a", 1024, "x");
+      checkUsedResource(rm, "a", 1024);
+
+      // change h1's label to z, container should be killed
+      mgr.replaceLabelsOnNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
+          toSet("z")));
+      Assert.assertTrue(rm.waitForState(nm1, containerId,
+          RMContainerState.KILLED, 10 * 1000));
+
+      // check used resource:
+      // queue-a used x=0G, ""=1G ("" not changed)
+      checkUsedResource(rm, "a", 0, "x");
+      checkUsedResource(rm, "a", 1024);
+
+      // request a container with label = y
+      am1.allocate("*", GB, 1, new ArrayList<ContainerId>(), "y");
+      containerId =
+          ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
+      Assert.assertTrue(rm.waitForState(nm2, containerId,
+          RMContainerState.ALLOCATED, 10 * 1000));
+
+      // check used resource:
+      // queue-a used y=1G, ""=1G
+      checkUsedResource(rm, "a", 1024, "y");
+      checkUsedResource(rm, "a", 1024);
+
+      // change h2's label to no label, container should be killed
+      mgr.replaceLabelsOnNode(ImmutableMap.of(NodeId.newInstance("h2", 0),
+          CommonNodeLabelsManager.EMPTY_STRING_SET));
+      // container 3 was allocated on h2, so wait on nm2
+      Assert.assertTrue(rm.waitForState(nm2, containerId,
+          RMContainerState.KILLED, 10 * 1000));
+
+      // check used resource:
+      // queue-a used x=0G, y=0G, ""=1G ("" not changed)
+      checkUsedResource(rm, "a", 0, "x");
+      checkUsedResource(rm, "a", 0, "y");
+      checkUsedResource(rm, "a", 1024);
+
+      containerId =
+          ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
+
+      // change h3's label to z, AM container should be killed
+      mgr.replaceLabelsOnNode(ImmutableMap.of(NodeId.newInstance("h3", 0),
+          toSet("z")));
+      // the AM container was launched on h3, so wait on nm3
+      Assert.assertTrue(rm.waitForState(nm3, containerId,
+          RMContainerState.KILLED, 10 * 1000));
+
+      // check used resource:
+      // queue-a used x=0G, y=0G, ""=0G (the AM container was killed)
+      checkUsedResource(rm, "a", 0, "x");
+      checkUsedResource(rm, "a", 0, "y");
+      checkUsedResource(rm, "a", 0);
+    } finally {
+      rm.close();
+    }
+  }
+}