You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by tu...@apache.org on 2013/06/03 19:36:21 UTC
svn commit: r1489072 - in
/hadoop/common/branches/branch-2/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/
hadoop-yarn/hadoop-yarn-server/ha...
Author: tucu
Date: Mon Jun 3 17:36:20 2013
New Revision: 1489072
URL: http://svn.apache.org/r1489072
Log:
YARN-326. Add multi-resource scheduling to the fair scheduler. (sandyr via tucu)
Added:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceType.java
- copied unchanged from r1489070, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceType.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceWeights.java
- copied unchanged from r1489070, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceWeights.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
- copied unchanged from r1489070, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerConfiguration.java
- copied unchanged from r1489070, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerConfiguration.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/
- copied from r1489070, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java
- copied unchanged from r1489070, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfigurationException.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1489072&r1=1489071&r2=1489072&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Mon Jun 3 17:36:20 2013
@@ -88,6 +88,9 @@ Release 2.1.0-beta - UNRELEASED
YARN-392. Make it possible to specify hard locality constraints in resource
requests. (sandyr via tucu)
+ YARN-326. Add multi-resource scheduling to the fair scheduler.
+ (sandyr via tucu)
+
IMPROVEMENTS
YARN-365. Change NM heartbeat handling to not generate a scheduler event
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfigurationException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfigurationException.java?rev=1489072&r1=1489071&r2=1489072&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfigurationException.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfigurationException.java Mon Jun 3 17:36:20 2013
@@ -32,4 +32,8 @@ public class AllocationConfigurationExce
public AllocationConfigurationException(String message) {
super(message);
}
+
+ public AllocationConfigurationException(String message, Throwable t) {
+ super(message, t);
+ }
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java?rev=1489072&r1=1489071&r2=1489072&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java Mon Jun 3 17:36:20 2013
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
@@ -119,7 +120,7 @@ public class AppSchedulable extends Sche
}
@Override
- public double getWeight() {
+ public ResourceWeights getWeights() {
return scheduler.getAppWeight(this);
}
@@ -237,10 +238,7 @@ public class AppSchedulable extends Sche
}
// Can we allocate a container on this node?
- int availableContainers =
- available.getMemory() / capability.getMemory();
-
- if (availableContainers > 0) {
+ if (Resources.fitsIn(capability, available)) {
// Inform the application of the new container for this request
RMContainer allocatedContainer =
app.allocate(type, node, priority, request, container);
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java?rev=1489072&r1=1489071&r2=1489072&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java Mon Jun 3 17:36:20 2013
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@@ -84,7 +85,7 @@ public abstract class FSQueue extends Sc
throws AllocationConfigurationException;
@Override
- public double getWeight() {
+ public ResourceWeights getWeights() {
return queueMgr.getQueueWeight(getName());
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java?rev=1489072&r1=1489071&r2=1489072&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java Mon Jun 3 17:36:20 2013
@@ -180,8 +180,8 @@ public class FSSchedulerNode extends Sch
@Override
public String toString() {
return "host: " + rmNode.getNodeAddress() + " #containers=" + getNumContainers() +
- " available=" + getAvailableResource().getMemory() +
- " used=" + getUsedResource().getMemory();
+ " available=" + getAvailableResource() +
+ " used=" + getUsedResource();
}
@Override
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1489072&r1=1489071&r2=1489072&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Mon Jun 3 17:36:20 2013
@@ -57,8 +57,10 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -495,14 +497,14 @@ public class FairScheduler implements Re
}
// synchronized for sizeBasedWeight
- public synchronized double getAppWeight(AppSchedulable app) {
+ public synchronized ResourceWeights getAppWeight(AppSchedulable app) {
if (!app.getRunnable()) {
// Job won't launch tasks, but don't return 0 to avoid division errors
- return 1.0;
+ return ResourceWeights.NEUTRAL;
} else {
double weight = 1.0;
if (sizeBasedWeight) {
- // Set weight based on current demand
+ // Set weight based on current memory demand
weight = Math.log1p(app.getDemand().getMemory()) / Math.log(2);
}
weight *= app.getPriority().getPriority();
@@ -510,7 +512,7 @@ public class FairScheduler implements Re
// Run weight through the user-supplied weightAdjuster
weight = weightAdjuster.adjustWeight(app, weight);
}
- return weight;
+ return new ResourceWeights((float)weight);
}
}
@@ -714,37 +716,6 @@ public class FairScheduler implements Re
" cluster capacity: " + clusterCapacity);
}
- /**
- * Utility method to normalize a list of resource requests, by ensuring that
- * the memory for each request is a multiple of minMemory and is not zero.
- *
- * @param asks a list of resource requests
- * @param minMemory the configured minimum memory allocation
- * @param maxMemory the configured maximum memory allocation
- */
- static void normalizeRequests(List<ResourceRequest> asks,
- int minMemory, int maxMemory) {
- for (ResourceRequest ask : asks) {
- normalizeRequest(ask, minMemory, maxMemory);
- }
- }
-
- /**
- * Utility method to normalize a resource request, by ensuring that the
- * requested memory is a multiple of minMemory and is not zero.
- *
- * @param ask the resource request
- * @param minMemory the configured minimum memory allocation
- * @param maxMemory the configured maximum memory allocation
- */
- static void normalizeRequest(ResourceRequest ask, int minMemory,
- int maxMemory) {
- int memory = Math.max(ask.getCapability().getMemory(), minMemory);
- int normalizedMemory =
- minMemory * ((memory / minMemory) + (memory % minMemory > 0 ? 1 : 0));
- ask.getCapability().setMemory(Math.min(normalizedMemory, maxMemory));
- }
-
@Override
public Allocation allocate(ApplicationAttemptId appAttemptId,
List<ResourceRequest> ask, List<ContainerId> release) {
@@ -758,8 +729,8 @@ public class FairScheduler implements Re
}
// Sanity check
- normalizeRequests(ask, minimumAllocation.getMemory(),
- maximumAllocation.getMemory());
+ SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(),
+ clusterCapacity, minimumAllocation, maximumAllocation);
// Release containers
for (ContainerId releasedContainerId : release) {
@@ -1015,8 +986,8 @@ public class FairScheduler implements Re
public synchronized void reinitialize(Configuration conf, RMContext rmContext)
throws IOException {
this.conf = new FairSchedulerConfiguration(conf);
- minimumAllocation = this.conf.getMinimumMemoryAllocation();
- maximumAllocation = this.conf.getMaximumMemoryAllocation();
+ minimumAllocation = this.conf.getMinimumAllocation();
+ maximumAllocation = this.conf.getMaximumAllocation();
userAsDefaultQueue = this.conf.getUserAsDefaultQueue();
nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
rackLocalityThreshold = this.conf.getLocalityThresholdRack();
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java?rev=1489072&r1=1489071&r2=1489072&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java Mon Jun 3 17:36:20 2013
@@ -18,12 +18,16 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.io.File;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.util.BuilderUtils;
@Private
@Evolving
@@ -78,18 +82,24 @@ public class FairSchedulerConfiguration
addResource(FS_CONFIGURATION_FILE);
}
- public Resource getMinimumMemoryAllocation() {
+ public Resource getMinimumAllocation() {
int mem = getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
- return Resources.createResource(mem);
+ int cpu = getInt(
+ YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
+ return Resources.createResource(mem, cpu);
}
- public Resource getMaximumMemoryAllocation() {
+ public Resource getMaximumAllocation() {
int mem = getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
- return Resources.createResource(mem);
+ int cpu = getInt(
+ YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
+ return Resources.createResource(mem, cpu);
}
public boolean getUserAsDefaultQueue() {
@@ -136,4 +146,34 @@ public class FairSchedulerConfiguration
public int getWaitTimeBeforeKill() {
return getInt(WAIT_TIME_BEFORE_KILL, DEFAULT_WAIT_TIME_BEFORE_KILL);
}
+
+ /**
+ * Parses a resource config value of a form like "1024", "1024 mb",
+ * or "1024 mb, 3 vcores". If no units are given, megabytes are assumed.
+ *
+ * @throws AllocationConfigurationException
+ */
+ public static Resource parseResourceConfigValue(String val)
+ throws AllocationConfigurationException {
+ try {
+ int memory = findResource(val, "mb");
+ int vcores = findResource(val, "vcores");
+ return BuilderUtils.newResource(memory, vcores);
+ } catch (AllocationConfigurationException ex) {
+ throw ex;
+ } catch (Exception ex) {
+ throw new AllocationConfigurationException(
+ "Error reading resource config", ex);
+ }
+ }
+
+ private static int findResource(String val, String units)
+ throws AllocationConfigurationException {
+ Pattern pattern = Pattern.compile("(\\d+) ?" + units);
+ Matcher matcher = pattern.matcher(val);
+ if (!matcher.find()) {
+ throw new AllocationConfigurationException("Missing resource: " + units);
+ }
+ return Integer.parseInt(matcher.group(1));
+ }
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java?rev=1489072&r1=1489071&r2=1489072&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java Mon Jun 3 17:36:20 2013
@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
@@ -301,7 +302,7 @@ public class QueueManager {
Map<String, Resource> maxQueueResources = new HashMap<String, Resource>();
Map<String, Integer> queueMaxApps = new HashMap<String, Integer>();
Map<String, Integer> userMaxApps = new HashMap<String, Integer>();
- Map<String, Double> queueWeights = new HashMap<String, Double>();
+ Map<String, ResourceWeights> queueWeights = new HashMap<String, ResourceWeights>();
Map<String, SchedulingPolicy> queuePolicies = new HashMap<String, SchedulingPolicy>();
Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>();
Map<String, Map<QueueACL, AccessControlList>> queueAcls =
@@ -415,7 +416,7 @@ public class QueueManager {
*/
private void loadQueue(String parentName, Element element, Map<String, Resource> minQueueResources,
Map<String, Resource> maxQueueResources, Map<String, Integer> queueMaxApps,
- Map<String, Integer> userMaxApps, Map<String, Double> queueWeights,
+ Map<String, Integer> userMaxApps, Map<String, ResourceWeights> queueWeights,
Map<String, SchedulingPolicy> queuePolicies,
Map<String, Long> minSharePreemptionTimeouts,
Map<String, Map<QueueACL, AccessControlList>> queueAcls, List<String> queueNamesInAllocFile)
@@ -433,12 +434,12 @@ public class QueueManager {
Element field = (Element) fieldNode;
if ("minResources".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
- int val = Integer.parseInt(text);
- minQueueResources.put(queueName, Resources.createResource(val));
+ Resource val = FairSchedulerConfiguration.parseResourceConfigValue(text);
+ minQueueResources.put(queueName, val);
} else if ("maxResources".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
- int val = Integer.parseInt(text);
- maxQueueResources.put(queueName, Resources.createResource(val));
+ Resource val = FairSchedulerConfiguration.parseResourceConfigValue(text);
+ maxQueueResources.put(queueName, val);
} else if ("maxRunningApps".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
@@ -446,7 +447,7 @@ public class QueueManager {
} else if ("weight".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
double val = Double.parseDouble(text);
- queueWeights.put(queueName, val);
+ queueWeights.put(queueName, new ResourceWeights((float)val));
} else if ("minSharePreemptionTimeout".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
long val = Long.parseLong(text) * 1000L;
@@ -454,7 +455,9 @@ public class QueueManager {
} else if ("schedulingPolicy".equals(field.getTagName())
|| "schedulingMode".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
- queuePolicies.put(queueName, SchedulingPolicy.parse(text));
+ SchedulingPolicy policy = SchedulingPolicy.parse(text);
+ policy.initialize(scheduler.getClusterCapacity());
+ queuePolicies.put(queueName, policy);
} else if ("aclSubmitApps".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text));
@@ -510,13 +513,20 @@ public class QueueManager {
}
/**
- * Get a collection of all queues
+ * Get a collection of all leaf queues
*/
public Collection<FSLeafQueue> getLeafQueues() {
synchronized (queues) {
return leafQueues;
}
}
+
+ /**
+ * Get a collection of all queues
+ */
+ public Collection<FSQueue> getQueues() {
+ return queues.values();
+ }
public int getUserMaxApps(String user) {
// save current info in case it gets changed under us
@@ -538,12 +548,12 @@ public class QueueManager {
}
}
- public double getQueueWeight(String queue) {
- Double weight = info.queueWeights.get(queue);
+ public ResourceWeights getQueueWeight(String queue) {
+ ResourceWeights weight = info.queueWeights.get(queue);
if (weight != null) {
return weight;
} else {
- return 1.0;
+ return ResourceWeights.NEUTRAL;
}
}
@@ -595,7 +605,7 @@ public class QueueManager {
// Maximum amount of resources per queue
public final Map<String, Resource> maxQueueResources;
// Sharing weights for each queue
- public final Map<String, Double> queueWeights;
+ public final Map<String, ResourceWeights> queueWeights;
// Max concurrent running applications for each queue and for each user; in addition,
// for users that have no max specified, we use the userMaxJobsDefault.
@@ -625,7 +635,7 @@ public class QueueManager {
public QueueManagerInfo(Map<String, Resource> minQueueResources,
Map<String, Resource> maxQueueResources,
Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
- Map<String, Double> queueWeights, int userMaxAppsDefault,
+ Map<String, ResourceWeights> queueWeights, int userMaxAppsDefault,
int queueMaxAppsDefault, SchedulingPolicy defaultSchedulingPolicy,
Map<String, Long> minSharePreemptionTimeouts,
Map<String, Map<QueueACL, AccessControlList>> queueAcls,
@@ -647,7 +657,7 @@ public class QueueManager {
public QueueManagerInfo() {
minQueueResources = new HashMap<String, Resource>();
maxQueueResources = new HashMap<String, Resource>();
- queueWeights = new HashMap<String, Double>();
+ queueWeights = new HashMap<String, ResourceWeights>();
queueMaxApps = new HashMap<String, Integer>();
userMaxApps = new HashMap<String, Integer>();
userMaxAppsDefault = Integer.MAX_VALUE;
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java?rev=1489072&r1=1489071&r2=1489072&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java Mon Jun 3 17:36:20 2013
@@ -22,6 +22,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
/**
@@ -80,7 +81,7 @@ public abstract class Schedulable {
/** Job/queue weight in fair sharing. */
- public abstract double getWeight();
+ public abstract ResourceWeights getWeights();
/** Start time for jobs in FIFO queues; meaningless for QueueSchedulables.*/
public abstract long getStartTime();
@@ -110,7 +111,7 @@ public abstract class Schedulable {
/** Convenient toString implementation for debugging. */
@Override
public String toString() {
- return String.format("[%s, demand=%s, running=%s, share=%s,], w=%.1f]",
- getName(), getDemand(), getResourceUsage(), fairShare, getWeight());
+ return String.format("[%s, demand=%s, running=%s, share=%s, w=%s]",
+ getName(), getDemand(), getResourceUsage(), fairShare, getWeights());
}
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java?rev=1489072&r1=1489071&r2=1489072&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java Mon Jun 3 17:36:20 2013
@@ -25,6 +25,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
@@ -67,11 +68,12 @@ public abstract class SchedulingPolicy {
/**
* Returns {@link SchedulingPolicy} instance corresponding to the
* {@link SchedulingPolicy} passed as a string. The policy can be "fair" for
- * FairsharePolicy or "fifo" for FifoPolicy. For custom
+ * FairSharePolicy, "fifo" for FifoPolicy, or "drf" for
+ * DominantResourceFairnessPolicy. For a custom
* {@link SchedulingPolicy}s in the RM classpath, the policy should be
* canonical class name of the {@link SchedulingPolicy}.
*
- * @param policy canonical class name or "fair" or "fifo"
+ * @param policy canonical class name or "drf" or "fair" or "fifo"
* @throws AllocationConfigurationException
*/
@SuppressWarnings("unchecked")
@@ -80,10 +82,12 @@ public abstract class SchedulingPolicy {
@SuppressWarnings("rawtypes")
Class clazz;
String text = policy.toLowerCase();
- if (text.equals("fair")) {
+ if (text.equalsIgnoreCase(FairSharePolicy.NAME)) {
clazz = FairSharePolicy.class;
- } else if (text.equals("fifo")) {
+ } else if (text.equalsIgnoreCase(FifoPolicy.NAME)) {
clazz = FifoPolicy.class;
+ } else if (text.equalsIgnoreCase(DominantResourceFairnessPolicy.NAME)) {
+ clazz = DominantResourceFairnessPolicy.class;
} else {
try {
clazz = Class.forName(policy);
@@ -98,6 +102,8 @@ public abstract class SchedulingPolicy {
}
return getInstance(clazz);
}
+
+ public void initialize(Resource clusterCapacity) {}
/**
* @return returns the name of {@link SchedulingPolicy}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java?rev=1489072&r1=1489071&r2=1489072&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java Mon Jun 3 17:36:20 2013
@@ -25,17 +25,21 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
import com.google.common.annotations.VisibleForTesting;
+/**
+ * Makes scheduling decisions by trying to equalize shares of memory.
+ */
@Private
@Unstable
public class FairSharePolicy extends SchedulingPolicy {
@VisibleForTesting
- public static final String NAME = "Fairshare";
+ public static final String NAME = "fair";
private static final DefaultResourceCalculator RESOURCE_CALCULATOR =
new DefaultResourceCalculator();
private FairShareComparator comparator = new FairShareComparator();
@@ -79,8 +83,10 @@ public class FairSharePolicy extends Sch
/ Resources.max(RESOURCE_CALCULATOR, null, minShare1, one).getMemory();
minShareRatio2 = (double) s2.getResourceUsage().getMemory()
/ Resources.max(RESOURCE_CALCULATOR, null, minShare2, one).getMemory();
- useToWeightRatio1 = s1.getResourceUsage().getMemory() / s1.getWeight();
- useToWeightRatio2 = s2.getResourceUsage().getMemory() / s2.getWeight();
+ useToWeightRatio1 = s1.getResourceUsage().getMemory() /
+ s1.getWeights().getWeight(ResourceType.MEMORY);
+ useToWeightRatio2 = s2.getResourceUsage().getMemory() /
+ s2.getWeights().getWeight(ResourceType.MEMORY);
int res = 0;
if (s1Needy && !s2Needy)
res = -1;
@@ -220,7 +226,7 @@ public class FairSharePolicy extends Sch
* {@link SchedulingAlgorithms#computeFairShares(Collection, double)}.
*/
private static Resource computeShare(Schedulable sched, double r2sRatio) {
- double share = sched.getWeight() * r2sRatio;
+ double share = sched.getWeights().getWeight(ResourceType.MEMORY) * r2sRatio;
share = Math.max(share, sched.getMinShare().getMemory());
share = Math.min(share, sched.getDemand().getMemory());
return Resources.createResource((int) share);
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java?rev=1489072&r1=1489071&r2=1489072&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java Mon Jun 3 17:36:20 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.util.Records;
@@ -30,7 +31,7 @@ public class FakeSchedulable extends Sch
private Resource demand;
private Resource usage;
private Resource minShare;
- private double weight;
+ private ResourceWeights weights;
private Priority priority;
private long startTime;
@@ -46,21 +47,22 @@ public class FakeSchedulable extends Sch
this(demand, minShare, 1, 0, 0, 0);
}
- public FakeSchedulable(int demand, int minShare, double weight) {
- this(demand, minShare, weight, 0, 0, 0);
+ public FakeSchedulable(int demand, int minShare, double memoryWeight) {
+ this(demand, minShare, memoryWeight, 0, 0, 0);
}
public FakeSchedulable(int demand, int minShare, double weight, int fairShare, int usage,
long startTime) {
- this(Resources.createResource(demand), Resources.createResource(minShare), weight,
- Resources.createResource(fairShare), Resources.createResource(usage), startTime);
+ this(Resources.createResource(demand), Resources.createResource(minShare),
+ new ResourceWeights((float)weight), Resources.createResource(fairShare),
+ Resources.createResource(usage), startTime);
}
- public FakeSchedulable(Resource demand, Resource minShare, double weight, Resource fairShare,
- Resource usage, long startTime) {
+ public FakeSchedulable(Resource demand, Resource minShare, ResourceWeights weight,
+ Resource fairShare, Resource usage, long startTime) {
this.demand = demand;
this.minShare = minShare;
- this.weight = weight;
+ this.weights = weight;
setFairShare(fairShare);
this.usage = usage;
this.priority = Records.newRecord(Priority.class);
@@ -98,8 +100,8 @@ public class FakeSchedulable extends Sch
}
@Override
- public double getWeight() {
- return weight;
+ public ResourceWeights getWeights() {
+ return weights;
}
@Override
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1489072&r1=1489071&r2=1489072&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Mon Jun 3 17:36:20 2013
@@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.After;
@@ -147,12 +148,17 @@ public class TestFairScheduler {
ApplicationAttemptId.newInstance(appIdImpl, attemptId);
return attId;
}
-
-
+
private ResourceRequest createResourceRequest(int memory, String host,
int priority, int numContainers, boolean relaxLocality) {
+ return createResourceRequest(memory, 1, host, priority, numContainers,
+ relaxLocality);
+ }
+
+ private ResourceRequest createResourceRequest(int memory, int vcores, String host,
+ int priority, int numContainers, boolean relaxLocality) {
ResourceRequest request = recordFactory.newRecordInstance(ResourceRequest.class);
- request.setCapability(Resources.createResource(memory));
+ request.setCapability(BuilderUtils.newResource(memory, vcores));
request.setResourceName(host);
request.setNumContainers(numContainers);
Priority prio = recordFactory.newRecordInstance(Priority.class);
@@ -170,18 +176,34 @@ public class TestFairScheduler {
String userId) {
return createSchedulingRequest(memory, queueId, userId, 1);
}
+
+ private ApplicationAttemptId createSchedulingRequest(int memory, int vcores,
+ String queueId, String userId) {
+ return createSchedulingRequest(memory, vcores, queueId, userId, 1);
+ }
private ApplicationAttemptId createSchedulingRequest(int memory, String queueId,
String userId, int numContainers) {
return createSchedulingRequest(memory, queueId, userId, numContainers, 1);
}
+
+ private ApplicationAttemptId createSchedulingRequest(int memory, int vcores,
+ String queueId, String userId, int numContainers) {
+ return createSchedulingRequest(memory, vcores, queueId, userId, numContainers, 1);
+ }
private ApplicationAttemptId createSchedulingRequest(int memory, String queueId,
String userId, int numContainers, int priority) {
+ return createSchedulingRequest(memory, 1, queueId, userId, numContainers,
+ priority);
+ }
+
+ private ApplicationAttemptId createSchedulingRequest(int memory, int vcores,
+ String queueId, String userId, int numContainers, int priority) {
ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
scheduler.addApplication(id, queueId, userId);
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
- ResourceRequest request = createResourceRequest(memory, ResourceRequest.ANY,
+ ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY,
priority, numContainers, true);
ask.add(request);
scheduler.allocate(id, ask, new ArrayList<ContainerId>());
@@ -451,10 +473,10 @@ public class TestFairScheduler {
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"queueA\">");
- out.println("<minResources>1024</minResources>");
+ out.println("<minResources>1024mb,0vcores</minResources>");
out.println("</queue>");
out.println("<queue name=\"queueB\">");
- out.println("<minResources>2048</minResources>");
+ out.println("<minResources>2048mb,0vcores</minResources>");
out.println("</queue>");
out.println("</allocations>");
out.close();
@@ -569,11 +591,11 @@ public class TestFairScheduler {
out.println("<allocations>");
// Give queue A a minimum of 1024 M
out.println("<queue name=\"queueA\">");
- out.println("<minResources>1024</minResources>");
+ out.println("<minResources>1024mb,0vcores</minResources>");
out.println("</queue>");
// Give queue B a minimum of 2048 M
out.println("<queue name=\"queueB\">");
- out.println("<minResources>2048</minResources>");
+ out.println("<minResources>2048mb,0vcores</minResources>");
out.println("<aclAdministerApps>alice,bob admins</aclAdministerApps>");
out.println("</queue>");
// Give queue C no minimum
@@ -613,9 +635,9 @@ public class TestFairScheduler {
assertEquals(Resources.createResource(0),
queueManager.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
- assertEquals(Resources.createResource(1024),
+ assertEquals(Resources.createResource(1024, 0),
queueManager.getMinResources("root.queueA"));
- assertEquals(Resources.createResource(2048),
+ assertEquals(Resources.createResource(2048, 0),
queueManager.getMinResources("root.queueB"));
assertEquals(Resources.createResource(0),
queueManager.getMinResources("root.queueC"));
@@ -672,15 +694,15 @@ public class TestFairScheduler {
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"queueA\">");
- out.println("<minResources>2048</minResources>");
+ out.println("<minResources>2048mb,0vcores</minResources>");
out.println("</queue>");
out.println("<queue name=\"queueB\">");
- out.println("<minResources>2048</minResources>");
+ out.println("<minResources>2048mb,0vcores</minResources>");
out.println("<queue name=\"queueC\">");
- out.println("<minResources>2048</minResources>");
+ out.println("<minResources>2048mb,0vcores</minResources>");
out.println("</queue>");
out.println("<queue name=\"queueD\">");
- out.println("<minResources>2048</minResources>");
+ out.println("<minResources>2048mb,0vcores</minResources>");
out.println("</queue>");
out.println("</queue>");
out.println("</allocations>");
@@ -710,11 +732,11 @@ public class TestFairScheduler {
out.println("<allocations>");
// Give queue A a minimum of 1024 M
out.println("<pool name=\"queueA\">");
- out.println("<minResources>1024</minResources>");
+ out.println("<minResources>1024mb,0vcores</minResources>");
out.println("</pool>");
// Give queue B a minimum of 2048 M
out.println("<pool name=\"queueB\">");
- out.println("<minResources>2048</minResources>");
+ out.println("<minResources>2048mb,0vcores</minResources>");
out.println("<aclAdministerApps>alice,bob admins</aclAdministerApps>");
out.println("</pool>");
// Give queue C no minimum
@@ -754,9 +776,9 @@ public class TestFairScheduler {
assertEquals(Resources.createResource(0),
queueManager.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
- assertEquals(Resources.createResource(1024),
+ assertEquals(Resources.createResource(1024, 0),
queueManager.getMinResources("root.queueA"));
- assertEquals(Resources.createResource(2048),
+ assertEquals(Resources.createResource(2048, 0),
queueManager.getMinResources("root.queueB"));
assertEquals(Resources.createResource(0),
queueManager.getMinResources("root.queueC"));
@@ -812,10 +834,10 @@ public class TestFairScheduler {
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"queueA\">");
- out.println("<minResources>2048</minResources>");
+ out.println("<minResources>2048mb,0vcores</minResources>");
out.println("</queue>");
out.println("<queue name=\"queueB\">");
- out.println("<minResources>2048</minResources>");
+ out.println("<minResources>2048mb,0vcores</minResources>");
out.println("</queue>");
out.println("</allocations>");
out.close();
@@ -825,7 +847,7 @@ public class TestFairScheduler {
// Add one big node (only care about aggregate capacity)
RMNode node1 =
- MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024), 1,
+ MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
"127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
@@ -885,7 +907,7 @@ public class TestFairScheduler {
// Add one big node (only care about aggregate capacity)
RMNode node1 =
- MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024), 1,
+ MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
"127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
@@ -963,19 +985,19 @@ public class TestFairScheduler {
// Create four nodes
RMNode node1 =
- MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 1,
+ MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 1,
"127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
RMNode node2 =
- MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 2,
+ MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 2,
"127.0.0.2");
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
scheduler.handle(nodeEvent2);
RMNode node3 =
- MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 3,
+ MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 3,
"127.0.0.3");
NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
scheduler.handle(nodeEvent3);
@@ -1106,19 +1128,19 @@ public class TestFairScheduler {
out.println("<allocations>");
out.println("<queue name=\"queueA\">");
out.println("<weight>.25</weight>");
- out.println("<minResources>1024</minResources>");
+ out.println("<minResources>1024mb,0vcores</minResources>");
out.println("</queue>");
out.println("<queue name=\"queueB\">");
out.println("<weight>.25</weight>");
- out.println("<minResources>1024</minResources>");
+ out.println("<minResources>1024mb,0vcores</minResources>");
out.println("</queue>");
out.println("<queue name=\"queueC\">");
out.println("<weight>.25</weight>");
- out.println("<minResources>1024</minResources>");
+ out.println("<minResources>1024mb,0vcores</minResources>");
out.println("</queue>");
out.println("<queue name=\"queueD\">");
out.println("<weight>.25</weight>");
- out.println("<minResources>1024</minResources>");
+ out.println("<minResources>1024mb,0vcores</minResources>");
out.println("</queue>");
out.print("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
out.print("<fairSharePreemptionTimeout>10</fairSharePreemptionTimeout>");
@@ -1130,19 +1152,19 @@ public class TestFairScheduler {
// Create four nodes
RMNode node1 =
- MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 1,
+ MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 1,
"127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
RMNode node2 =
- MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 2,
+ MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 2,
"127.0.0.2");
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
scheduler.handle(nodeEvent2);
RMNode node3 =
- MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 3,
+ MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 3,
"127.0.0.3");
NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
scheduler.handle(nodeEvent3);
@@ -1206,19 +1228,19 @@ public class TestFairScheduler {
// After minSharePreemptionTime has passed, they should want to preempt min
// share.
clock.tick(6);
- assertTrue(Resources.equals(
- Resources.createResource(1024), scheduler.resToPreempt(schedC, clock.getTime())));
- assertTrue(Resources.equals(
- Resources.createResource(1024), scheduler.resToPreempt(schedD, clock.getTime())));
+ assertEquals(
+ 1024, scheduler.resToPreempt(schedC, clock.getTime()).getMemory());
+ assertEquals(
+ 1024, scheduler.resToPreempt(schedD, clock.getTime()).getMemory());
// After fairSharePreemptionTime has passed, they should want to preempt
// fair share.
scheduler.update();
clock.tick(6);
- assertTrue(Resources.equals(
- Resources.createResource(1536), scheduler.resToPreempt(schedC, clock.getTime())));
- assertTrue(Resources.equals(
- Resources.createResource(1536), scheduler.resToPreempt(schedD, clock.getTime())));
+ assertEquals(
+ 1536 , scheduler.resToPreempt(schedC, clock.getTime()).getMemory());
+ assertEquals(
+ 1536, scheduler.resToPreempt(schedD, clock.getTime()).getMemory());
}
@Test (timeout = 5000)
@@ -1271,7 +1293,7 @@ public class TestFairScheduler {
// Add a node
RMNode node1 =
MockNodes
- .newNodeInfo(1, Resources.createResource(8192), 1, "127.0.0.1");
+ .newNodeInfo(1, Resources.createResource(8192, 8), 1, "127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
@@ -1443,7 +1465,7 @@ public class TestFairScheduler {
public void testFifoWithinQueue() throws Exception {
RMNode node1 =
MockNodes
- .newNodeInfo(1, Resources.createResource(3072), 1, "127.0.0.1");
+ .newNodeInfo(1, Resources.createResource(3072, 3), 1, "127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
@@ -1488,7 +1510,7 @@ public class TestFairScheduler {
.setPolicy(SchedulingPolicy.getDefault());
RMNode node =
- MockNodes.newNodeInfo(1, Resources.createResource(16384), 0,
+ MockNodes.newNodeInfo(1, Resources.createResource(16384, 16), 0,
"127.0.0.1");
NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
@@ -1536,10 +1558,10 @@ public class TestFairScheduler {
RMNode node1 =
MockNodes
- .newNodeInfo(1, Resources.createResource(8192), 1, "127.0.0.1");
+ .newNodeInfo(1, Resources.createResource(8192, 8), 1, "127.0.0.1");
RMNode node2 =
MockNodes
- .newNodeInfo(1, Resources.createResource(8192), 2, "127.0.0.2");
+ .newNodeInfo(1, Resources.createResource(8192, 8), 2, "127.0.0.2");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
@@ -1685,7 +1707,8 @@ public class TestFairScheduler {
public void testRemoveNodeUpdatesRootQueueMetrics() {
assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
- RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
+ RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1,
+ "127.0.0.1");
NodeAddedSchedulerEvent addEvent = new NodeAddedSchedulerEvent(node1);
scheduler.handle(addEvent);
@@ -1824,4 +1847,157 @@ public class TestFairScheduler {
scheduler.handle(nodeUpdateEvent);
assertEquals(0, app.getReservedContainers().size());
}
+
+ public void testNoMoreCpuOnNode() {
+ RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2048, 1),
+ 1, "127.0.0.1");
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+ scheduler.handle(nodeEvent1);
+
+ ApplicationAttemptId attId = createSchedulingRequest(1024, 1, "default",
+ "user1", 2);
+ FSSchedulerApp app = scheduler.applications.get(attId);
+ scheduler.update();
+
+ NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
+ scheduler.handle(updateEvent);
+ assertEquals(1, app.getLiveContainers().size());
+ scheduler.handle(updateEvent);
+ assertEquals(1, app.getLiveContainers().size());
+ }
+
+ public void testBasicDRFAssignment() throws Exception {
+ RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(8192, 5));
+ NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
+ scheduler.handle(nodeEvent);
+
+ ApplicationAttemptId appAttId1 = createSchedulingRequest(2048, 1, "queue1",
+ "user1", 2);
+ FSSchedulerApp app1 = scheduler.applications.get(appAttId1);
+ ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 2, "queue1",
+ "user1", 2);
+ FSSchedulerApp app2 = scheduler.applications.get(appAttId2);
+
+ DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
+ drfPolicy.initialize(scheduler.getClusterCapacity());
+ scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy);
+ scheduler.update();
+
+ // First both apps get a container
+ // Then the first gets another container because its dominant share of
+ // 2048/8192 is less than the other's of 2/5
+ NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
+ scheduler.handle(updateEvent);
+ Assert.assertEquals(1, app1.getLiveContainers().size());
+ Assert.assertEquals(0, app2.getLiveContainers().size());
+
+ scheduler.handle(updateEvent);
+ Assert.assertEquals(1, app1.getLiveContainers().size());
+ Assert.assertEquals(1, app2.getLiveContainers().size());
+
+ scheduler.handle(updateEvent);
+ Assert.assertEquals(2, app1.getLiveContainers().size());
+ Assert.assertEquals(1, app2.getLiveContainers().size());
+ }
+
+ /**
+ * Two apps on one queue, one app on another
+ */
+ @Test
+ public void testBasicDRFWithQueues() throws Exception {
+ RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(8192, 7),
+ 1, "127.0.0.1");
+ NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
+ scheduler.handle(nodeEvent);
+
+ ApplicationAttemptId appAttId1 = createSchedulingRequest(3072, 1, "queue1",
+ "user1", 2);
+ FSSchedulerApp app1 = scheduler.applications.get(appAttId1);
+ ApplicationAttemptId appAttId2 = createSchedulingRequest(2048, 2, "queue1",
+ "user1", 2);
+ FSSchedulerApp app2 = scheduler.applications.get(appAttId2);
+ ApplicationAttemptId appAttId3 = createSchedulingRequest(1024, 2, "queue2",
+ "user1", 2);
+ FSSchedulerApp app3 = scheduler.applications.get(appAttId3);
+
+ DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
+ drfPolicy.initialize(scheduler.getClusterCapacity());
+ scheduler.getQueueManager().getQueue("root").setPolicy(drfPolicy);
+ scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy);
+ scheduler.update();
+
+ NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
+ scheduler.handle(updateEvent);
+ Assert.assertEquals(1, app1.getLiveContainers().size());
+ scheduler.handle(updateEvent);
+ Assert.assertEquals(1, app3.getLiveContainers().size());
+ scheduler.handle(updateEvent);
+ Assert.assertEquals(2, app3.getLiveContainers().size());
+ scheduler.handle(updateEvent);
+ Assert.assertEquals(1, app2.getLiveContainers().size());
+ }
+
+ @Test
+ public void testDRFHierarchicalQueues() throws Exception {
+ RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(12288, 12),
+ 1, "127.0.0.1");
+ NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
+ scheduler.handle(nodeEvent);
+
+ ApplicationAttemptId appAttId1 = createSchedulingRequest(3074, 1, "queue1.subqueue1",
+ "user1", 2);
+ Thread.sleep(3); // so that start times will be different
+ FSSchedulerApp app1 = scheduler.applications.get(appAttId1);
+ ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 3, "queue1.subqueue1",
+ "user1", 2);
+ Thread.sleep(3); // so that start times will be different
+ FSSchedulerApp app2 = scheduler.applications.get(appAttId2);
+ ApplicationAttemptId appAttId3 = createSchedulingRequest(2048, 2, "queue1.subqueue2",
+ "user1", 2);
+ Thread.sleep(3); // so that start times will be different
+ FSSchedulerApp app3 = scheduler.applications.get(appAttId3);
+ ApplicationAttemptId appAttId4 = createSchedulingRequest(1024, 2, "queue2",
+ "user1", 2);
+ Thread.sleep(3); // so that start times will be different
+ FSSchedulerApp app4 = scheduler.applications.get(appAttId4);
+
+ DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
+ drfPolicy.initialize(scheduler.getClusterCapacity());
+ scheduler.getQueueManager().getQueue("root").setPolicy(drfPolicy);
+ scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy);
+ scheduler.getQueueManager().getQueue("queue1.subqueue1").setPolicy(drfPolicy);
+ scheduler.update();
+
+ NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
+ scheduler.handle(updateEvent);
+ // app1 gets first container because it asked first
+ Assert.assertEquals(1, app1.getLiveContainers().size());
+ scheduler.handle(updateEvent);
+ // app4 gets second container because it's on queue2
+ Assert.assertEquals(1, app4.getLiveContainers().size());
+ scheduler.handle(updateEvent);
+ // app4 gets another container because queue2's dominant share of memory
+ // is still less than queue1's of cpu
+ Assert.assertEquals(2, app4.getLiveContainers().size());
+ scheduler.handle(updateEvent);
+ // app3 gets one because queue1 gets one and queue1.subqueue2 is behind
+ // queue1.subqueue1
+ Assert.assertEquals(1, app3.getLiveContainers().size());
+ scheduler.handle(updateEvent);
+ // app4 would get another one, but it doesn't have any requests
+ // queue1.subqueue2 is still using less than queue1.subqueue1, so it
+ // gets another
+ Assert.assertEquals(2, app3.getLiveContainers().size());
+ // queue1.subqueue1 is behind again, so it gets one, which it gives to app2
+ scheduler.handle(updateEvent);
+ Assert.assertEquals(1, app2.getLiveContainers().size());
+
+ // at this point, we've used all our CPU up, so nobody else should get a container
+ scheduler.handle(updateEvent);
+
+ Assert.assertEquals(1, app1.getLiveContainers().size());
+ Assert.assertEquals(1, app2.getLiveContainers().size());
+ Assert.assertEquals(2, app3.getLiveContainers().size());
+ Assert.assertEquals(2, app4.getLiveContainers().size());
+ }
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java?rev=1489072&r1=1489071&r2=1489072&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java Mon Jun 3 17:36:20 2013
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.re
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
import org.junit.Test;
@@ -49,6 +50,11 @@ public class TestSchedulingPolicy {
assertTrue("Invalid scheduler name",
sm.getName().equals(FairSharePolicy.NAME));
+ // Shortname - drf
+ sm = SchedulingPolicy.parse("drf");
+ assertTrue("Invalid scheduler name",
+ sm.getName().equals(DominantResourceFairnessPolicy.NAME));
+
// Shortname - fair
sm = SchedulingPolicy.parse("fair");
assertTrue("Invalid scheduler name",
@@ -93,7 +99,20 @@ public class TestSchedulingPolicy {
SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_PARENT));
assertTrue(ERR,
SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ANY));
-
+
+ // drf
+ policy = SchedulingPolicy.parse("drf");
+ assertTrue(ERR,
+ SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_LEAF));
+ assertTrue(ERR, SchedulingPolicy.isApplicableTo(policy,
+ SchedulingPolicy.DEPTH_INTERMEDIATE));
+ assertTrue(ERR,
+ SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ROOT));
+ assertTrue(ERR,
+ SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_PARENT));
+ assertTrue(ERR,
+ SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ANY));
+
policy = Mockito.mock(SchedulingPolicy.class);
Mockito.when(policy.getApplicableDepth()).thenReturn(
SchedulingPolicy.DEPTH_PARENT);
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm?rev=1489072&r1=1489071&r2=1489072&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm Mon Jun 3 17:36:20 2013
@@ -31,17 +31,18 @@ Hadoop MapReduce Next Generation - Fair
* {Introduction}
Fair scheduling is a method of assigning resources to applications such that
- all apps get, on average, an equal share of resources over time.
- Hadoop NextGen is capable of scheduling multiple resource types, such as
- Memory and CPU. Currently only memory is supported, so a "cluster share" is
- a proportion of aggregate memory in the cluster. When there is a single app
- running, that app uses the entire cluster. When other apps are submitted,
- resources that free up are assigned to the new apps, so that each app gets
- roughly the same amount of resources. Unlike the default Hadoop scheduler,
- which forms a queue of apps, this lets short apps finish in reasonable time
- while not starving long-lived apps. It is also a reasonable way to share a
- cluster between a number of users. Finally, fair sharing can also work with
- app priorities - the priorities are used as weights to determine the
+ all apps get, on average, an equal share of resources over time.
+ Hadoop NextGen is capable of scheduling multiple resource types. By default,
+ the Fair Scheduler bases scheduling fairness decisions only on memory. It
+ can be configured to schedule with both memory and CPU, using the notion
+ of Dominant Resource Fairness developed by Ghodsi et al. When there is a
+ single app running, that app uses the entire cluster. When other apps are
+ submitted, resources that free up are assigned to the new apps, so that each
+ app eventually on gets roughly the same amount of resources. Unlike the default
+ Hadoop scheduler, which forms a queue of apps, this lets short apps finish in
+ reasonable time while not starving long-lived apps. It is also a reasonable way
+ to share a cluster between a number of users. Finally, fair sharing can also
+ work with app priorities - the priorities are used as weights to determine the
fraction of total resources that each app should get.
The scheduler organizes apps further into "queues", and shares resources
@@ -49,9 +50,10 @@ Hadoop MapReduce Next Generation - Fair
called âdefaultâ. If an app specifically lists a queue in a container
resource request, the request is submitted to that queue. It is also
possible to assign queues based on the user name included with the request
- through configuration. Within each queue, fair sharing is used to share
- capacity between the running apps. queues can also be given weights to share
- the cluster non-proportionally in the config file.
+ through configuration. Within each queue, a scheduling policy is used to share
+ resources between the running apps. The default is memory-based fair sharing,
+ but FIFO and multi-resource with Dominant Resource Fairness can also be
+ configured. Queues can be configured with weights to share the cluster non-evenly.
The fair scheduler supports hierarchical queues. All queues descend from a
queue named "root". Available resources are distributed among the children
@@ -120,14 +122,6 @@ Hadoop MapReduce Next Generation - Fair
queues and their properties, in addition to certain policy defaults. This file
must be in XML format as described in the next section.
- * <<<yarn.scheduler.fair.minimum-allocation-mb>>>
-
- * The smallest container size the scheduler can allocate, in MB of memory.
-
- * <<<yarn.scheduler.fair.maximum-allocation-mb>>>
-
- * The largest container the scheduler can allocate, in MB of memory.
-
* <<<yarn.scheduler.fair.user-as-default-queue>>>
* Whether to use the username associated with the allocation as the default
@@ -183,17 +177,23 @@ Allocation file format
* <<Queue elements>>, which represent queues. Each may contain the following
properties:
- * minResources: minimum MB of aggregate memory the queue expects. If a queue
- demands resources, and its current allocation is below its configured minimum,
- it will be assigned available resources before any queue that is not in this
- situation. If multiple queues are in this situation, resources go to the
- queue with the smallest ratio between allocation and minimum. Note that it is
- possible that a queue that is below its minimum may not immediately get up to
- its minimum when it submits an application, because already-running jobs may
- be using those resources.
-
- * maxResources: maximum MB of aggregate memory a queue is allowed. A queue
- will never be assigned a container that would put it over this limit.
+ * minResources: minimum resources the queue is entitled to, in the form
+ "X mb, Y vcores". If a queue's minimum share is not satisfied, it will be
+ offered available resources before any other queue under the same parent.
+ Under the single-resource fairness policy, a queue
+ is considered unsatisfied if its memory usage is below its minimum memory
+ share. Under dominant resource fairness, a queue is considered unsatisfied
+ if its usage for its dominant resource with respect to the cluster capacity
+ is below its minimum share for that resource. If multiple queues are
+ unsatisfied in this situation, resources go to the queue with the smallest
+ ratio between relevant resource usage and minimum. Note that it is
+ possible that a queue that is below its minimum may not immediately get up
+ to its minimum when it submits an application, because already-running jobs
+ may be using those resources.
+
+ * maxResources: maximum resources a queue is allowed, in the form
+ "X mb, Y vcores". A queue will never be assigned a container that would
+ put its aggregate usage over this limit.
* maxRunningApps: limit the number of apps from the queue to run at once
@@ -232,13 +232,13 @@ Allocation file format
<?xml version="1.0"?>
<allocations>
<queue name="sample_queue">
- <minResources>10000</minResources>
- <maxResources>90000</maxResources>
+ <minResources>10000 mb</minResources>
+ <maxResources>90000 mb</maxResources>
<maxRunningApps>50</maxRunningApps>
<weight>2.0</weight>
<schedulingMode>fair</schedulingMode>
<queue name="sample_sub_queue">
- <minResources>5000</minResources>
+ <minResources>5000 mb</minResources>
</queue>
</queue>
<user name="sample_user">