You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2016/03/18 19:25:11 UTC
[11/46] hadoop git commit: YARN-4719. Add a helper library to
maintain node state and allow common queries. (kasha)
YARN-4719. Add a helper library to maintain node state and allow common queries. (kasha)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/20d389ce
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/20d389ce
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/20d389ce
Branch: refs/heads/HDFS-7240
Commit: 20d389ce61eaacb5ddfb329015f50e96ad894f8d
Parents: 5644137
Author: Karthik Kambatla <ka...@cloudera.com>
Authored: Mon Mar 14 14:19:05 2016 -0700
Committer: Karthik Kambatla <ka...@cloudera.com>
Committed: Mon Mar 14 14:19:05 2016 -0700
----------------------------------------------------------------------
.../scheduler/AbstractYarnScheduler.java | 170 ++---------
.../scheduler/ClusterNodeTracker.java | 300 +++++++++++++++++++
.../resourcemanager/scheduler/NodeFilter.java | 33 ++
.../scheduler/capacity/CapacityScheduler.java | 86 +++---
.../scheduler/fair/FSAppAttempt.java | 13 +-
.../scheduler/fair/FairScheduler.java | 104 +++----
.../scheduler/fifo/FifoScheduler.java | 32 +-
.../scheduler/TestAbstractYarnScheduler.java | 14 +-
.../scheduler/capacity/TestReservations.java | 19 +-
.../scheduler/fair/TestFairScheduler.java | 7 +-
.../scheduler/fifo/TestFifoScheduler.java | 19 +-
11 files changed, 478 insertions(+), 319 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/20d389ce/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 7ca8671..7d12301 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -27,11 +27,7 @@ import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -92,22 +88,10 @@ public abstract class AbstractYarnScheduler
private static final Log LOG = LogFactory.getLog(AbstractYarnScheduler.class);
- // Nodes in the cluster, indexed by NodeId
- protected Map<NodeId, N> nodes = new ConcurrentHashMap<NodeId, N>();
-
- // Whole capacity of the cluster
- protected Resource clusterResource = Resource.newInstance(0, 0);
+ protected final ClusterNodeTracker<N> nodeTracker =
+ new ClusterNodeTracker<>();
protected Resource minimumAllocation;
- protected Resource maximumAllocation;
- private Resource configuredMaximumAllocation;
- private int maxNodeMemory = -1;
- private int maxNodeVCores = -1;
- private final ReadLock maxAllocReadLock;
- private final WriteLock maxAllocWriteLock;
-
- private boolean useConfiguredMaximumAllocationOnly = true;
- private long configuredMaximumAllocationWaitTime;
protected RMContext rmContext;
@@ -132,9 +116,6 @@ public abstract class AbstractYarnScheduler
*/
public AbstractYarnScheduler(String name) {
super(name);
- ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
- this.maxAllocReadLock = lock.readLock();
- this.maxAllocWriteLock = lock.writeLock();
}
@Override
@@ -142,14 +123,21 @@ public abstract class AbstractYarnScheduler
nmExpireInterval =
conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
- configuredMaximumAllocationWaitTime =
+ long configuredMaximumAllocationWaitTime =
conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS);
+ nodeTracker.setConfiguredMaxAllocationWaitTime(
+ configuredMaximumAllocationWaitTime);
maxClusterLevelAppPriority = getMaxPriorityFromConf(conf);
createReleaseCache();
super.serviceInit(conf);
}
+ @VisibleForTesting
+ public ClusterNodeTracker getNodeTracker() {
+ return nodeTracker;
+ }
+
public List<Container> getTransferredContainers(
ApplicationAttemptId currentAttempt) {
ApplicationId appId = currentAttempt.getApplicationId();
@@ -184,20 +172,21 @@ public abstract class AbstractYarnScheduler
* Add blacklisted NodeIds to the list that is passed.
*
* @param app application attempt.
- * @param blacklistNodeIdList the list to store blacklisted NodeIds.
*/
- public void addBlacklistedNodeIdsToList(SchedulerApplicationAttempt app,
- List<NodeId> blacklistNodeIdList) {
- for (Map.Entry<NodeId, N> nodeEntry : nodes.entrySet()) {
- if (SchedulerAppUtils.isBlacklisted(app, nodeEntry.getValue(), LOG)) {
- blacklistNodeIdList.add(nodeEntry.getKey());
+ public List<N> getBlacklistedNodes(final SchedulerApplicationAttempt app) {
+
+ NodeFilter nodeFilter = new NodeFilter() {
+ @Override
+ public boolean accept(SchedulerNode node) {
+ return SchedulerAppUtils.isBlacklisted(app, node, LOG);
}
- }
+ };
+ return nodeTracker.getNodes(nodeFilter);
}
@Override
public Resource getClusterResource() {
- return clusterResource;
+ return nodeTracker.getClusterCapacity();
}
@Override
@@ -207,22 +196,7 @@ public abstract class AbstractYarnScheduler
@Override
public Resource getMaximumResourceCapability() {
- Resource maxResource;
- maxAllocReadLock.lock();
- try {
- if (useConfiguredMaximumAllocationOnly) {
- if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp()
- > configuredMaximumAllocationWaitTime) {
- useConfiguredMaximumAllocationOnly = false;
- }
- maxResource = Resources.clone(configuredMaximumAllocation);
- } else {
- maxResource = Resources.clone(maximumAllocation);
- }
- } finally {
- maxAllocReadLock.unlock();
- }
- return maxResource;
+ return nodeTracker.getMaxAllowedAllocation();
}
@Override
@@ -231,15 +205,7 @@ public abstract class AbstractYarnScheduler
}
protected void initMaximumResourceCapability(Resource maximumAllocation) {
- maxAllocWriteLock.lock();
- try {
- if (this.configuredMaximumAllocation == null) {
- this.configuredMaximumAllocation = Resources.clone(maximumAllocation);
- this.maximumAllocation = Resources.clone(maximumAllocation);
- }
- } finally {
- maxAllocWriteLock.unlock();
- }
+ nodeTracker.setConfiguredMaxAllocation(maximumAllocation);
}
protected synchronized void containerLaunchedOnNode(
@@ -332,8 +298,7 @@ public abstract class AbstractYarnScheduler
@Override
public SchedulerNodeReport getNodeReport(NodeId nodeId) {
- N node = nodes.get(nodeId);
- return node == null ? null : new SchedulerNodeReport(node);
+ return nodeTracker.getNodeReport(nodeId);
}
@Override
@@ -431,12 +396,13 @@ public abstract class AbstractYarnScheduler
container));
// recover scheduler node
- SchedulerNode schedulerNode = nodes.get(nm.getNodeID());
+ SchedulerNode schedulerNode = nodeTracker.getNode(nm.getNodeID());
schedulerNode.recoverContainer(rmContainer);
// recover queue: update headroom etc.
Queue queue = schedulerAttempt.getQueue();
- queue.recoverContainer(clusterResource, schedulerAttempt, rmContainer);
+ queue.recoverContainer(
+ getClusterResource(), schedulerAttempt, rmContainer);
// recover scheduler attempt
schedulerAttempt.recoverContainer(schedulerNode, rmContainer);
@@ -621,7 +587,7 @@ public abstract class AbstractYarnScheduler
@Override
public SchedulerNode getSchedulerNode(NodeId nodeId) {
- return nodes.get(nodeId);
+ return nodeTracker.getNode(nodeId);
}
@Override
@@ -690,18 +656,12 @@ public abstract class AbstractYarnScheduler
+ " from: " + oldResource + ", to: "
+ newResource);
- nodes.remove(nm.getNodeID());
- updateMaximumAllocation(node, false);
+ nodeTracker.removeNode(nm.getNodeID());
// update resource to node
node.setTotalResource(newResource);
- nodes.put(nm.getNodeID(), (N)node);
- updateMaximumAllocation(node, true);
-
- // update resource to clusterResource
- Resources.subtractFrom(clusterResource, oldResource);
- Resources.addTo(clusterResource, newResource);
+ nodeTracker.addNode((N) node);
} else {
// Log resource change
LOG.warn("Update resource on node: " + node.getNodeName()
@@ -721,80 +681,8 @@ public abstract class AbstractYarnScheduler
+ " does not support reservations");
}
- protected void updateMaximumAllocation(SchedulerNode node, boolean add) {
- Resource totalResource = node.getTotalResource();
- maxAllocWriteLock.lock();
- try {
- if (add) { // added node
- int nodeMemory = totalResource.getMemory();
- if (nodeMemory > maxNodeMemory) {
- maxNodeMemory = nodeMemory;
- maximumAllocation.setMemory(Math.min(
- configuredMaximumAllocation.getMemory(), maxNodeMemory));
- }
- int nodeVCores = totalResource.getVirtualCores();
- if (nodeVCores > maxNodeVCores) {
- maxNodeVCores = nodeVCores;
- maximumAllocation.setVirtualCores(Math.min(
- configuredMaximumAllocation.getVirtualCores(), maxNodeVCores));
- }
- } else { // removed node
- if (maxNodeMemory == totalResource.getMemory()) {
- maxNodeMemory = -1;
- }
- if (maxNodeVCores == totalResource.getVirtualCores()) {
- maxNodeVCores = -1;
- }
- // We only have to iterate through the nodes if the current max memory
- // or vcores was equal to the removed node's
- if (maxNodeMemory == -1 || maxNodeVCores == -1) {
- for (Map.Entry<NodeId, N> nodeEntry : nodes.entrySet()) {
- int nodeMemory =
- nodeEntry.getValue().getTotalResource().getMemory();
- if (nodeMemory > maxNodeMemory) {
- maxNodeMemory = nodeMemory;
- }
- int nodeVCores =
- nodeEntry.getValue().getTotalResource().getVirtualCores();
- if (nodeVCores > maxNodeVCores) {
- maxNodeVCores = nodeVCores;
- }
- }
- if (maxNodeMemory == -1) { // no nodes
- maximumAllocation.setMemory(configuredMaximumAllocation.getMemory());
- } else {
- maximumAllocation.setMemory(
- Math.min(configuredMaximumAllocation.getMemory(), maxNodeMemory));
- }
- if (maxNodeVCores == -1) { // no nodes
- maximumAllocation.setVirtualCores(configuredMaximumAllocation.getVirtualCores());
- } else {
- maximumAllocation.setVirtualCores(
- Math.min(configuredMaximumAllocation.getVirtualCores(), maxNodeVCores));
- }
- }
- }
- } finally {
- maxAllocWriteLock.unlock();
- }
- }
-
protected void refreshMaximumAllocation(Resource newMaxAlloc) {
- maxAllocWriteLock.lock();
- try {
- configuredMaximumAllocation = Resources.clone(newMaxAlloc);
- int maxMemory = newMaxAlloc.getMemory();
- if (maxNodeMemory != -1) {
- maxMemory = Math.min(maxMemory, maxNodeMemory);
- }
- int maxVcores = newMaxAlloc.getVirtualCores();
- if (maxNodeVCores != -1) {
- maxVcores = Math.min(maxVcores, maxNodeVCores);
- }
- maximumAllocation = Resources.createResource(maxMemory, maxVcores);
- } finally {
- maxAllocWriteLock.unlock();
- }
+ nodeTracker.setConfiguredMaxAllocation(newMaxAlloc);
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/20d389ce/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
new file mode 100644
index 0000000..34b4267
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Helper library that:
+ * - tracks the state of all cluster {@link SchedulerNode}s
+ * - provides convenience methods to filter and sort nodes
+ */
+@InterfaceAudience.Private
+public class ClusterNodeTracker<N extends SchedulerNode> {
+ private static final Log LOG = LogFactory.getLog(ClusterNodeTracker.class);
+
+ private ReadWriteLock readWriteLock = new ReentrantReadWriteLock(true);
+ private Lock readLock = readWriteLock.readLock();
+ private Lock writeLock = readWriteLock.writeLock();
+
+ private HashMap<NodeId, N> nodes = new HashMap<>();
+ private Map<String, Integer> nodesPerRack = new HashMap<>();
+
+ private Resource clusterCapacity = Resources.clone(Resources.none());
+ private Resource staleClusterCapacity = null;
+
+ // Max allocation
+ private int maxNodeMemory = -1;
+ private int maxNodeVCores = -1;
+ private Resource configuredMaxAllocation;
+ private boolean forceConfiguredMaxAllocation = true;
+ private long configuredMaxAllocationWaitTime;
+
+ public void addNode(N node) {
+ writeLock.lock();
+ try {
+ nodes.put(node.getNodeID(), node);
+
+ // Update nodes per rack as well
+ String rackName = node.getRackName();
+ Integer numNodes = nodesPerRack.get(rackName);
+ if (numNodes == null) {
+ numNodes = 0;
+ }
+ nodesPerRack.put(rackName, ++numNodes);
+
+ // Update cluster capacity
+ Resources.addTo(clusterCapacity, node.getTotalResource());
+
+ // Update maximumAllocation
+ updateMaxResources(node, true);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ public boolean exists(NodeId nodeId) {
+ readLock.lock();
+ try {
+ return nodes.containsKey(nodeId);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ public N getNode(NodeId nodeId) {
+ readLock.lock();
+ try {
+ return nodes.get(nodeId);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ public SchedulerNodeReport getNodeReport(NodeId nodeId) {
+ readLock.lock();
+ try {
+ N n = nodes.get(nodeId);
+ return n == null ? null : new SchedulerNodeReport(n);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ public int nodeCount() {
+ readLock.lock();
+ try {
+ return nodes.size();
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ public int nodeCount(String rackName) {
+ readLock.lock();
+ String rName = rackName == null ? "NULL" : rackName;
+ try {
+ Integer nodeCount = nodesPerRack.get(rName);
+ return nodeCount == null ? 0 : nodeCount;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ public Resource getClusterCapacity() {
+ readLock.lock();
+ try {
+ if (staleClusterCapacity == null ||
+ !Resources.equals(staleClusterCapacity, clusterCapacity)) {
+ staleClusterCapacity = Resources.clone(clusterCapacity);
+ }
+ return staleClusterCapacity;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ public N removeNode(NodeId nodeId) {
+ writeLock.lock();
+ try {
+ N node = nodes.remove(nodeId);
+ if (node == null) {
+ LOG.warn("Attempting to remove a non-existent node " + nodeId);
+ return null;
+ }
+
+ // Update nodes per rack as well
+ String rackName = node.getRackName();
+ Integer numNodes = nodesPerRack.get(rackName);
+ if (numNodes > 0) {
+ nodesPerRack.put(rackName, --numNodes);
+ } else {
+ LOG.error("Attempting to remove node from an empty rack " + rackName);
+ }
+
+ // Update cluster capacity
+ Resources.subtractFrom(clusterCapacity, node.getTotalResource());
+
+ // Update maximumAllocation
+ updateMaxResources(node, false);
+
+ return node;
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ public void setConfiguredMaxAllocation(Resource resource) {
+ writeLock.lock();
+ try {
+ configuredMaxAllocation = Resources.clone(resource);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ public void setConfiguredMaxAllocationWaitTime(
+ long configuredMaxAllocationWaitTime) {
+ writeLock.lock();
+ try {
+ this.configuredMaxAllocationWaitTime =
+ configuredMaxAllocationWaitTime;
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ public Resource getMaxAllowedAllocation() {
+ readLock.lock();
+ try {
+ if (forceConfiguredMaxAllocation &&
+ System.currentTimeMillis() - ResourceManager.getClusterTimeStamp()
+ > configuredMaxAllocationWaitTime) {
+ forceConfiguredMaxAllocation = false;
+ }
+
+ if (forceConfiguredMaxAllocation
+ || maxNodeMemory == -1 || maxNodeVCores == -1) {
+ return configuredMaxAllocation;
+ }
+
+ return Resources.createResource(
+ Math.min(configuredMaxAllocation.getMemory(), maxNodeMemory),
+ Math.min(configuredMaxAllocation.getVirtualCores(), maxNodeVCores)
+ );
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ private void updateMaxResources(SchedulerNode node, boolean add) {
+ Resource totalResource = node.getTotalResource();
+ writeLock.lock();
+ try {
+ if (add) { // added node
+ int nodeMemory = totalResource.getMemory();
+ if (nodeMemory > maxNodeMemory) {
+ maxNodeMemory = nodeMemory;
+ }
+ int nodeVCores = totalResource.getVirtualCores();
+ if (nodeVCores > maxNodeVCores) {
+ maxNodeVCores = nodeVCores;
+ }
+ } else { // removed node
+ if (maxNodeMemory == totalResource.getMemory()) {
+ maxNodeMemory = -1;
+ }
+ if (maxNodeVCores == totalResource.getVirtualCores()) {
+ maxNodeVCores = -1;
+ }
+ // We only have to iterate through the nodes if the current max memory
+ // or vcores was equal to the removed node's
+ if (maxNodeMemory == -1 || maxNodeVCores == -1) {
+ // Treat it like an empty cluster and add nodes
+ for (N n : nodes.values()) {
+ updateMaxResources(n, true);
+ }
+ }
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ public List<N> getAllNodes() {
+ return getNodes(null);
+ }
+
+ /**
+ * Convenience method to filter nodes based on a condition.
+ */
+ public List<N> getNodes(NodeFilter nodeFilter) {
+ List<N> nodeList = new ArrayList<>();
+ readLock.lock();
+ try {
+ if (nodeFilter == null) {
+ nodeList.addAll(nodes.values());
+ } else {
+ for (N node : nodes.values()) {
+ if (nodeFilter.accept(node)) {
+ nodeList.add(node);
+ }
+ }
+ }
+ } finally {
+ readLock.unlock();
+ }
+ return nodeList;
+ }
+
+ /**
+ * Convenience method to sort nodes.
+ *
+ * Note that the sort is performed without holding a lock. We are sorting
+ * here instead of on the caller to allow for future optimizations (e.g.
+ * sort once every x milliseconds).
+ */
+ public List<N> sortedNodeList(Comparator<N> comparator) {
+ List<N> sortedList = null;
+ readLock.lock();
+ try {
+ sortedList = new ArrayList(nodes.values());
+ } finally {
+ readLock.unlock();
+ }
+ Collections.sort(sortedList, comparator);
+ return sortedList;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/20d389ce/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeFilter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeFilter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeFilter.java
new file mode 100644
index 0000000..7b3e7a2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeFilter.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Convenience way to filter nodes based on a criteria. To be used in
+ * conjunction with {@link ClusterNodeTracker}
+ */
+@InterfaceAudience.Private
+public interface NodeFilter {
+
+ /**
+ * Criteria to accept node in the filtered list.
+ */
+ boolean accept(SchedulerNode node);
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/20d389ce/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 6a1091d..735306a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -34,7 +34,6 @@ import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
@@ -219,8 +218,6 @@ public class CapacityScheduler extends
private Map<String, CSQueue> queues = new ConcurrentHashMap<String, CSQueue>();
- private AtomicInteger numNodeManagers = new AtomicInteger(0);
-
private ResourceCalculator calculator;
private boolean usePortForNodeName;
@@ -280,7 +277,7 @@ public class CapacityScheduler extends
@Override
public int getNumClusterNodes() {
- return numNodeManagers.get();
+ return nodeTracker.nodeCount();
}
@Override
@@ -387,7 +384,7 @@ public class CapacityScheduler extends
static void schedule(CapacityScheduler cs) {
// First randomize the start point
int current = 0;
- Collection<FiCaSchedulerNode> nodes = cs.getAllNodes().values();
+ Collection<FiCaSchedulerNode> nodes = cs.nodeTracker.getAllNodes();
int start = random.nextInt(nodes.size());
for (FiCaSchedulerNode node : nodes) {
if (current++ >= start) {
@@ -524,10 +521,11 @@ public class CapacityScheduler extends
addNewQueues(queues, newQueues);
// Re-configure queues
- root.reinitialize(newRoot, clusterResource);
+ root.reinitialize(newRoot, getClusterResource());
updatePlacementRules();
// Re-calculate headroom for active applications
+ Resource clusterResource = getClusterResource();
root.updateClusterResource(clusterResource, new ResourceLimits(
clusterResource));
@@ -995,7 +993,7 @@ public class CapacityScheduler extends
allocation = application.getAllocation(getResourceCalculator(),
- clusterResource, getMinimumResourceCapability());
+ getClusterResource(), getMinimumResourceCapability());
}
if (updateDemandForQueue != null && !application
@@ -1036,7 +1034,8 @@ public class CapacityScheduler extends
private synchronized void nodeUpdate(RMNode nm) {
if (LOG.isDebugEnabled()) {
- LOG.debug("nodeUpdate: " + nm + " clusterResources: " + clusterResource);
+ LOG.debug("nodeUpdate: " + nm +
+ " clusterResources: " + getClusterResource());
}
Resource releaseResources = Resource.newInstance(0, 0);
@@ -1119,6 +1118,7 @@ public class CapacityScheduler extends
private synchronized void updateNodeAndQueueResource(RMNode nm,
ResourceOption resourceOption) {
updateNodeResource(nm, resourceOption);
+ Resource clusterResource = getClusterResource();
root.updateClusterResource(clusterResource, new ResourceLimits(
clusterResource));
}
@@ -1128,7 +1128,7 @@ public class CapacityScheduler extends
*/
private synchronized void updateLabelsOnNode(NodeId nodeId,
Set<String> newLabels) {
- FiCaSchedulerNode node = nodes.get(nodeId);
+ FiCaSchedulerNode node = nodeTracker.getNode(nodeId);
if (null == node) {
return;
}
@@ -1230,12 +1230,12 @@ public class CapacityScheduler extends
LeafQueue queue = ((LeafQueue) reservedApplication.getQueue());
assignment =
queue.assignContainers(
- clusterResource,
+ getClusterResource(),
node,
// TODO, now we only consider limits for parent for non-labeled
// resources, should consider labeled resources as well.
new ResourceLimits(labelManager.getResourceByLabel(
- RMNodeLabelsManager.NO_LABEL, clusterResource)),
+ RMNodeLabelsManager.NO_LABEL, getClusterResource())),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
if (assignment.isFulfilledReservation()) {
CSAssignment tmp =
@@ -1261,14 +1261,14 @@ public class CapacityScheduler extends
}
assignment = root.assignContainers(
- clusterResource,
+ getClusterResource(),
node,
// TODO, now we only consider limits for parent for non-labeled
// resources, should consider labeled resources as well.
new ResourceLimits(labelManager.getResourceByLabel(
- RMNodeLabelsManager.NO_LABEL, clusterResource)),
+ RMNodeLabelsManager.NO_LABEL, getClusterResource())),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
- if (Resources.greaterThan(calculator, clusterResource,
+ if (Resources.greaterThan(calculator, getClusterResource(),
assignment.getResource(), Resources.none())) {
updateSchedulerHealth(lastNodeUpdateTime, node, assignment);
return;
@@ -1294,12 +1294,12 @@ public class CapacityScheduler extends
// Try to use NON_EXCLUSIVE
assignment = root.assignContainers(
- clusterResource,
+ getClusterResource(),
node,
// TODO, now we only consider limits for parent for non-labeled
// resources, should consider labeled resources as well.
new ResourceLimits(labelManager.getResourceByLabel(
- RMNodeLabelsManager.NO_LABEL, clusterResource)),
+ RMNodeLabelsManager.NO_LABEL, getClusterResource())),
SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY);
updateSchedulerHealth(lastNodeUpdateTime, node, assignment);
}
@@ -1451,24 +1451,22 @@ public class CapacityScheduler extends
private synchronized void addNode(RMNode nodeManager) {
FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager,
usePortForNodeName, nodeManager.getNodeLabels());
- this.nodes.put(nodeManager.getNodeID(), schedulerNode);
- Resources.addTo(clusterResource, schedulerNode.getTotalResource());
+ nodeTracker.addNode(schedulerNode);
// update this node to node label manager
if (labelManager != null) {
labelManager.activateNode(nodeManager.getNodeID(),
schedulerNode.getTotalResource());
}
-
+
+ Resource clusterResource = getClusterResource();
root.updateClusterResource(clusterResource, new ResourceLimits(
clusterResource));
- int numNodes = numNodeManagers.incrementAndGet();
- updateMaximumAllocation(schedulerNode, true);
-
+
LOG.info("Added node " + nodeManager.getNodeAddress() +
" clusterResource: " + clusterResource);
- if (scheduleAsynchronously && numNodes == 1) {
+ if (scheduleAsynchronously && getNumClusterNodes() == 1) {
asyncSchedulerThread.beginSchedule();
}
}
@@ -1478,20 +1476,14 @@ public class CapacityScheduler extends
if (labelManager != null) {
labelManager.deactivateNode(nodeInfo.getNodeID());
}
-
- FiCaSchedulerNode node = nodes.get(nodeInfo.getNodeID());
+
+ NodeId nodeId = nodeInfo.getNodeID();
+ FiCaSchedulerNode node = nodeTracker.getNode(nodeId);
if (node == null) {
+ LOG.error("Attempting to remove non-existent node " + nodeId);
return;
}
- Resources.subtractFrom(clusterResource, node.getTotalResource());
- root.updateClusterResource(clusterResource, new ResourceLimits(
- clusterResource));
- int numNodes = numNodeManagers.decrementAndGet();
- if (scheduleAsynchronously && numNodes == 0) {
- asyncSchedulerThread.suspendSchedule();
- }
-
// Remove running containers
List<RMContainer> runningContainers = node.getRunningContainers();
for (RMContainer container : runningContainers) {
@@ -1512,11 +1504,18 @@ public class CapacityScheduler extends
RMContainerEventType.KILL);
}
- this.nodes.remove(nodeInfo.getNodeID());
- updateMaximumAllocation(node, false);
+ nodeTracker.removeNode(nodeId);
+ Resource clusterResource = getClusterResource();
+ root.updateClusterResource(clusterResource, new ResourceLimits(
+ clusterResource));
+ int numNodes = nodeTracker.nodeCount();
+
+ if (scheduleAsynchronously && numNodes == 0) {
+ asyncSchedulerThread.suspendSchedule();
+ }
LOG.info("Removed node " + nodeInfo.getNodeAddress() +
- " clusterResource: " + clusterResource);
+ " clusterResource: " + getClusterResource());
}
private void rollbackContainerResource(
@@ -1568,7 +1567,7 @@ public class CapacityScheduler extends
// Inform the queue
LeafQueue queue = (LeafQueue)application.getQueue();
- queue.completedContainer(clusterResource, application, node,
+ queue.completedContainer(getClusterResource(), application, node,
rmContainer, containerStatus, event, null, true);
if (containerStatus.getExitStatus() == ContainerExitStatus.PREEMPTED) {
@@ -1594,7 +1593,7 @@ public class CapacityScheduler extends
FiCaSchedulerApp app = (FiCaSchedulerApp)attempt;
LeafQueue queue = (LeafQueue) attempt.getQueue();
try {
- queue.decreaseContainer(clusterResource, decreaseRequest, app);
+ queue.decreaseContainer(getClusterResource(), decreaseRequest, app);
// Notify RMNode that the container can be pulled by NodeManager in the
// next heartbeat
this.rmContext.getDispatcher().getEventHandler()
@@ -1617,14 +1616,9 @@ public class CapacityScheduler extends
@Lock(Lock.NoLock.class)
public FiCaSchedulerNode getNode(NodeId nodeId) {
- return nodes.get(nodeId);
+ return nodeTracker.getNode(nodeId);
}
- @Lock(Lock.NoLock.class)
- Map<NodeId, FiCaSchedulerNode> getAllNodes() {
- return nodes;
- }
-
@Override
@Lock(Lock.NoLock.class)
public void recover(RMState state) throws Exception {
@@ -1869,9 +1863,9 @@ public class CapacityScheduler extends
}
// Move all live containers
for (RMContainer rmContainer : app.getLiveContainers()) {
- source.detachContainer(clusterResource, app, rmContainer);
+ source.detachContainer(getClusterResource(), app, rmContainer);
// attach the Container to another queue
- dest.attachContainer(clusterResource, app, rmContainer);
+ dest.attachContainer(getClusterResource(), app, rmContainer);
}
// Detach the application..
source.finishApplicationAttempt(app, sourceQueueName);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/20d389ce/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index f1cefad..e426da6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -86,7 +86,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
// Key = RackName, Value = Set of Nodes reserved by app on rack
private Map<String, Set<String>> reservations = new HashMap<>();
- private List<NodeId> blacklistNodeIds = new ArrayList<NodeId>();
+ private List<FSSchedulerNode> blacklistNodeIds = new ArrayList<>();
/**
* Delay scheduling: We often want to prioritize scheduling of node-local
* containers over rack-local or off-switch containers. To achieve this
@@ -185,14 +185,11 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
Resource availableResources) {
if (appSchedulingInfo.getAndResetBlacklistChanged()) {
blacklistNodeIds.clear();
- scheduler.addBlacklistedNodeIdsToList(this, blacklistNodeIds);
+ blacklistNodeIds.addAll(scheduler.getBlacklistedNodes(this));
}
- for (NodeId nodeId: blacklistNodeIds) {
- SchedulerNode node = scheduler.getSchedulerNode(nodeId);
- if (node != null) {
- Resources.subtractFrom(availableResources,
- node.getUnallocatedResource());
- }
+ for (FSSchedulerNode node: blacklistNodeIds) {
+ Resources.subtractFrom(availableResources,
+ node.getUnallocatedResource());
}
if (availableResources.getMemory() < 0) {
availableResources.setMemory(0);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/20d389ce/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 917fc8a..ba90e21 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -20,13 +20,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -186,14 +184,11 @@ public class FairScheduler extends
private float reservableNodesRatio; // percentage of available nodes
// an app can be reserved on
- // Count of number of nodes per rack
- private Map<String, Integer> nodesPerRack = new ConcurrentHashMap<>();
-
protected boolean sizeBasedWeight; // Give larger weights to larger jobs
protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not
protected int continuousSchedulingSleepMs; // Sleep time for each pass in continuous scheduling
- private Comparator<NodeId> nodeAvailableResourceComparator =
+ private Comparator<FSSchedulerNode> nodeAvailableResourceComparator =
new NodeAvailableResourceComparator(); // Node available resource comparator
protected double nodeLocalityThreshold; // Cluster threshold for node locality
protected double rackLocalityThreshold; // Cluster threshold for rack locality
@@ -225,8 +220,8 @@ public class FairScheduler extends
public boolean isAtLeastReservationThreshold(
ResourceCalculator resourceCalculator, Resource resource) {
- return Resources.greaterThanOrEqual(
- resourceCalculator, clusterResource, resource, reservationThreshold);
+ return Resources.greaterThanOrEqual(resourceCalculator,
+ getClusterResource(), resource, reservationThreshold);
}
private void validateConf(Configuration conf) {
@@ -272,11 +267,7 @@ public class FairScheduler extends
}
public int getNumNodesInRack(String rackName) {
- String rName = rackName == null ? "NULL" : rackName;
- if (nodesPerRack.containsKey(rName)) {
- return nodesPerRack.get(rName);
- }
- return 0;
+ return nodeTracker.nodeCount(rackName);
}
public QueueManager getQueueManager() {
@@ -352,6 +343,7 @@ public class FairScheduler extends
// Recursively update demands for all queues
rootQueue.updateDemand();
+ Resource clusterResource = getClusterResource();
rootQueue.setFairShare(clusterResource);
// Recursively compute fair shares for all queues
// and update metrics
@@ -526,6 +518,7 @@ public class FairScheduler extends
Resource resDueToMinShare = Resources.none();
Resource resDueToFairShare = Resources.none();
ResourceCalculator calc = sched.getPolicy().getResourceCalculator();
+ Resource clusterResource = getClusterResource();
if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
Resource target = Resources.componentwiseMin(
sched.getMinShare(), sched.getDemand());
@@ -577,7 +570,7 @@ public class FairScheduler extends
}
private FSSchedulerNode getFSSchedulerNode(NodeId nodeId) {
- return nodes.get(nodeId);
+ return nodeTracker.getNode(nodeId);
}
public double getNodeLocalityThreshold() {
@@ -882,18 +875,11 @@ public class FairScheduler extends
private synchronized void addNode(List<NMContainerStatus> containerReports,
RMNode node) {
FSSchedulerNode schedulerNode = new FSSchedulerNode(node, usePortForNodeName);
- nodes.put(node.getNodeID(), schedulerNode);
- String rackName = node.getRackName() == null ? "NULL" : node.getRackName();
- if (nodesPerRack.containsKey(rackName)) {
- nodesPerRack.put(rackName, nodesPerRack.get(rackName) + 1);
- } else {
- nodesPerRack.put(rackName, 1);
- }
- Resources.addTo(clusterResource, schedulerNode.getTotalResource());
- updateMaximumAllocation(schedulerNode, true);
+ nodeTracker.addNode(schedulerNode);
triggerUpdate();
+ Resource clusterResource = getClusterResource();
queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
queueMgr.getRootQueue().recomputeSteadyShares();
LOG.info("Added node " + node.getNodeAddress() +
@@ -904,15 +890,12 @@ public class FairScheduler extends
}
private synchronized void removeNode(RMNode rmNode) {
- FSSchedulerNode node = getFSSchedulerNode(rmNode.getNodeID());
- // This can occur when an UNHEALTHY node reconnects
+ NodeId nodeId = rmNode.getNodeID();
+ FSSchedulerNode node = nodeTracker.getNode(nodeId);
if (node == null) {
+ LOG.error("Attempting to remove non-existent node " + nodeId);
return;
}
- Resources.subtractFrom(clusterResource, node.getTotalResource());
- updateRootQueueMetrics();
-
- triggerUpdate();
// Remove running containers
List<RMContainer> runningContainers = node.getRunningContainers();
@@ -934,18 +917,13 @@ public class FairScheduler extends
RMContainerEventType.KILL);
}
- nodes.remove(rmNode.getNodeID());
- String rackName = node.getRackName() == null ? "NULL" : node.getRackName();
- if (nodesPerRack.containsKey(rackName)
- && (nodesPerRack.get(rackName) > 0)) {
- nodesPerRack.put(rackName, nodesPerRack.get(rackName) - 1);
- } else {
- LOG.error("Node [" + rmNode.getNodeAddress() + "] being removed from" +
- " unknown rack [" + rackName + "] !!");
- }
+ nodeTracker.removeNode(nodeId);
+ Resource clusterResource = getClusterResource();
queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
queueMgr.getRootQueue().recomputeSteadyShares();
- updateMaximumAllocation(node, false);
+ updateRootQueueMetrics();
+ triggerUpdate();
+
LOG.info("Removed node " + rmNode.getNodeAddress() +
" cluster capacity: " + clusterResource);
}
@@ -967,7 +945,7 @@ public class FairScheduler extends
// Sanity check
SchedulerUtils.normalizeRequests(ask, DOMINANT_RESOURCE_CALCULATOR,
- clusterResource, minimumAllocation, getMaximumResourceCapability(),
+ getClusterResource(), minimumAllocation, getMaximumResourceCapability(),
incrAllocation);
// Record container allocation start time
@@ -1034,7 +1012,8 @@ public class FairScheduler extends
private synchronized void nodeUpdate(RMNode nm) {
long start = getClock().getTime();
if (LOG.isDebugEnabled()) {
- LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterResource);
+ LOG.debug("nodeUpdate: " + nm +
+ " cluster capacity: " + getClusterResource());
}
eventLog.log("HEARTBEAT", nm.getHostName());
FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID());
@@ -1091,20 +1070,13 @@ public class FairScheduler extends
void continuousSchedulingAttempt() throws InterruptedException {
long start = getClock().getTime();
- List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
- // Sort the nodes by space available on them, so that we offer
- // containers on emptier nodes first, facilitating an even spread. This
- // requires holding the scheduler lock, so that the space available on a
- // node doesn't change during the sort.
- synchronized (this) {
- Collections.sort(nodeIdList, nodeAvailableResourceComparator);
- }
+ List<FSSchedulerNode> nodeIdList =
+ nodeTracker.sortedNodeList(nodeAvailableResourceComparator);
// iterate all nodes
- for (NodeId nodeId : nodeIdList) {
- FSSchedulerNode node = getFSSchedulerNode(nodeId);
+ for (FSSchedulerNode node : nodeIdList) {
try {
- if (node != null && Resources.fitsIn(minimumAllocation,
+ if (Resources.fitsIn(minimumAllocation,
node.getUnallocatedResource())) {
attemptScheduling(node);
}
@@ -1126,19 +1098,14 @@ public class FairScheduler extends
}
/** Sort nodes by available resource */
- private class NodeAvailableResourceComparator implements Comparator<NodeId> {
+ private class NodeAvailableResourceComparator
+ implements Comparator<FSSchedulerNode> {
@Override
- public int compare(NodeId n1, NodeId n2) {
- if (!nodes.containsKey(n1)) {
- return 1;
- }
- if (!nodes.containsKey(n2)) {
- return -1;
- }
- return RESOURCE_CALCULATOR.compare(clusterResource,
- nodes.get(n2).getUnallocatedResource(),
- nodes.get(n1).getUnallocatedResource());
+ public int compare(FSSchedulerNode n1, FSSchedulerNode n2) {
+ return RESOURCE_CALCULATOR.compare(getClusterResource(),
+ n2.getUnallocatedResource(),
+ n1.getUnallocatedResource());
}
}
@@ -1150,7 +1117,7 @@ public class FairScheduler extends
}
final NodeId nodeID = node.getNodeID();
- if (!nodes.containsKey(nodeID)) {
+ if (!nodeTracker.exists(nodeID)) {
// The node might have just been removed while this thread was waiting
// on the synchronized lock before it entered this synchronized method
LOG.info("Skipping scheduling as the node " + nodeID +
@@ -1203,7 +1170,7 @@ public class FairScheduler extends
private void updateRootQueueMetrics() {
rootMetrics.setAvailableResourcesToQueue(
Resources.subtract(
- clusterResource, rootMetrics.getAllocatedResources()));
+ getClusterResource(), rootMetrics.getAllocatedResources()));
}
/**
@@ -1214,6 +1181,7 @@ public class FairScheduler extends
*/
private boolean shouldAttemptPreemption() {
if (preemptionEnabled) {
+ Resource clusterResource = getClusterResource();
return (preemptionUtilizationThreshold < Math.max(
(float) rootMetrics.getAllocatedMB() / clusterResource.getMemory(),
(float) rootMetrics.getAllocatedVirtualCores() /
@@ -1547,7 +1515,7 @@ public class FairScheduler extends
@Override
public int getNumClusterNodes() {
- return nodes.size();
+ return nodeTracker.nodeCount();
}
@Override
@@ -1577,7 +1545,7 @@ public class FairScheduler extends
// if it does not already exist, so it can be displayed on the web UI.
synchronized (FairScheduler.this) {
allocConf = queueInfo;
- allocConf.getDefaultSchedulingPolicy().initialize(clusterResource);
+ allocConf.getDefaultSchedulingPolicy().initialize(getClusterResource());
queueMgr.updateAllocationConfiguration(allocConf);
maxRunningEnforcer.updateRunnabilityOnReload();
}
@@ -1721,7 +1689,7 @@ public class FairScheduler extends
ResourceOption resourceOption) {
super.updateNodeResource(nm, resourceOption);
updateRootQueueMetrics();
- queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
+ queueMgr.getRootQueue().setSteadyFairShare(getClusterResource());
queueMgr.getRootQueue().recomputeSteadyShares();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/20d389ce/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index 147c3f3..cf12501 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -142,6 +142,7 @@ public class FifoScheduler extends
QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
queueInfo.setQueueName(DEFAULT_QUEUE.getQueueName());
queueInfo.setCapacity(1.0f);
+ Resource clusterResource = getClusterResource();
if (clusterResource.getMemory() == 0) {
queueInfo.setCurrentCapacity(0.0f);
} else {
@@ -297,7 +298,7 @@ public class FifoScheduler extends
@Override
public int getNumClusterNodes() {
- return nodes.size();
+ return nodeTracker.nodeCount();
}
@Override
@@ -327,7 +328,8 @@ public class FifoScheduler extends
// Sanity check
SchedulerUtils.normalizeRequests(ask, resourceCalculator,
- clusterResource, minimumAllocation, getMaximumResourceCapability());
+ getClusterResource(), minimumAllocation,
+ getMaximumResourceCapability());
// Release containers
releaseContainers(release, application);
@@ -377,7 +379,7 @@ public class FifoScheduler extends
}
private FiCaSchedulerNode getNode(NodeId nodeId) {
- return nodes.get(nodeId);
+ return nodeTracker.getNode(nodeId);
}
@VisibleForTesting
@@ -526,7 +528,7 @@ public class FifoScheduler extends
application.showRequests();
// Done
- if (Resources.lessThan(resourceCalculator, clusterResource,
+ if (Resources.lessThan(resourceCalculator, getClusterResource(),
node.getUnallocatedResource(), minimumAllocation)) {
break;
}
@@ -764,7 +766,7 @@ public class FifoScheduler extends
return;
}
- if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
+ if (Resources.greaterThanOrEqual(resourceCalculator, getClusterResource(),
node.getUnallocatedResource(), minimumAllocation)) {
LOG.debug("Node heartbeat " + rmNode.getNodeID() +
" available resource = " + node.getUnallocatedResource());
@@ -783,13 +785,13 @@ public class FifoScheduler extends
}
private void updateAppHeadRoom(SchedulerApplicationAttempt schedulerAttempt) {
- schedulerAttempt.setHeadroom(Resources.subtract(clusterResource,
+ schedulerAttempt.setHeadroom(Resources.subtract(getClusterResource(),
usedResource));
}
private void updateAvailableResourcesMetrics() {
- metrics.setAvailableResourcesToQueue(Resources.subtract(clusterResource,
- usedResource));
+ metrics.setAvailableResourcesToQueue(
+ Resources.subtract(getClusterResource(), usedResource));
}
@Override
@@ -925,7 +927,7 @@ public class FifoScheduler extends
private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
private synchronized void removeNode(RMNode nodeInfo) {
- FiCaSchedulerNode node = getNode(nodeInfo.getNodeID());
+ FiCaSchedulerNode node = nodeTracker.getNode(nodeInfo.getNodeID());
if (node == null) {
return;
}
@@ -937,13 +939,7 @@ public class FifoScheduler extends
SchedulerUtils.LOST_CONTAINER),
RMContainerEventType.KILL);
}
-
- //Remove the node
- this.nodes.remove(nodeInfo.getNodeID());
- updateMaximumAllocation(node, false);
-
- // Update cluster metrics
- Resources.subtractFrom(clusterResource, node.getTotalResource());
+ nodeTracker.removeNode(nodeInfo.getNodeID());
}
@Override
@@ -965,9 +961,7 @@ public class FifoScheduler extends
private synchronized void addNode(RMNode nodeManager) {
FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager,
usePortForNodeName);
- this.nodes.put(nodeManager.getNodeID(), schedulerNode);
- Resources.addTo(clusterResource, schedulerNode.getTotalResource());
- updateMaximumAllocation(schedulerNode, true);
+ nodeTracker.addNode(schedulerNode);
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/20d389ce/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
index e7ba58d..81c8fe6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
@@ -300,22 +300,16 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
verifyMaximumResourceCapability(configuredMaximumResource, scheduler);
- scheduler.nodes = new HashMap<NodeId, SchedulerNode>();
-
- scheduler.nodes.put(mockNode1.getNodeID(), mockNode1);
- scheduler.updateMaximumAllocation(mockNode1, true);
+ scheduler.nodeTracker.addNode(mockNode1);
verifyMaximumResourceCapability(fullResource1, scheduler);
- scheduler.nodes.put(mockNode2.getNodeID(), mockNode2);
- scheduler.updateMaximumAllocation(mockNode2, true);
+ scheduler.nodeTracker.addNode(mockNode2);
verifyMaximumResourceCapability(fullResource2, scheduler);
- scheduler.nodes.remove(mockNode2.getNodeID());
- scheduler.updateMaximumAllocation(mockNode2, false);
+ scheduler.nodeTracker.removeNode(mockNode2.getNodeID());
verifyMaximumResourceCapability(fullResource1, scheduler);
- scheduler.nodes.remove(mockNode1.getNodeID());
- scheduler.updateMaximumAllocation(mockNode1, false);
+ scheduler.nodeTracker.removeNode(mockNode1.getNodeID());
verifyMaximumResourceCapability(configuredMaximumResource, scheduler);
} finally {
rm.stop();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/20d389ce/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
index 9047138..2ef5e39 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
@@ -183,6 +183,7 @@ public class TestReservations {
}
@Test
+ @SuppressWarnings("unchecked")
public void testReservation() throws Exception {
// Test that we now unreserve and use a node that has space
@@ -231,9 +232,9 @@ public class TestReservations {
when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);
when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2);
- cs.getAllNodes().put(node_0.getNodeID(), node_0);
- cs.getAllNodes().put(node_1.getNodeID(), node_1);
- cs.getAllNodes().put(node_2.getNodeID(), node_2);
+ cs.getNodeTracker().addNode(node_0);
+ cs.getNodeTracker().addNode(node_1);
+ cs.getNodeTracker().addNode(node_2);
final int numNodes = 3;
Resource clusterResource = Resources.createResource(numNodes * (8 * GB));
@@ -346,6 +347,7 @@ public class TestReservations {
// Test that hitting a reservation limit and needing to unreserve
// does not affect assigning containers for other users
@Test
+ @SuppressWarnings("unchecked")
public void testReservationLimitOtherUsers() throws Exception {
CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
setup(csConf, true);
@@ -395,9 +397,9 @@ public class TestReservations {
when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);
when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2);
- cs.getAllNodes().put(node_0.getNodeID(), node_0);
- cs.getAllNodes().put(node_1.getNodeID(), node_1);
- cs.getAllNodes().put(node_2.getNodeID(), node_2);
+ cs.getNodeTracker().addNode(node_0);
+ cs.getNodeTracker().addNode(node_1);
+ cs.getNodeTracker().addNode(node_2);
final int numNodes = 3;
Resource clusterResource = Resources.createResource(numNodes * (8 * GB));
@@ -641,6 +643,7 @@ public class TestReservations {
}
@Test
+ @SuppressWarnings("unchecked")
public void testAssignContainersNeedToUnreserve() throws Exception {
// Test that we now unreserve and use a node that has space
Logger rootLogger = LogManager.getRootLogger();
@@ -684,8 +687,8 @@ public class TestReservations {
FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0,
8 * GB);
- cs.getAllNodes().put(node_0.getNodeID(), node_0);
- cs.getAllNodes().put(node_1.getNodeID(), node_1);
+ cs.getNodeTracker().addNode(node_0);
+ cs.getNodeTracker().addNode(node_1);
when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0);
when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/20d389ce/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 8d7c22e..1add193 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -75,12 +75,10 @@ import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.server.resourcemanager.Application;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.Task;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -108,7 +106,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateS
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.Default;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -2751,8 +2748,8 @@ public class TestFairScheduler extends FairSchedulerTestBase {
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 2, "127.0.0.2");
- NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node1);
- scheduler.handle(nodeEvent2);
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+ scheduler.handle(nodeEvent1);
ApplicationAttemptId attId = createSchedulingRequest(1024, "queue1",
"user1", 0);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/20d389ce/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
index 9bfc283..44877fb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
@@ -306,12 +306,7 @@ public class TestFifoScheduler {
nmTokenSecretManager.rollMasterKey();
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
- FifoScheduler scheduler = new FifoScheduler(){
- @SuppressWarnings("unused")
- public Map<NodeId, FiCaSchedulerNode> getNodes(){
- return nodes;
- }
- };
+ FifoScheduler scheduler = new FifoScheduler();
RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null,
null, containerTokenSecretManager, nmTokenSecretManager, null, scheduler);
rmContext.setSystemMetricsPublisher(mock(SystemMetricsPublisher.class));
@@ -331,11 +326,7 @@ public class TestFifoScheduler {
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node0);
scheduler.handle(nodeEvent1);
- Method method = scheduler.getClass().getDeclaredMethod("getNodes");
- @SuppressWarnings("unchecked")
- Map<NodeId, FiCaSchedulerNode> schedulerNodes =
- (Map<NodeId, FiCaSchedulerNode>) method.invoke(scheduler);
- assertEquals(schedulerNodes.values().size(), 1);
+ assertEquals(scheduler.getNumClusterNodes(), 1);
Resource newResource = Resources.createResource(1024, 4);
@@ -345,9 +336,9 @@ public class TestFifoScheduler {
scheduler.handle(node0ResourceUpdate);
// SchedulerNode's total resource and available resource are changed.
- assertEquals(schedulerNodes.get(node0.getNodeID()).getTotalResource()
- .getMemory(), 1024);
- assertEquals(schedulerNodes.get(node0.getNodeID()).
+ assertEquals(1024, scheduler.getNodeTracker().getNode(node0.getNodeID())
+ .getTotalResource().getMemory());
+ assertEquals(1024, scheduler.getNodeTracker().getNode(node0.getNodeID()).
getUnallocatedResource().getMemory(), 1024);
QueueInfo queueInfo = scheduler.getQueueInfo(null, false, false);
Assert.assertEquals(0.0f, queueInfo.getCurrentCapacity(), 0.0f);