You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by tu...@apache.org on 2013/06/03 19:33:56 UTC
svn commit: r1489070 [1/2] - in /hadoop/common/trunk/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/
hadoop-yarn/hadoop-yarn-server/hadoop-y...
Author: tucu
Date: Mon Jun 3 17:33:55 2013
New Revision: 1489070
URL: http://svn.apache.org/r1489070
Log:
YARN-326. Add multi-resource scheduling to the fair scheduler. (sandyr via tucu)
Added:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceType.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceWeights.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerConfiguration.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfigurationException.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1489070&r1=1489069&r2=1489070&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Mon Jun 3 17:33:55 2013
@@ -108,6 +108,9 @@ Release 2.1.0-beta - UNRELEASED
YARN-392. Make it possible to specify hard locality constraints in resource
requests. (sandyr via tucu)
+ YARN-326. Add multi-resource scheduling to the fair scheduler.
+ (sandyr via tucu)
+
IMPROVEMENTS
YARN-365. Change NM heartbeat handling to not generate a scheduler event
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceType.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceType.java?rev=1489070&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceType.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceType.java Mon Jun 3 17:33:55 2013
@@ -0,0 +1,28 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.resource;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+@Private
+@Evolving
+public enum ResourceType {
+ MEMORY, CPU
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceWeights.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceWeights.java?rev=1489070&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceWeights.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceWeights.java Mon Jun 3 17:33:55 2013
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.resource;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+@Private
+@Evolving
+public class ResourceWeights {
+ public static final ResourceWeights NEUTRAL = new ResourceWeights(1.0f);
+
+ private float[] weights = new float[ResourceType.values().length];
+
+ public ResourceWeights(float memoryWeight, float cpuWeight) {
+ weights[ResourceType.MEMORY.ordinal()] = memoryWeight;
+ weights[ResourceType.CPU.ordinal()] = cpuWeight;
+ }
+
+ public ResourceWeights(float weight) {
+ for (int i = 0; i < weights.length; i++) {
+ weights[i] = weight;
+ }
+ }
+
+ public ResourceWeights() { }
+
+ public void setWeight(ResourceType resourceType, float weight) {
+ weights[resourceType.ordinal()] = weight;
+ }
+
+ public float getWeight(ResourceType resourceType) {
+ return weights[resourceType.ordinal()];
+ }
+
+ public String toString() {
+ StringBuffer sb = new StringBuffer();
+ sb.append("<");
+ for (int i = 0; i < ResourceType.values().length; i++) {
+ if (i != 0) {
+ sb.append(", ");
+ }
+ ResourceType resourceType = ResourceType.values()[i];
+ sb.append(resourceType.name().toLowerCase());
+ sb.append(String.format(" weight=%.1f", getWeight(resourceType)));
+ }
+ sb.append(">");
+ return sb.toString();
+ }
+}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfigurationException.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfigurationException.java?rev=1489070&r1=1489069&r2=1489070&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfigurationException.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfigurationException.java Mon Jun 3 17:33:55 2013
@@ -32,4 +32,8 @@ public class AllocationConfigurationExce
public AllocationConfigurationException(String message) {
super(message);
}
+
+ public AllocationConfigurationException(String message, Throwable t) {
+ super(message, t);
+ }
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java?rev=1489070&r1=1489069&r2=1489070&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java Mon Jun 3 17:33:55 2013
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
@@ -119,7 +120,7 @@ public class AppSchedulable extends Sche
}
@Override
- public double getWeight() {
+ public ResourceWeights getWeights() {
return scheduler.getAppWeight(this);
}
@@ -237,10 +238,7 @@ public class AppSchedulable extends Sche
}
// Can we allocate a container on this node?
- int availableContainers =
- available.getMemory() / capability.getMemory();
-
- if (availableContainers > 0) {
+ if (Resources.fitsIn(capability, available)) {
// Inform the application of the new container for this request
RMContainer allocatedContainer =
app.allocate(type, node, priority, request, container);
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java?rev=1489070&r1=1489069&r2=1489070&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java Mon Jun 3 17:33:55 2013
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@@ -84,7 +85,7 @@ public abstract class FSQueue extends Sc
throws AllocationConfigurationException;
@Override
- public double getWeight() {
+ public ResourceWeights getWeights() {
return queueMgr.getQueueWeight(getName());
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java?rev=1489070&r1=1489069&r2=1489070&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java Mon Jun 3 17:33:55 2013
@@ -180,8 +180,8 @@ public class FSSchedulerNode extends Sch
@Override
public String toString() {
return "host: " + rmNode.getNodeAddress() + " #containers=" + getNumContainers() +
- " available=" + getAvailableResource().getMemory() +
- " used=" + getUsedResource().getMemory();
+ " available=" + getAvailableResource() +
+ " used=" + getUsedResource();
}
@Override
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1489070&r1=1489069&r2=1489070&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Mon Jun 3 17:33:55 2013
@@ -57,8 +57,10 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -495,14 +497,14 @@ public class FairScheduler implements Re
}
// synchronized for sizeBasedWeight
- public synchronized double getAppWeight(AppSchedulable app) {
+ public synchronized ResourceWeights getAppWeight(AppSchedulable app) {
if (!app.getRunnable()) {
// Job won't launch tasks, but don't return 0 to avoid division errors
- return 1.0;
+ return ResourceWeights.NEUTRAL;
} else {
double weight = 1.0;
if (sizeBasedWeight) {
- // Set weight based on current demand
+ // Set weight based on current memory demand
weight = Math.log1p(app.getDemand().getMemory()) / Math.log(2);
}
weight *= app.getPriority().getPriority();
@@ -510,7 +512,7 @@ public class FairScheduler implements Re
// Run weight through the user-supplied weightAdjuster
weight = weightAdjuster.adjustWeight(app, weight);
}
- return weight;
+ return new ResourceWeights((float)weight);
}
}
@@ -714,37 +716,6 @@ public class FairScheduler implements Re
" cluster capacity: " + clusterCapacity);
}
- /**
- * Utility method to normalize a list of resource requests, by ensuring that
- * the memory for each request is a multiple of minMemory and is not zero.
- *
- * @param asks a list of resource requests
- * @param minMemory the configured minimum memory allocation
- * @param maxMemory the configured maximum memory allocation
- */
- static void normalizeRequests(List<ResourceRequest> asks,
- int minMemory, int maxMemory) {
- for (ResourceRequest ask : asks) {
- normalizeRequest(ask, minMemory, maxMemory);
- }
- }
-
- /**
- * Utility method to normalize a resource request, by ensuring that the
- * requested memory is a multiple of minMemory and is not zero.
- *
- * @param ask the resource request
- * @param minMemory the configured minimum memory allocation
- * @param maxMemory the configured maximum memory allocation
- */
- static void normalizeRequest(ResourceRequest ask, int minMemory,
- int maxMemory) {
- int memory = Math.max(ask.getCapability().getMemory(), minMemory);
- int normalizedMemory =
- minMemory * ((memory / minMemory) + (memory % minMemory > 0 ? 1 : 0));
- ask.getCapability().setMemory(Math.min(normalizedMemory, maxMemory));
- }
-
@Override
public Allocation allocate(ApplicationAttemptId appAttemptId,
List<ResourceRequest> ask, List<ContainerId> release) {
@@ -758,8 +729,8 @@ public class FairScheduler implements Re
}
// Sanity check
- normalizeRequests(ask, minimumAllocation.getMemory(),
- maximumAllocation.getMemory());
+ SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(),
+ clusterCapacity, minimumAllocation, maximumAllocation);
// Release containers
for (ContainerId releasedContainerId : release) {
@@ -1015,8 +986,8 @@ public class FairScheduler implements Re
public synchronized void reinitialize(Configuration conf, RMContext rmContext)
throws IOException {
this.conf = new FairSchedulerConfiguration(conf);
- minimumAllocation = this.conf.getMinimumMemoryAllocation();
- maximumAllocation = this.conf.getMaximumMemoryAllocation();
+ minimumAllocation = this.conf.getMinimumAllocation();
+ maximumAllocation = this.conf.getMaximumAllocation();
userAsDefaultQueue = this.conf.getUserAsDefaultQueue();
nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
rackLocalityThreshold = this.conf.getLocalityThresholdRack();
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java?rev=1489070&r1=1489069&r2=1489070&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java Mon Jun 3 17:33:55 2013
@@ -18,12 +18,16 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.io.File;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.util.BuilderUtils;
@Private
@Evolving
@@ -78,18 +82,24 @@ public class FairSchedulerConfiguration
addResource(FS_CONFIGURATION_FILE);
}
- public Resource getMinimumMemoryAllocation() {
+ public Resource getMinimumAllocation() {
int mem = getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
- return Resources.createResource(mem);
+ int cpu = getInt(
+ YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
+ return Resources.createResource(mem, cpu);
}
- public Resource getMaximumMemoryAllocation() {
+ public Resource getMaximumAllocation() {
int mem = getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
- return Resources.createResource(mem);
+ int cpu = getInt(
+ YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
+ return Resources.createResource(mem, cpu);
}
public boolean getUserAsDefaultQueue() {
@@ -136,4 +146,34 @@ public class FairSchedulerConfiguration
public int getWaitTimeBeforeKill() {
return getInt(WAIT_TIME_BEFORE_KILL, DEFAULT_WAIT_TIME_BEFORE_KILL);
}
+
+ /**
+ * Parses a resource config value of a form like "1024", "1024 mb",
+ * or "1024 mb, 3 vcores". If no units are given, megabytes are assumed.
+ *
+ * @throws AllocationConfigurationException
+ */
+ public static Resource parseResourceConfigValue(String val)
+ throws AllocationConfigurationException {
+ try {
+ int memory = findResource(val, "mb");
+ int vcores = findResource(val, "vcores");
+ return BuilderUtils.newResource(memory, vcores);
+ } catch (AllocationConfigurationException ex) {
+ throw ex;
+ } catch (Exception ex) {
+ throw new AllocationConfigurationException(
+ "Error reading resource config", ex);
+ }
+ }
+
+ private static int findResource(String val, String units)
+ throws AllocationConfigurationException {
+ Pattern pattern = Pattern.compile("(\\d+) ?" + units);
+ Matcher matcher = pattern.matcher(val);
+ if (!matcher.find()) {
+ throw new AllocationConfigurationException("Missing resource: " + units);
+ }
+ return Integer.parseInt(matcher.group(1));
+ }
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java?rev=1489070&r1=1489069&r2=1489070&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java Mon Jun 3 17:33:55 2013
@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
@@ -301,7 +302,7 @@ public class QueueManager {
Map<String, Resource> maxQueueResources = new HashMap<String, Resource>();
Map<String, Integer> queueMaxApps = new HashMap<String, Integer>();
Map<String, Integer> userMaxApps = new HashMap<String, Integer>();
- Map<String, Double> queueWeights = new HashMap<String, Double>();
+ Map<String, ResourceWeights> queueWeights = new HashMap<String, ResourceWeights>();
Map<String, SchedulingPolicy> queuePolicies = new HashMap<String, SchedulingPolicy>();
Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>();
Map<String, Map<QueueACL, AccessControlList>> queueAcls =
@@ -415,7 +416,7 @@ public class QueueManager {
*/
private void loadQueue(String parentName, Element element, Map<String, Resource> minQueueResources,
Map<String, Resource> maxQueueResources, Map<String, Integer> queueMaxApps,
- Map<String, Integer> userMaxApps, Map<String, Double> queueWeights,
+ Map<String, Integer> userMaxApps, Map<String, ResourceWeights> queueWeights,
Map<String, SchedulingPolicy> queuePolicies,
Map<String, Long> minSharePreemptionTimeouts,
Map<String, Map<QueueACL, AccessControlList>> queueAcls, List<String> queueNamesInAllocFile)
@@ -433,12 +434,12 @@ public class QueueManager {
Element field = (Element) fieldNode;
if ("minResources".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
- int val = Integer.parseInt(text);
- minQueueResources.put(queueName, Resources.createResource(val));
+ Resource val = FairSchedulerConfiguration.parseResourceConfigValue(text);
+ minQueueResources.put(queueName, val);
} else if ("maxResources".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
- int val = Integer.parseInt(text);
- maxQueueResources.put(queueName, Resources.createResource(val));
+ Resource val = FairSchedulerConfiguration.parseResourceConfigValue(text);
+ maxQueueResources.put(queueName, val);
} else if ("maxRunningApps".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
@@ -446,7 +447,7 @@ public class QueueManager {
} else if ("weight".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
double val = Double.parseDouble(text);
- queueWeights.put(queueName, val);
+ queueWeights.put(queueName, new ResourceWeights((float)val));
} else if ("minSharePreemptionTimeout".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
long val = Long.parseLong(text) * 1000L;
@@ -454,7 +455,9 @@ public class QueueManager {
} else if ("schedulingPolicy".equals(field.getTagName())
|| "schedulingMode".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
- queuePolicies.put(queueName, SchedulingPolicy.parse(text));
+ SchedulingPolicy policy = SchedulingPolicy.parse(text);
+ policy.initialize(scheduler.getClusterCapacity());
+ queuePolicies.put(queueName, policy);
} else if ("aclSubmitApps".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text));
@@ -510,13 +513,20 @@ public class QueueManager {
}
/**
- * Get a collection of all queues
+ * Get a collection of all leaf queues
*/
public Collection<FSLeafQueue> getLeafQueues() {
synchronized (queues) {
return leafQueues;
}
}
+
+ /**
+ * Get a collection of all queues
+ */
+ public Collection<FSQueue> getQueues() {
+ return queues.values();
+ }
public int getUserMaxApps(String user) {
// save current info in case it gets changed under us
@@ -538,12 +548,12 @@ public class QueueManager {
}
}
- public double getQueueWeight(String queue) {
- Double weight = info.queueWeights.get(queue);
+ public ResourceWeights getQueueWeight(String queue) {
+ ResourceWeights weight = info.queueWeights.get(queue);
if (weight != null) {
return weight;
} else {
- return 1.0;
+ return ResourceWeights.NEUTRAL;
}
}
@@ -595,7 +605,7 @@ public class QueueManager {
// Maximum amount of resources per queue
public final Map<String, Resource> maxQueueResources;
// Sharing weights for each queue
- public final Map<String, Double> queueWeights;
+ public final Map<String, ResourceWeights> queueWeights;
// Max concurrent running applications for each queue and for each user; in addition,
// for users that have no max specified, we use the userMaxJobsDefault.
@@ -625,7 +635,7 @@ public class QueueManager {
public QueueManagerInfo(Map<String, Resource> minQueueResources,
Map<String, Resource> maxQueueResources,
Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
- Map<String, Double> queueWeights, int userMaxAppsDefault,
+ Map<String, ResourceWeights> queueWeights, int userMaxAppsDefault,
int queueMaxAppsDefault, SchedulingPolicy defaultSchedulingPolicy,
Map<String, Long> minSharePreemptionTimeouts,
Map<String, Map<QueueACL, AccessControlList>> queueAcls,
@@ -647,7 +657,7 @@ public class QueueManager {
public QueueManagerInfo() {
minQueueResources = new HashMap<String, Resource>();
maxQueueResources = new HashMap<String, Resource>();
- queueWeights = new HashMap<String, Double>();
+ queueWeights = new HashMap<String, ResourceWeights>();
queueMaxApps = new HashMap<String, Integer>();
userMaxApps = new HashMap<String, Integer>();
userMaxAppsDefault = Integer.MAX_VALUE;
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java?rev=1489070&r1=1489069&r2=1489070&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java Mon Jun 3 17:33:55 2013
@@ -22,6 +22,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
/**
@@ -80,7 +81,7 @@ public abstract class Schedulable {
/** Job/queue weight in fair sharing. */
- public abstract double getWeight();
+ public abstract ResourceWeights getWeights();
/** Start time for jobs in FIFO queues; meaningless for QueueSchedulables.*/
public abstract long getStartTime();
@@ -110,7 +111,7 @@ public abstract class Schedulable {
/** Convenient toString implementation for debugging. */
@Override
public String toString() {
- return String.format("[%s, demand=%s, running=%s, share=%s,], w=%.1f]",
- getName(), getDemand(), getResourceUsage(), fairShare, getWeight());
+ return String.format("[%s, demand=%s, running=%s, share=%s, w=%s]",
+ getName(), getDemand(), getResourceUsage(), fairShare, getWeights());
}
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java?rev=1489070&r1=1489069&r2=1489070&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java Mon Jun 3 17:33:55 2013
@@ -25,6 +25,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
@@ -67,11 +68,12 @@ public abstract class SchedulingPolicy {
/**
* Returns {@link SchedulingPolicy} instance corresponding to the
* {@link SchedulingPolicy} passed as a string. The policy can be "fair" for
- * FairsharePolicy or "fifo" for FifoPolicy. For custom
+ * FairSharePolicy, "fifo" for FifoPolicy, or "drf" for
+ * DominantResourceFairnessPolicy. For a custom
* {@link SchedulingPolicy}s in the RM classpath, the policy should be
* canonical class name of the {@link SchedulingPolicy}.
*
- * @param policy canonical class name or "fair" or "fifo"
+ * @param policy canonical class name or "drf" or "fair" or "fifo"
* @throws AllocationConfigurationException
*/
@SuppressWarnings("unchecked")
@@ -80,10 +82,12 @@ public abstract class SchedulingPolicy {
@SuppressWarnings("rawtypes")
Class clazz;
String text = policy.toLowerCase();
- if (text.equals("fair")) {
+ if (text.equalsIgnoreCase(FairSharePolicy.NAME)) {
clazz = FairSharePolicy.class;
- } else if (text.equals("fifo")) {
+ } else if (text.equalsIgnoreCase(FifoPolicy.NAME)) {
clazz = FifoPolicy.class;
+ } else if (text.equalsIgnoreCase(DominantResourceFairnessPolicy.NAME)) {
+ clazz = DominantResourceFairnessPolicy.class;
} else {
try {
clazz = Class.forName(policy);
@@ -98,6 +102,8 @@ public abstract class SchedulingPolicy {
}
return getInstance(clazz);
}
+
+ public void initialize(Resource clusterCapacity) {}
/**
* @return returns the name of {@link SchedulingPolicy}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java?rev=1489070&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java Mon Jun 3 17:33:55 2013
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies;
+
+import java.util.Collection;
+import java.util.Comparator;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType.*;
+
+/**
+ * Makes scheduling decisions by trying to equalize dominant resource usage.
+ * A schedulable's dominant resource usage is the largest ratio of resource
+ * usage to capacity among the resource types it is using.
+ */
+@Private
+@Unstable
+public class DominantResourceFairnessPolicy extends SchedulingPolicy {
+
+ public static final String NAME = "DRF";
+
+ private DominantResourceFairnessComparator comparator =
+ new DominantResourceFairnessComparator();
+
+ @Override
+ public String getName() {
+ return NAME;
+ }
+
+ @Override
+ public byte getApplicableDepth() {
+ return SchedulingPolicy.DEPTH_ANY;
+ }
+
+ @Override
+ public Comparator<Schedulable> getComparator() {
+ return comparator;
+ }
+
+ @Override
+ public void computeShares(Collection<? extends Schedulable> schedulables,
+ Resource totalResources) {
+
+ // TODO: For now, set all fair shares to 0, because, in the context of DRF,
+ // it doesn't make sense to set a value for each resource. YARN-736 should
+ // add in a sensible replacement.
+
+ for (Schedulable schedulable : schedulables) {
+ schedulable.setFairShare(Resources.none());
+ }
+ }
+
+ @Override
+ public void initialize(Resource clusterCapacity) {
+ comparator.setClusterCapacity(clusterCapacity);
+ }
+
+ public static class DominantResourceFairnessComparator implements Comparator<Schedulable> {
+ private static final int NUM_RESOURCES = ResourceType.values().length;
+
+ private Resource clusterCapacity;
+
+ public void setClusterCapacity(Resource clusterCapacity) {
+ this.clusterCapacity = clusterCapacity;
+ }
+
+ @Override
+ public int compare(Schedulable s1, Schedulable s2) {
+ ResourceWeights sharesOfCluster1 = new ResourceWeights();
+ ResourceWeights sharesOfCluster2 = new ResourceWeights();
+ ResourceWeights sharesOfMinShare1 = new ResourceWeights();
+ ResourceWeights sharesOfMinShare2 = new ResourceWeights();
+ ResourceType[] resourceOrder1 = new ResourceType[NUM_RESOURCES];
+ ResourceType[] resourceOrder2 = new ResourceType[NUM_RESOURCES];
+
+ // Calculate shares of the cluster for each resource both schedulables.
+ calculateShares(s1.getResourceUsage(),
+ clusterCapacity, sharesOfCluster1, resourceOrder1, s1.getWeights());
+ calculateShares(s1.getResourceUsage(),
+ s1.getMinShare(), sharesOfMinShare1, null, ResourceWeights.NEUTRAL);
+ calculateShares(s2.getResourceUsage(),
+ clusterCapacity, sharesOfCluster2, resourceOrder2, s2.getWeights());
+ calculateShares(s2.getResourceUsage(),
+ s2.getMinShare(), sharesOfMinShare2, null, ResourceWeights.NEUTRAL);
+
+ // A queue is needy for its min share if its dominant resource
+ // (with respect to the cluster capacity) is below its configured min share
+ // for that resource
+ boolean s1Needy = sharesOfMinShare1.getWeight(resourceOrder1[0]) < 1.0f;
+ boolean s2Needy = sharesOfMinShare2.getWeight(resourceOrder2[0]) < 1.0f;
+
+ int res = 0;
+ if (!s2Needy && !s1Needy) {
+ res = compareShares(sharesOfCluster1, sharesOfCluster2,
+ resourceOrder1, resourceOrder2);
+ } else if (s1Needy && !s2Needy) {
+ res = -1;
+ } else if (s2Needy && !s1Needy) {
+ res = 1;
+ } else { // both are needy below min share
+ res = compareShares(sharesOfMinShare1, sharesOfMinShare2,
+ resourceOrder1, resourceOrder2);
+ }
+ if (res == 0) {
+ // Apps are tied in fairness ratio. Break the tie by submit time.
+ res = (int)(s1.getStartTime() - s2.getStartTime());
+ }
+ return res;
+ }
+
+ /**
+ * Calculates and orders a resource's share of a pool in terms of two vectors.
+ * The shares vector contains, for each resource, the fraction of the pool that
+ * it takes up. The resourceOrder vector contains an ordering of resources
+ * by largest share. So if resource=<10 MB, 5 CPU>, and pool=<100 MB, 10 CPU>,
+ * shares will be [.1, .5] and resourceOrder will be [CPU, MEMORY].
+ */
+ void calculateShares(Resource resource, Resource pool,
+ ResourceWeights shares, ResourceType[] resourceOrder, ResourceWeights weights) {
+ shares.setWeight(MEMORY, (float)resource.getMemory() /
+ (pool.getMemory() * weights.getWeight(MEMORY)));
+ shares.setWeight(CPU, (float)resource.getVirtualCores() /
+ (pool.getVirtualCores() * weights.getWeight(CPU)));
+ // sort order vector by resource share
+ if (resourceOrder != null) {
+ if (shares.getWeight(MEMORY) > shares.getWeight(CPU)) {
+ resourceOrder[0] = MEMORY;
+ resourceOrder[1] = CPU;
+ } else {
+ resourceOrder[0] = CPU;
+ resourceOrder[1] = MEMORY;
+ }
+ }
+ }
+
+ private int compareShares(ResourceWeights shares1, ResourceWeights shares2,
+ ResourceType[] resourceOrder1, ResourceType[] resourceOrder2) {
+ for (int i = 0; i < resourceOrder1.length; i++) {
+ int ret = (int)Math.signum(shares1.getWeight(resourceOrder1[i])
+ - shares2.getWeight(resourceOrder2[i]));
+ if (ret != 0) {
+ return ret;
+ }
+ }
+ return 0;
+ }
+ }
+}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java?rev=1489070&r1=1489069&r2=1489070&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java Mon Jun 3 17:33:55 2013
@@ -25,17 +25,21 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
import com.google.common.annotations.VisibleForTesting;
+/**
+ * Makes scheduling decisions by trying to equalize shares of memory.
+ */
@Private
@Unstable
public class FairSharePolicy extends SchedulingPolicy {
@VisibleForTesting
- public static final String NAME = "Fairshare";
+ public static final String NAME = "fair";
private static final DefaultResourceCalculator RESOURCE_CALCULATOR =
new DefaultResourceCalculator();
private FairShareComparator comparator = new FairShareComparator();
@@ -79,8 +83,10 @@ public class FairSharePolicy extends Sch
/ Resources.max(RESOURCE_CALCULATOR, null, minShare1, one).getMemory();
minShareRatio2 = (double) s2.getResourceUsage().getMemory()
/ Resources.max(RESOURCE_CALCULATOR, null, minShare2, one).getMemory();
- useToWeightRatio1 = s1.getResourceUsage().getMemory() / s1.getWeight();
- useToWeightRatio2 = s2.getResourceUsage().getMemory() / s2.getWeight();
+ useToWeightRatio1 = s1.getResourceUsage().getMemory() /
+ s1.getWeights().getWeight(ResourceType.MEMORY);
+ useToWeightRatio2 = s2.getResourceUsage().getMemory() /
+ s2.getWeights().getWeight(ResourceType.MEMORY);
int res = 0;
if (s1Needy && !s2Needy)
res = -1;
@@ -220,7 +226,7 @@ public class FairSharePolicy extends Sch
* {@link SchedulingAlgorithms#computeFairShares(Collection, double)}.
*/
private static Resource computeShare(Schedulable sched, double r2sRatio) {
- double share = sched.getWeight() * r2sRatio;
+ double share = sched.getWeights().getWeight(ResourceType.MEMORY) * r2sRatio;
share = Math.max(share, sched.getMinShare().getMemory());
share = Math.min(share, sched.getDemand().getMemory());
return Resources.createResource((int) share);
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java?rev=1489070&r1=1489069&r2=1489070&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java Mon Jun 3 17:33:55 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.util.Records;
@@ -30,7 +31,7 @@ public class FakeSchedulable extends Sch
private Resource demand;
private Resource usage;
private Resource minShare;
- private double weight;
+ private ResourceWeights weights;
private Priority priority;
private long startTime;
@@ -46,21 +47,22 @@ public class FakeSchedulable extends Sch
this(demand, minShare, 1, 0, 0, 0);
}
- public FakeSchedulable(int demand, int minShare, double weight) {
- this(demand, minShare, weight, 0, 0, 0);
+ public FakeSchedulable(int demand, int minShare, double memoryWeight) {
+ this(demand, minShare, memoryWeight, 0, 0, 0);
}
public FakeSchedulable(int demand, int minShare, double weight, int fairShare, int usage,
long startTime) {
- this(Resources.createResource(demand), Resources.createResource(minShare), weight,
- Resources.createResource(fairShare), Resources.createResource(usage), startTime);
+ this(Resources.createResource(demand), Resources.createResource(minShare),
+ new ResourceWeights((float)weight), Resources.createResource(fairShare),
+ Resources.createResource(usage), startTime);
}
- public FakeSchedulable(Resource demand, Resource minShare, double weight, Resource fairShare,
- Resource usage, long startTime) {
+ public FakeSchedulable(Resource demand, Resource minShare, ResourceWeights weight,
+ Resource fairShare, Resource usage, long startTime) {
this.demand = demand;
this.minShare = minShare;
- this.weight = weight;
+ this.weights = weight;
setFairShare(fairShare);
this.usage = usage;
this.priority = Records.newRecord(Priority.class);
@@ -98,8 +100,8 @@ public class FakeSchedulable extends Sch
}
@Override
- public double getWeight() {
- return weight;
+ public ResourceWeights getWeights() {
+ return weights;
}
@Override
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1489070&r1=1489069&r2=1489070&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Mon Jun 3 17:33:55 2013
@@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.After;
@@ -147,12 +148,17 @@ public class TestFairScheduler {
ApplicationAttemptId.newInstance(appIdImpl, attemptId);
return attId;
}
-
-
+
private ResourceRequest createResourceRequest(int memory, String host,
int priority, int numContainers, boolean relaxLocality) {
+ return createResourceRequest(memory, 1, host, priority, numContainers,
+ relaxLocality);
+ }
+
+ private ResourceRequest createResourceRequest(int memory, int vcores, String host,
+ int priority, int numContainers, boolean relaxLocality) {
ResourceRequest request = recordFactory.newRecordInstance(ResourceRequest.class);
- request.setCapability(Resources.createResource(memory));
+ request.setCapability(BuilderUtils.newResource(memory, vcores));
request.setResourceName(host);
request.setNumContainers(numContainers);
Priority prio = recordFactory.newRecordInstance(Priority.class);
@@ -170,18 +176,34 @@ public class TestFairScheduler {
String userId) {
return createSchedulingRequest(memory, queueId, userId, 1);
}
+
+ private ApplicationAttemptId createSchedulingRequest(int memory, int vcores,
+ String queueId, String userId) {
+ return createSchedulingRequest(memory, vcores, queueId, userId, 1);
+ }
private ApplicationAttemptId createSchedulingRequest(int memory, String queueId,
String userId, int numContainers) {
return createSchedulingRequest(memory, queueId, userId, numContainers, 1);
}
+
+ private ApplicationAttemptId createSchedulingRequest(int memory, int vcores,
+ String queueId, String userId, int numContainers) {
+ return createSchedulingRequest(memory, vcores, queueId, userId, numContainers, 1);
+ }
private ApplicationAttemptId createSchedulingRequest(int memory, String queueId,
String userId, int numContainers, int priority) {
+ return createSchedulingRequest(memory, 1, queueId, userId, numContainers,
+ priority);
+ }
+
+ private ApplicationAttemptId createSchedulingRequest(int memory, int vcores,
+ String queueId, String userId, int numContainers, int priority) {
ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
scheduler.addApplication(id, queueId, userId);
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
- ResourceRequest request = createResourceRequest(memory, ResourceRequest.ANY,
+ ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY,
priority, numContainers, true);
ask.add(request);
scheduler.allocate(id, ask, new ArrayList<ContainerId>());
@@ -451,10 +473,10 @@ public class TestFairScheduler {
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"queueA\">");
- out.println("<minResources>1024</minResources>");
+ out.println("<minResources>1024mb,0vcores</minResources>");
out.println("</queue>");
out.println("<queue name=\"queueB\">");
- out.println("<minResources>2048</minResources>");
+ out.println("<minResources>2048mb,0vcores</minResources>");
out.println("</queue>");
out.println("</allocations>");
out.close();
@@ -569,11 +591,11 @@ public class TestFairScheduler {
out.println("<allocations>");
// Give queue A a minimum of 1024 M
out.println("<queue name=\"queueA\">");
- out.println("<minResources>1024</minResources>");
+ out.println("<minResources>1024mb,0vcores</minResources>");
out.println("</queue>");
// Give queue B a minimum of 2048 M
out.println("<queue name=\"queueB\">");
- out.println("<minResources>2048</minResources>");
+ out.println("<minResources>2048mb,0vcores</minResources>");
out.println("<aclAdministerApps>alice,bob admins</aclAdministerApps>");
out.println("</queue>");
// Give queue C no minimum
@@ -613,9 +635,9 @@ public class TestFairScheduler {
assertEquals(Resources.createResource(0),
queueManager.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
- assertEquals(Resources.createResource(1024),
+ assertEquals(Resources.createResource(1024, 0),
queueManager.getMinResources("root.queueA"));
- assertEquals(Resources.createResource(2048),
+ assertEquals(Resources.createResource(2048, 0),
queueManager.getMinResources("root.queueB"));
assertEquals(Resources.createResource(0),
queueManager.getMinResources("root.queueC"));
@@ -672,15 +694,15 @@ public class TestFairScheduler {
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"queueA\">");
- out.println("<minResources>2048</minResources>");
+ out.println("<minResources>2048mb,0vcores</minResources>");
out.println("</queue>");
out.println("<queue name=\"queueB\">");
- out.println("<minResources>2048</minResources>");
+ out.println("<minResources>2048mb,0vcores</minResources>");
out.println("<queue name=\"queueC\">");
- out.println("<minResources>2048</minResources>");
+ out.println("<minResources>2048mb,0vcores</minResources>");
out.println("</queue>");
out.println("<queue name=\"queueD\">");
- out.println("<minResources>2048</minResources>");
+ out.println("<minResources>2048mb,0vcores</minResources>");
out.println("</queue>");
out.println("</queue>");
out.println("</allocations>");
@@ -710,11 +732,11 @@ public class TestFairScheduler {
out.println("<allocations>");
// Give queue A a minimum of 1024 M
out.println("<pool name=\"queueA\">");
- out.println("<minResources>1024</minResources>");
+ out.println("<minResources>1024mb,0vcores</minResources>");
out.println("</pool>");
// Give queue B a minimum of 2048 M
out.println("<pool name=\"queueB\">");
- out.println("<minResources>2048</minResources>");
+ out.println("<minResources>2048mb,0vcores</minResources>");
out.println("<aclAdministerApps>alice,bob admins</aclAdministerApps>");
out.println("</pool>");
// Give queue C no minimum
@@ -754,9 +776,9 @@ public class TestFairScheduler {
assertEquals(Resources.createResource(0),
queueManager.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
- assertEquals(Resources.createResource(1024),
+ assertEquals(Resources.createResource(1024, 0),
queueManager.getMinResources("root.queueA"));
- assertEquals(Resources.createResource(2048),
+ assertEquals(Resources.createResource(2048, 0),
queueManager.getMinResources("root.queueB"));
assertEquals(Resources.createResource(0),
queueManager.getMinResources("root.queueC"));
@@ -812,10 +834,10 @@ public class TestFairScheduler {
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"queueA\">");
- out.println("<minResources>2048</minResources>");
+ out.println("<minResources>2048mb,0vcores</minResources>");
out.println("</queue>");
out.println("<queue name=\"queueB\">");
- out.println("<minResources>2048</minResources>");
+ out.println("<minResources>2048mb,0vcores</minResources>");
out.println("</queue>");
out.println("</allocations>");
out.close();
@@ -825,7 +847,7 @@ public class TestFairScheduler {
// Add one big node (only care about aggregate capacity)
RMNode node1 =
- MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024), 1,
+ MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
"127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
@@ -885,7 +907,7 @@ public class TestFairScheduler {
// Add one big node (only care about aggregate capacity)
RMNode node1 =
- MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024), 1,
+ MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
"127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
@@ -963,19 +985,19 @@ public class TestFairScheduler {
// Create four nodes
RMNode node1 =
- MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 1,
+ MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 1,
"127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
RMNode node2 =
- MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 2,
+ MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 2,
"127.0.0.2");
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
scheduler.handle(nodeEvent2);
RMNode node3 =
- MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 3,
+ MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 3,
"127.0.0.3");
NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
scheduler.handle(nodeEvent3);
@@ -1106,19 +1128,19 @@ public class TestFairScheduler {
out.println("<allocations>");
out.println("<queue name=\"queueA\">");
out.println("<weight>.25</weight>");
- out.println("<minResources>1024</minResources>");
+ out.println("<minResources>1024mb,0vcores</minResources>");
out.println("</queue>");
out.println("<queue name=\"queueB\">");
out.println("<weight>.25</weight>");
- out.println("<minResources>1024</minResources>");
+ out.println("<minResources>1024mb,0vcores</minResources>");
out.println("</queue>");
out.println("<queue name=\"queueC\">");
out.println("<weight>.25</weight>");
- out.println("<minResources>1024</minResources>");
+ out.println("<minResources>1024mb,0vcores</minResources>");
out.println("</queue>");
out.println("<queue name=\"queueD\">");
out.println("<weight>.25</weight>");
- out.println("<minResources>1024</minResources>");
+ out.println("<minResources>1024mb,0vcores</minResources>");
out.println("</queue>");
out.print("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
out.print("<fairSharePreemptionTimeout>10</fairSharePreemptionTimeout>");
@@ -1130,19 +1152,19 @@ public class TestFairScheduler {
// Create four nodes
RMNode node1 =
- MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 1,
+ MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 1,
"127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
RMNode node2 =
- MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 2,
+ MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 2,
"127.0.0.2");
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
scheduler.handle(nodeEvent2);
RMNode node3 =
- MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 3,
+ MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 3,
"127.0.0.3");
NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
scheduler.handle(nodeEvent3);
@@ -1206,19 +1228,19 @@ public class TestFairScheduler {
// After minSharePreemptionTime has passed, they should want to preempt min
// share.
clock.tick(6);
- assertTrue(Resources.equals(
- Resources.createResource(1024), scheduler.resToPreempt(schedC, clock.getTime())));
- assertTrue(Resources.equals(
- Resources.createResource(1024), scheduler.resToPreempt(schedD, clock.getTime())));
+ assertEquals(
+ 1024, scheduler.resToPreempt(schedC, clock.getTime()).getMemory());
+ assertEquals(
+ 1024, scheduler.resToPreempt(schedD, clock.getTime()).getMemory());
// After fairSharePreemptionTime has passed, they should want to preempt
// fair share.
scheduler.update();
clock.tick(6);
- assertTrue(Resources.equals(
- Resources.createResource(1536), scheduler.resToPreempt(schedC, clock.getTime())));
- assertTrue(Resources.equals(
- Resources.createResource(1536), scheduler.resToPreempt(schedD, clock.getTime())));
+ assertEquals(
+ 1536 , scheduler.resToPreempt(schedC, clock.getTime()).getMemory());
+ assertEquals(
+ 1536, scheduler.resToPreempt(schedD, clock.getTime()).getMemory());
}
@Test (timeout = 5000)
@@ -1271,7 +1293,7 @@ public class TestFairScheduler {
// Add a node
RMNode node1 =
MockNodes
- .newNodeInfo(1, Resources.createResource(8192), 1, "127.0.0.1");
+ .newNodeInfo(1, Resources.createResource(8192, 8), 1, "127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
@@ -1443,7 +1465,7 @@ public class TestFairScheduler {
public void testFifoWithinQueue() throws Exception {
RMNode node1 =
MockNodes
- .newNodeInfo(1, Resources.createResource(3072), 1, "127.0.0.1");
+ .newNodeInfo(1, Resources.createResource(3072, 3), 1, "127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
@@ -1488,7 +1510,7 @@ public class TestFairScheduler {
.setPolicy(SchedulingPolicy.getDefault());
RMNode node =
- MockNodes.newNodeInfo(1, Resources.createResource(16384), 0,
+ MockNodes.newNodeInfo(1, Resources.createResource(16384, 16), 0,
"127.0.0.1");
NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
@@ -1536,10 +1558,10 @@ public class TestFairScheduler {
RMNode node1 =
MockNodes
- .newNodeInfo(1, Resources.createResource(8192), 1, "127.0.0.1");
+ .newNodeInfo(1, Resources.createResource(8192, 8), 1, "127.0.0.1");
RMNode node2 =
MockNodes
- .newNodeInfo(1, Resources.createResource(8192), 2, "127.0.0.2");
+ .newNodeInfo(1, Resources.createResource(8192, 8), 2, "127.0.0.2");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
@@ -1685,7 +1707,8 @@ public class TestFairScheduler {
public void testRemoveNodeUpdatesRootQueueMetrics() {
assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
- RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
+ RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1,
+ "127.0.0.1");
NodeAddedSchedulerEvent addEvent = new NodeAddedSchedulerEvent(node1);
scheduler.handle(addEvent);
@@ -1824,4 +1847,157 @@ public class TestFairScheduler {
scheduler.handle(nodeUpdateEvent);
assertEquals(0, app.getReservedContainers().size());
}
+
+ public void testNoMoreCpuOnNode() {
+ RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2048, 1),
+ 1, "127.0.0.1");
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+ scheduler.handle(nodeEvent1);
+
+ ApplicationAttemptId attId = createSchedulingRequest(1024, 1, "default",
+ "user1", 2);
+ FSSchedulerApp app = scheduler.applications.get(attId);
+ scheduler.update();
+
+ NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
+ scheduler.handle(updateEvent);
+ assertEquals(1, app.getLiveContainers().size());
+ scheduler.handle(updateEvent);
+ assertEquals(1, app.getLiveContainers().size());
+ }
+
+ public void testBasicDRFAssignment() throws Exception {
+ RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(8192, 5));
+ NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
+ scheduler.handle(nodeEvent);
+
+ ApplicationAttemptId appAttId1 = createSchedulingRequest(2048, 1, "queue1",
+ "user1", 2);
+ FSSchedulerApp app1 = scheduler.applications.get(appAttId1);
+ ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 2, "queue1",
+ "user1", 2);
+ FSSchedulerApp app2 = scheduler.applications.get(appAttId2);
+
+ DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
+ drfPolicy.initialize(scheduler.getClusterCapacity());
+ scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy);
+ scheduler.update();
+
+ // First both apps get a container
+ // Then the first gets another container because its dominant share of
+ // 2048/8192 is less than the other's of 2/5
+ NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
+ scheduler.handle(updateEvent);
+ Assert.assertEquals(1, app1.getLiveContainers().size());
+ Assert.assertEquals(0, app2.getLiveContainers().size());
+
+ scheduler.handle(updateEvent);
+ Assert.assertEquals(1, app1.getLiveContainers().size());
+ Assert.assertEquals(1, app2.getLiveContainers().size());
+
+ scheduler.handle(updateEvent);
+ Assert.assertEquals(2, app1.getLiveContainers().size());
+ Assert.assertEquals(1, app2.getLiveContainers().size());
+ }
+
+ /**
+ * Two apps on one queue, one app on another
+ */
+ @Test
+ public void testBasicDRFWithQueues() throws Exception {
+ RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(8192, 7),
+ 1, "127.0.0.1");
+ NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
+ scheduler.handle(nodeEvent);
+
+ ApplicationAttemptId appAttId1 = createSchedulingRequest(3072, 1, "queue1",
+ "user1", 2);
+ FSSchedulerApp app1 = scheduler.applications.get(appAttId1);
+ ApplicationAttemptId appAttId2 = createSchedulingRequest(2048, 2, "queue1",
+ "user1", 2);
+ FSSchedulerApp app2 = scheduler.applications.get(appAttId2);
+ ApplicationAttemptId appAttId3 = createSchedulingRequest(1024, 2, "queue2",
+ "user1", 2);
+ FSSchedulerApp app3 = scheduler.applications.get(appAttId3);
+
+ DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
+ drfPolicy.initialize(scheduler.getClusterCapacity());
+ scheduler.getQueueManager().getQueue("root").setPolicy(drfPolicy);
+ scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy);
+ scheduler.update();
+
+ NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
+ scheduler.handle(updateEvent);
+ Assert.assertEquals(1, app1.getLiveContainers().size());
+ scheduler.handle(updateEvent);
+ Assert.assertEquals(1, app3.getLiveContainers().size());
+ scheduler.handle(updateEvent);
+ Assert.assertEquals(2, app3.getLiveContainers().size());
+ scheduler.handle(updateEvent);
+ Assert.assertEquals(1, app2.getLiveContainers().size());
+ }
+
+ @Test
+ public void testDRFHierarchicalQueues() throws Exception {
+ RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(12288, 12),
+ 1, "127.0.0.1");
+ NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
+ scheduler.handle(nodeEvent);
+
+ ApplicationAttemptId appAttId1 = createSchedulingRequest(3074, 1, "queue1.subqueue1",
+ "user1", 2);
+ Thread.sleep(3); // so that start times will be different
+ FSSchedulerApp app1 = scheduler.applications.get(appAttId1);
+ ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 3, "queue1.subqueue1",
+ "user1", 2);
+ Thread.sleep(3); // so that start times will be different
+ FSSchedulerApp app2 = scheduler.applications.get(appAttId2);
+ ApplicationAttemptId appAttId3 = createSchedulingRequest(2048, 2, "queue1.subqueue2",
+ "user1", 2);
+ Thread.sleep(3); // so that start times will be different
+ FSSchedulerApp app3 = scheduler.applications.get(appAttId3);
+ ApplicationAttemptId appAttId4 = createSchedulingRequest(1024, 2, "queue2",
+ "user1", 2);
+ Thread.sleep(3); // so that start times will be different
+ FSSchedulerApp app4 = scheduler.applications.get(appAttId4);
+
+ DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
+ drfPolicy.initialize(scheduler.getClusterCapacity());
+ scheduler.getQueueManager().getQueue("root").setPolicy(drfPolicy);
+ scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy);
+ scheduler.getQueueManager().getQueue("queue1.subqueue1").setPolicy(drfPolicy);
+ scheduler.update();
+
+ NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
+ scheduler.handle(updateEvent);
+ // app1 gets first container because it asked first
+ Assert.assertEquals(1, app1.getLiveContainers().size());
+ scheduler.handle(updateEvent);
+ // app4 gets second container because it's on queue2
+ Assert.assertEquals(1, app4.getLiveContainers().size());
+ scheduler.handle(updateEvent);
+ // app4 gets another container because queue2's dominant share of memory
+ // is still less than queue1's of cpu
+ Assert.assertEquals(2, app4.getLiveContainers().size());
+ scheduler.handle(updateEvent);
+ // app3 gets one because queue1 gets one and queue1.subqueue2 is behind
+ // queue1.subqueue1
+ Assert.assertEquals(1, app3.getLiveContainers().size());
+ scheduler.handle(updateEvent);
+ // app4 would get another one, but it doesn't have any requests
+ // queue1.subqueue2 is still using less than queue1.subqueue1, so it
+ // gets another
+ Assert.assertEquals(2, app3.getLiveContainers().size());
+ // queue1.subqueue1 is behind again, so it gets one, which it gives to app2
+ scheduler.handle(updateEvent);
+ Assert.assertEquals(1, app2.getLiveContainers().size());
+
+ // at this point, we've used all our CPU up, so nobody else should get a container
+ scheduler.handle(updateEvent);
+
+ Assert.assertEquals(1, app1.getLiveContainers().size());
+ Assert.assertEquals(1, app2.getLiveContainers().size());
+ Assert.assertEquals(2, app3.getLiveContainers().size());
+ Assert.assertEquals(2, app4.getLiveContainers().size());
+ }
}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerConfiguration.java?rev=1489070&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerConfiguration.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerConfiguration.java Mon Jun 3 17:33:55 2013
@@ -0,0 +1,58 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration.*;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.junit.Test;
+
+public class TestFairSchedulerConfiguration {
+ @Test
+ public void testParseResourceConfigValue() throws Exception {
+ assertEquals(BuilderUtils.newResource(1024, 2),
+ parseResourceConfigValue("2 vcores, 1024 mb"));
+ assertEquals(BuilderUtils.newResource(1024, 2),
+ parseResourceConfigValue("1024 mb, 2 vcores"));
+ assertEquals(BuilderUtils.newResource(1024, 2),
+ parseResourceConfigValue("2vcores,1024mb"));
+ assertEquals(BuilderUtils.newResource(1024, 2),
+ parseResourceConfigValue("1024mb,2vcores"));
+ }
+
+ @Test(expected = AllocationConfigurationException.class)
+ public void testNoUnits() throws Exception {
+ parseResourceConfigValue("1024");
+ }
+
+ @Test(expected = AllocationConfigurationException.class)
+ public void testOnlyMemory() throws Exception {
+ parseResourceConfigValue("1024mb");
+ }
+
+ @Test(expected = AllocationConfigurationException.class)
+ public void testOnlyCPU() throws Exception {
+ parseResourceConfigValue("1024vcores");
+ }
+
+ @Test(expected = AllocationConfigurationException.class)
+ public void testGibberish() throws Exception {
+ parseResourceConfigValue("1o24vc0res");
+ }
+}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java?rev=1489070&r1=1489069&r2=1489070&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java Mon Jun 3 17:33:55 2013
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.re
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
import org.junit.Test;
@@ -49,6 +50,11 @@ public class TestSchedulingPolicy {
assertTrue("Invalid scheduler name",
sm.getName().equals(FairSharePolicy.NAME));
+ // Shortname - drf
+ sm = SchedulingPolicy.parse("drf");
+ assertTrue("Invalid scheduler name",
+ sm.getName().equals(DominantResourceFairnessPolicy.NAME));
+
// Shortname - fair
sm = SchedulingPolicy.parse("fair");
assertTrue("Invalid scheduler name",
@@ -93,7 +99,20 @@ public class TestSchedulingPolicy {
SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_PARENT));
assertTrue(ERR,
SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ANY));
-
+
+ // drf
+ policy = SchedulingPolicy.parse("drf");
+ assertTrue(ERR,
+ SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_LEAF));
+ assertTrue(ERR, SchedulingPolicy.isApplicableTo(policy,
+ SchedulingPolicy.DEPTH_INTERMEDIATE));
+ assertTrue(ERR,
+ SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ROOT));
+ assertTrue(ERR,
+ SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_PARENT));
+ assertTrue(ERR,
+ SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ANY));
+
policy = Mockito.mock(SchedulingPolicy.class);
Mockito.when(policy.getApplicableDepth()).thenReturn(
SchedulingPolicy.DEPTH_PARENT);