You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ka...@apache.org on 2014/11/21 19:44:52 UTC
hadoop git commit: YARN-2604. Scheduler should consider
max-allocation-* in conjunction with the largest node. (Robert Kanter via
kasha) (cherry picked from commit 3114d4731dcca7cb6c16aaa7c7a6550b7dd7dccb)
Repository: hadoop
Updated Branches:
refs/heads/branch-2 6570f7fd7 -> e9db0aa35
YARN-2604. Scheduler should consider max-allocation-* in conjunction with the largest node. (Robert Kanter via kasha)
(cherry picked from commit 3114d4731dcca7cb6c16aaa7c7a6550b7dd7dccb)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e9db0aa3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e9db0aa3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e9db0aa3
Branch: refs/heads/branch-2
Commit: e9db0aa35c95bc67b5c5b8692e681865577e572d
Parents: 6570f7f
Author: Karthik Kambatla <ka...@apache.org>
Authored: Fri Nov 21 10:32:28 2014 -0800
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Fri Nov 21 10:42:47 2014 -0800
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../scheduler/AbstractYarnScheduler.java | 104 ++++++++-
.../scheduler/capacity/CapacityScheduler.java | 15 +-
.../scheduler/fair/FairScheduler.java | 10 +-
.../scheduler/fifo/FifoScheduler.java | 16 +-
.../resourcemanager/TestFifoScheduler.java | 1 +
.../scheduler/TestAbstractYarnScheduler.java | 213 +++++++++++++++++++
.../capacity/TestContainerAllocation.java | 4 +-
.../scheduler/fair/TestFairScheduler.java | 31 +--
9 files changed, 348 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9db0aa3/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index a9cc577..48480fe 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -57,6 +57,9 @@ Release 2.7.0 - UNRELEASED
YARN-2375. Allow enabling/disabling timeline server per framework.
(Mit Desai via jeagles)
+ YARN-2604. Scheduler should consider max-allocation-* in conjunction
+ with the largest node. (Robert Kanter via kasha)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9db0aa3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 8e8d627..bf720ae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -77,7 +78,14 @@ public abstract class AbstractYarnScheduler
protected Resource clusterResource = Resource.newInstance(0, 0);
protected Resource minimumAllocation;
- protected Resource maximumAllocation;
+ private Resource maximumAllocation;
+ private Resource configuredMaximumAllocation;
+ private int maxNodeMemory = -1;
+ private int maxNodeVCores = -1;
+ private ReentrantReadWriteLock maximumAllocationLock =
+ new ReentrantReadWriteLock();
+ private boolean useConfiguredMaximumAllocationOnly = true;
+ private long configuredMaximumAllocationWaitTime;
protected RMContext rmContext;
protected Map<ApplicationId, SchedulerApplication<T>> applications;
@@ -102,6 +110,9 @@ public abstract class AbstractYarnScheduler
nmExpireInterval =
conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
+ configuredMaximumAllocationWaitTime =
+ conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
+ YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS);
createReleaseCache();
super.serviceInit(conf);
}
@@ -145,7 +156,37 @@ public abstract class AbstractYarnScheduler
@Override
public Resource getMaximumResourceCapability() {
- return maximumAllocation;
+ Resource maxResource;
+ ReentrantReadWriteLock.ReadLock readLock = maximumAllocationLock.readLock();
+ readLock.lock();
+ try {
+ if (useConfiguredMaximumAllocationOnly) {
+ if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp()
+ > configuredMaximumAllocationWaitTime) {
+ useConfiguredMaximumAllocationOnly = false;
+ }
+ maxResource = Resources.clone(configuredMaximumAllocation);
+ } else {
+ maxResource = Resources.clone(maximumAllocation);
+ }
+ } finally {
+ readLock.unlock();
+ }
+ return maxResource;
+ }
+
+ protected void initMaximumResourceCapability(Resource maximumAllocation) {
+ ReentrantReadWriteLock.WriteLock writeLock =
+ maximumAllocationLock.writeLock();
+ writeLock.lock();
+ try {
+ if (this.configuredMaximumAllocation == null) {
+ this.configuredMaximumAllocation = Resources.clone(maximumAllocation);
+ this.maximumAllocation = Resources.clone(maximumAllocation);
+ }
+ } finally {
+ writeLock.unlock();
+ }
}
protected void containerLaunchedOnNode(ContainerId containerId,
@@ -528,4 +569,63 @@ public abstract class AbstractYarnScheduler
throw new YarnException(getClass().getSimpleName()
+ " does not support reservations");
}
+
+ protected void updateMaximumAllocation(SchedulerNode node, boolean add) {
+ ReentrantReadWriteLock.WriteLock writeLock =
+ maximumAllocationLock.writeLock();
+ writeLock.lock();
+ try {
+ if (add) { // added node
+ int nodeMemory = node.getAvailableResource().getMemory();
+ if (nodeMemory > maxNodeMemory) {
+ maxNodeMemory = nodeMemory;
+ maximumAllocation.setMemory(Math.min(
+ configuredMaximumAllocation.getMemory(), maxNodeMemory));
+ }
+ int nodeVCores = node.getAvailableResource().getVirtualCores();
+ if (nodeVCores > maxNodeVCores) {
+ maxNodeVCores = nodeVCores;
+ maximumAllocation.setVirtualCores(Math.min(
+ configuredMaximumAllocation.getVirtualCores(), maxNodeVCores));
+ }
+ } else { // removed node
+ if (maxNodeMemory == node.getAvailableResource().getMemory()) {
+ maxNodeMemory = -1;
+ }
+ if (maxNodeVCores == node.getAvailableResource().getVirtualCores()) {
+ maxNodeVCores = -1;
+ }
+ // We only have to iterate through the nodes if the current max memory
+ // or vcores was equal to the removed node's
+ if (maxNodeMemory == -1 || maxNodeVCores == -1) {
+ for (Map.Entry<NodeId, N> nodeEntry : nodes.entrySet()) {
+ int nodeMemory =
+ nodeEntry.getValue().getAvailableResource().getMemory();
+ if (nodeMemory > maxNodeMemory) {
+ maxNodeMemory = nodeMemory;
+ }
+ int nodeVCores =
+ nodeEntry.getValue().getAvailableResource().getVirtualCores();
+ if (nodeVCores > maxNodeVCores) {
+ maxNodeVCores = nodeVCores;
+ }
+ }
+ if (maxNodeMemory == -1) { // no nodes
+ maximumAllocation.setMemory(configuredMaximumAllocation.getMemory());
+ } else {
+ maximumAllocation.setMemory(
+ Math.min(configuredMaximumAllocation.getMemory(), maxNodeMemory));
+ }
+ if (maxNodeVCores == -1) { // no nodes
+ maximumAllocation.setVirtualCores(configuredMaximumAllocation.getVirtualCores());
+ } else {
+ maximumAllocation.setVirtualCores(
+ Math.min(configuredMaximumAllocation.getVirtualCores(), maxNodeVCores));
+ }
+ }
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9db0aa3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index c383e43..28158c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -33,7 +33,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.HashSet;
-import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -284,7 +283,7 @@ public class CapacityScheduler extends
this.conf = loadCapacitySchedulerConfiguration(configuration);
validateConf(this.conf);
this.minimumAllocation = this.conf.getMinimumAllocation();
- this.maximumAllocation = this.conf.getMaximumAllocation();
+ initMaximumResourceCapability(this.conf.getMaximumAllocation());
this.calculator = this.conf.getResourceCalculator();
this.usePortForNodeName = this.conf.getUsePortForNodeName();
this.applications =
@@ -321,8 +320,8 @@ public class CapacityScheduler extends
@Override
public void serviceInit(Configuration conf) throws Exception {
Configuration configuration = new Configuration(conf);
- initScheduler(configuration);
super.serviceInit(conf);
+ initScheduler(configuration);
}
@Override
@@ -849,7 +848,7 @@ public class CapacityScheduler extends
// Sanity check
SchedulerUtils.normalizeRequests(
ask, getResourceCalculator(), getClusterResource(),
- getMinimumResourceCapability(), maximumAllocation);
+ getMinimumResourceCapability(), getMaximumResourceCapability());
// Release containers
releaseContainers(release, application);
@@ -1123,12 +1122,13 @@ public class CapacityScheduler extends
labelManager.activateNode(nodeManager.getNodeID(),
nodeManager.getTotalCapability());
}
-
- this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager,
- usePortForNodeName));
+ FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager,
+ usePortForNodeName);
+ this.nodes.put(nodeManager.getNodeID(), schedulerNode);
Resources.addTo(clusterResource, nodeManager.getTotalCapability());
root.updateClusterResource(clusterResource);
int numNodes = numNodeManagers.incrementAndGet();
+ updateMaximumAllocation(schedulerNode, true);
LOG.info("Added node " + nodeManager.getNodeAddress() +
" clusterResource: " + clusterResource);
@@ -1177,6 +1177,7 @@ public class CapacityScheduler extends
}
this.nodes.remove(nodeInfo.getNodeID());
+ updateMaximumAllocation(node, false);
LOG.info("Removed node " + nodeInfo.getNodeAddress() +
" clusterResource: " + clusterResource);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9db0aa3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 94fb849..55e4549 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -817,9 +817,11 @@ public class FairScheduler extends
}
private synchronized void addNode(RMNode node) {
- nodes.put(node.getNodeID(), new FSSchedulerNode(node, usePortForNodeName));
+ FSSchedulerNode schedulerNode = new FSSchedulerNode(node, usePortForNodeName);
+ nodes.put(node.getNodeID(), schedulerNode);
Resources.addTo(clusterResource, node.getTotalCapability());
updateRootQueueMetrics();
+ updateMaximumAllocation(schedulerNode, true);
queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
queueMgr.getRootQueue().recomputeSteadyShares();
@@ -859,6 +861,7 @@ public class FairScheduler extends
nodes.remove(rmNode.getNodeID());
queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
queueMgr.getRootQueue().recomputeSteadyShares();
+ updateMaximumAllocation(node, false);
LOG.info("Removed node " + rmNode.getNodeAddress() +
" cluster capacity: " + clusterResource);
}
@@ -877,7 +880,8 @@ public class FairScheduler extends
// Sanity check
SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(),
- clusterResource, minimumAllocation, maximumAllocation, incrAllocation);
+ clusterResource, minimumAllocation, getMaximumResourceCapability(),
+ incrAllocation);
// Set amResource for this app
if (!application.getUnmanagedAM() && ask.size() == 1
@@ -1225,7 +1229,7 @@ public class FairScheduler extends
this.conf = new FairSchedulerConfiguration(conf);
validateConf(this.conf);
minimumAllocation = this.conf.getMinimumAllocation();
- maximumAllocation = this.conf.getMaximumAllocation();
+ initMaximumResourceCapability(this.conf.getMaximumAllocation());
incrAllocation = this.conf.getIncrementAllocation();
continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
continuousSchedulingSleepMs =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9db0aa3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index 532edc7..3d4c9dd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -215,10 +215,13 @@ public class FifoScheduler extends
Resources.createResource(conf.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
- this.maximumAllocation =
+ initMaximumResourceCapability(
Resources.createResource(conf.getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB),
+ conf.getInt(
+ YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES)));
this.usePortForNodeName = conf.getBoolean(
YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME);
@@ -303,7 +306,7 @@ public class FifoScheduler extends
// Sanity check
SchedulerUtils.normalizeRequests(ask, resourceCalculator,
- clusterResource, minimumAllocation, maximumAllocation);
+ clusterResource, minimumAllocation, getMaximumResourceCapability());
// Release containers
releaseContainers(release, application);
@@ -899,6 +902,7 @@ public class FifoScheduler extends
//Remove the node
this.nodes.remove(nodeInfo.getNodeID());
+ updateMaximumAllocation(node, false);
// Update cluster metrics
Resources.subtractFrom(clusterResource, node.getRMNode().getTotalCapability());
@@ -916,9 +920,11 @@ public class FifoScheduler extends
}
private synchronized void addNode(RMNode nodeManager) {
- this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager,
- usePortForNodeName));
+ FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager,
+ usePortForNodeName);
+ this.nodes.put(nodeManager.getNodeID(), schedulerNode);
Resources.addTo(clusterResource, nodeManager.getTotalCapability());
+ updateMaximumAllocation(schedulerNode, true);
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9db0aa3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
index 12f7498..225e225 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceReque
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9db0aa3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
new file mode 100644
index 0000000..67bdae2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.ParameterizedSchedulerTestBase;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
+
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
+
+ public TestAbstractYarnScheduler(SchedulerType type) {
+ super(type);
+ }
+
+ @Test
+ public void testMaximimumAllocationMemory() throws Exception {
+ final int node1MaxMemory = 15 * 1024;
+ final int node2MaxMemory = 5 * 1024;
+ final int node3MaxMemory = 6 * 1024;
+ final int configuredMaxMemory = 10 * 1024;
+ configureScheduler();
+ YarnConfiguration conf = getConf();
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+ configuredMaxMemory);
+ conf.setLong(
+ YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
+ 1000 * 1000);
+ MockRM rm = new MockRM(conf);
+ try {
+ rm.start();
+ testMaximumAllocationMemoryHelper(
+ (AbstractYarnScheduler) rm.getResourceScheduler(),
+ node1MaxMemory, node2MaxMemory, node3MaxMemory,
+ configuredMaxMemory, configuredMaxMemory, configuredMaxMemory,
+ configuredMaxMemory, configuredMaxMemory, configuredMaxMemory);
+ } finally {
+ rm.stop();
+ }
+
+ conf.setLong(
+ YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
+ 0);
+ rm = new MockRM(conf);
+ try {
+ rm.start();
+ testMaximumAllocationMemoryHelper(
+ (AbstractYarnScheduler) rm.getResourceScheduler(),
+ node1MaxMemory, node2MaxMemory, node3MaxMemory,
+ configuredMaxMemory, configuredMaxMemory, configuredMaxMemory,
+ node2MaxMemory, node3MaxMemory, node2MaxMemory);
+ } finally {
+ rm.stop();
+ }
+ }
+
+ private void testMaximumAllocationMemoryHelper(
+ AbstractYarnScheduler scheduler,
+ final int node1MaxMemory, final int node2MaxMemory,
+ final int node3MaxMemory, final int... expectedMaxMemory)
+ throws Exception {
+ Assert.assertEquals(6, expectedMaxMemory.length);
+
+ Assert.assertEquals(0, scheduler.getNumClusterNodes());
+ int maxMemory = scheduler.getMaximumResourceCapability().getMemory();
+ Assert.assertEquals(expectedMaxMemory[0], maxMemory);
+
+ RMNode node1 = MockNodes.newNodeInfo(
+ 0, Resources.createResource(node1MaxMemory), 1, "127.0.0.2");
+ scheduler.handle(new NodeAddedSchedulerEvent(node1));
+ Assert.assertEquals(1, scheduler.getNumClusterNodes());
+ maxMemory = scheduler.getMaximumResourceCapability().getMemory();
+ Assert.assertEquals(expectedMaxMemory[1], maxMemory);
+
+ scheduler.handle(new NodeRemovedSchedulerEvent(node1));
+ Assert.assertEquals(0, scheduler.getNumClusterNodes());
+ maxMemory = scheduler.getMaximumResourceCapability().getMemory();
+ Assert.assertEquals(expectedMaxMemory[2], maxMemory);
+
+ RMNode node2 = MockNodes.newNodeInfo(
+ 0, Resources.createResource(node2MaxMemory), 2, "127.0.0.3");
+ scheduler.handle(new NodeAddedSchedulerEvent(node2));
+ Assert.assertEquals(1, scheduler.getNumClusterNodes());
+ maxMemory = scheduler.getMaximumResourceCapability().getMemory();
+ Assert.assertEquals(expectedMaxMemory[3], maxMemory);
+
+ RMNode node3 = MockNodes.newNodeInfo(
+ 0, Resources.createResource(node3MaxMemory), 3, "127.0.0.4");
+ scheduler.handle(new NodeAddedSchedulerEvent(node3));
+ Assert.assertEquals(2, scheduler.getNumClusterNodes());
+ maxMemory = scheduler.getMaximumResourceCapability().getMemory();
+ Assert.assertEquals(expectedMaxMemory[4], maxMemory);
+
+ scheduler.handle(new NodeRemovedSchedulerEvent(node3));
+ Assert.assertEquals(1, scheduler.getNumClusterNodes());
+ maxMemory = scheduler.getMaximumResourceCapability().getMemory();
+ Assert.assertEquals(expectedMaxMemory[5], maxMemory);
+
+ scheduler.handle(new NodeRemovedSchedulerEvent(node2));
+ Assert.assertEquals(0, scheduler.getNumClusterNodes());
+ }
+
+ @Test
+ public void testMaximimumAllocationVCores() throws Exception {
+ final int node1MaxVCores = 15;
+ final int node2MaxVCores = 5;
+ final int node3MaxVCores = 6;
+ final int configuredMaxVCores = 10;
+ configureScheduler();
+ YarnConfiguration conf = getConf();
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+ configuredMaxVCores);
+ conf.setLong(
+ YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
+ 1000 * 1000);
+ MockRM rm = new MockRM(conf);
+ try {
+ rm.start();
+ testMaximumAllocationVCoresHelper(
+ (AbstractYarnScheduler) rm.getResourceScheduler(),
+ node1MaxVCores, node2MaxVCores, node3MaxVCores,
+ configuredMaxVCores, configuredMaxVCores, configuredMaxVCores,
+ configuredMaxVCores, configuredMaxVCores, configuredMaxVCores);
+ } finally {
+ rm.stop();
+ }
+
+ conf.setLong(
+ YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
+ 0);
+ rm = new MockRM(conf);
+ try {
+ rm.start();
+ testMaximumAllocationVCoresHelper(
+ (AbstractYarnScheduler) rm.getResourceScheduler(),
+ node1MaxVCores, node2MaxVCores, node3MaxVCores,
+ configuredMaxVCores, configuredMaxVCores, configuredMaxVCores,
+ node2MaxVCores, node3MaxVCores, node2MaxVCores);
+ } finally {
+ rm.stop();
+ }
+ }
+
+ private void testMaximumAllocationVCoresHelper(
+ AbstractYarnScheduler scheduler,
+ final int node1MaxVCores, final int node2MaxVCores,
+ final int node3MaxVCores, final int... expectedMaxVCores)
+ throws Exception {
+ Assert.assertEquals(6, expectedMaxVCores.length);
+
+ Assert.assertEquals(0, scheduler.getNumClusterNodes());
+ int maxVCores = scheduler.getMaximumResourceCapability().getVirtualCores();
+ Assert.assertEquals(expectedMaxVCores[0], maxVCores);
+
+ RMNode node1 = MockNodes.newNodeInfo(
+ 0, Resources.createResource(1024, node1MaxVCores), 1, "127.0.0.2");
+ scheduler.handle(new NodeAddedSchedulerEvent(node1));
+ Assert.assertEquals(1, scheduler.getNumClusterNodes());
+ maxVCores = scheduler.getMaximumResourceCapability().getVirtualCores();
+ Assert.assertEquals(expectedMaxVCores[1], maxVCores);
+
+ scheduler.handle(new NodeRemovedSchedulerEvent(node1));
+ Assert.assertEquals(0, scheduler.getNumClusterNodes());
+ maxVCores = scheduler.getMaximumResourceCapability().getVirtualCores();
+ Assert.assertEquals(expectedMaxVCores[2], maxVCores);
+
+ RMNode node2 = MockNodes.newNodeInfo(
+ 0, Resources.createResource(1024, node2MaxVCores), 2, "127.0.0.3");
+ scheduler.handle(new NodeAddedSchedulerEvent(node2));
+ Assert.assertEquals(1, scheduler.getNumClusterNodes());
+ maxVCores = scheduler.getMaximumResourceCapability().getVirtualCores();
+ Assert.assertEquals(expectedMaxVCores[3], maxVCores);
+
+ RMNode node3 = MockNodes.newNodeInfo(
+ 0, Resources.createResource(1024, node3MaxVCores), 3, "127.0.0.4");
+ scheduler.handle(new NodeAddedSchedulerEvent(node3));
+ Assert.assertEquals(2, scheduler.getNumClusterNodes());
+ maxVCores = scheduler.getMaximumResourceCapability().getVirtualCores();
+ Assert.assertEquals(expectedMaxVCores[4], maxVCores);
+
+ scheduler.handle(new NodeRemovedSchedulerEvent(node3));
+ Assert.assertEquals(1, scheduler.getNumClusterNodes());
+ maxVCores = scheduler.getMaximumResourceCapability().getVirtualCores();
+ Assert.assertEquals(expectedMaxVCores[5], maxVCores);
+
+ scheduler.handle(new NodeRemovedSchedulerEvent(node2));
+ Assert.assertEquals(0, scheduler.getNumClusterNodes());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9db0aa3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
index ad834ac..9a29bff 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
@@ -116,7 +116,7 @@ public class TestContainerAllocation {
am1.registerAppAttempt();
LOG.info("sending container requests ");
- am1.addRequests(new String[] {"*"}, 3 * GB, 1, 1);
+ am1.addRequests(new String[] {"*"}, 2 * GB, 1, 1);
AllocateResponse alloc1Response = am1.schedule(); // send the request
// kick the scheduler
@@ -291,7 +291,7 @@ public class TestContainerAllocation {
// This is to test fetching AM container will be retried, if AM container is
// not fetchable since DNS is unavailable causing container token/NMtoken
// creation failure.
- @Test(timeout = 20000)
+ @Test(timeout = 30000)
public void testAMContainerAllocationWhenDNSUnavailable() throws Exception {
MockRM rm1 = new MockRM(conf) {
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9db0aa3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 0b14499..3cd445a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
@@ -2590,37 +2591,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
}
assertEquals(FinalApplicationStatus.FAILED, application.getFinalApplicationStatus());
}
-
- @Test
- public void testReservationThatDoesntFit() throws IOException {
- scheduler.init(conf);
- scheduler.start();
- scheduler.reinitialize(conf, resourceManager.getRMContext());
- RMNode node1 =
- MockNodes
- .newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
- NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
- scheduler.handle(nodeEvent1);
-
- ApplicationAttemptId attId = createSchedulingRequest(2048, "queue1",
- "user1", 1);
- scheduler.update();
- NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
- scheduler.handle(updateEvent);
-
- FSAppAttempt app = scheduler.getSchedulerApp(attId);
- assertEquals(0, app.getLiveContainers().size());
- assertEquals(0, app.getReservedContainers().size());
-
- createSchedulingRequestExistingApplication(1024, 2, attId);
- scheduler.update();
- scheduler.handle(updateEvent);
-
- assertEquals(1, app.getLiveContainers().size());
- assertEquals(0, app.getReservedContainers().size());
- }
-
@Test
public void testRemoveNodeUpdatesRootQueueMetrics() throws IOException {
scheduler.init(conf);