You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by tu...@apache.org on 2013/03/26 04:25:27 UTC
svn commit: r1460961 - in /hadoop/common/trunk/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/
hadoop-yarn/hadoop-yarn-server/hadoop-y...
Author: tucu
Date: Tue Mar 26 03:25:26 2013
New Revision: 1460961
URL: http://svn.apache.org/r1460961
Log:
YARN-469. Make scheduling mode in FS pluggable. (kkambatl via tucu)
Added:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/modes/
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/modes/FairSchedulingMode.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/modes/FifoSchedulingMode.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingMode.java
Removed:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingAlgorithms.java
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingMode.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1460961&r1=1460960&r2=1460961&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Tue Mar 26 03:25:26 2013
@@ -96,6 +96,8 @@ Release 2.0.5-beta - UNRELEASED
YARN-497. Yarn unmanaged-am launcher jar does not define a main class in
its manifest (Hitesh Shah via bikas)
+ YARN-469. Make scheduling mode in FS pluggable. (kkambatl via tucu)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java?rev=1460961&r1=1460960&r2=1460961&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java Tue Mar 26 03:25:26 2013
@@ -92,13 +92,7 @@ public class FSLeafQueue extends FSQueue
@Override
public void recomputeFairShares() {
- if (schedulingMode == SchedulingMode.FAIR) {
- SchedulingAlgorithms.computeFairShares(appScheds, getFairShare());
- } else {
- for (AppSchedulable sched: appScheds) {
- sched.setFairShare(Resources.createResource(0));
- }
- }
+ schedulingMode.computeShares(getAppSchedulables(), getFairShare());
}
@Override
@@ -162,17 +156,9 @@ public class FSLeafQueue extends FSQueue
return Resources.none(); // We should never get here
}
- // Otherwise, chose app to schedule based on given policy (fair vs fifo).
+ // Otherwise, choose app to schedule based on given policy.
else {
- Comparator<Schedulable> comparator;
- if (schedulingMode == SchedulingMode.FIFO) {
- comparator = new SchedulingAlgorithms.FifoComparator();
- } else if (schedulingMode == SchedulingMode.FAIR) {
- comparator = new SchedulingAlgorithms.FairShareComparator();
- } else {
- throw new RuntimeException("Unsupported queue scheduling mode " +
- schedulingMode);
- }
+ Comparator<Schedulable> comparator = schedulingMode.getComparator();
Collections.sort(appScheds, comparator);
for (AppSchedulable sched: appScheds) {
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java?rev=1460961&r1=1460960&r2=1460961&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java Tue Mar 26 03:25:26 2013
@@ -51,7 +51,7 @@ public class FSParentQueue extends FSQue
@Override
public void recomputeFairShares() {
- SchedulingAlgorithms.computeFairShares(childQueues, getFairShare());
+ SchedulingMode.getDefault().computeShares(childQueues, getFairShare());
for (FSQueue childQueue : childQueues) {
childQueue.getMetrics().setAvailableResourcesToQueue(childQueue.getFairShare());
childQueue.recomputeFairShares();
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1460961&r1=1460960&r2=1460961&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Tue Mar 26 03:25:26 2013
@@ -803,7 +803,7 @@ public class FairScheduler implements Re
// At most one task is scheduled each iteration of this loop
List<FSLeafQueue> scheds = new ArrayList<FSLeafQueue>(
queueMgr.getLeafQueues());
- Collections.sort(scheds, new SchedulingAlgorithms.FairShareComparator());
+ Collections.sort(scheds, SchedulingMode.getDefault().getComparator());
boolean assignedContainer = false;
for (FSLeafQueue sched : scheds) {
Resource assigned = sched.assignContainer(node, false);
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java?rev=1460961&r1=1460960&r2=1460961&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java Tue Mar 26 03:25:26 2013
@@ -310,7 +310,7 @@ public class QueueManager {
int queueMaxAppsDefault = Integer.MAX_VALUE;
long fairSharePreemptionTimeout = Long.MAX_VALUE;
long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
- SchedulingMode defaultSchedulingMode = SchedulingMode.FAIR;
+ SchedulingMode defaultSchedulingMode = SchedulingMode.getDefault();
// Remember all queue names so we can display them on web UI, etc.
List<String> queueNamesInAllocFile = new ArrayList<String>();
@@ -373,7 +373,8 @@ public class QueueManager {
queueMaxAppsDefault = val;}
else if ("defaultQueueSchedulingMode".equals(element.getTagName())) {
String text = ((Text)element.getFirstChild()).getData().trim();
- defaultSchedulingMode = parseSchedulingMode(text);
+ SchedulingMode.setDefault(text);
+ defaultSchedulingMode = SchedulingMode.getDefault();
} else {
LOG.warn("Bad element in allocations file: " + element.getTagName());
}
@@ -449,7 +450,7 @@ public class QueueManager {
minSharePreemptionTimeouts.put(queueName, val);
} else if ("schedulingMode".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
- queueModes.put(queueName, parseSchedulingMode(text));
+ queueModes.put(queueName, SchedulingMode.parse(text));
} else if ("aclSubmitApps".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text));
@@ -476,19 +477,6 @@ public class QueueManager {
}
}
- private SchedulingMode parseSchedulingMode(String text)
- throws AllocationConfigurationException {
- text = text.toLowerCase();
- if (text.equals("fair")) {
- return SchedulingMode.FAIR;
- } else if (text.equals("fifo")) {
- return SchedulingMode.FIFO;
- } else {
- throw new AllocationConfigurationException(
- "Unknown scheduling mode : " + text + "; expected 'fifo' or 'fair'");
- }
- }
-
/**
* Get the minimum resource allocation for the given queue.
* @return the cap set on this queue, or 0 if not set.
@@ -663,7 +651,7 @@ public class QueueManager {
minSharePreemptionTimeouts = new HashMap<String, Long>();
defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
fairSharePreemptionTimeout = Long.MAX_VALUE;
- defaultSchedulingMode = SchedulingMode.FAIR;
+ defaultSchedulingMode = SchedulingMode.getDefault();
}
}
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java?rev=1460961&r1=1460960&r2=1460961&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java Tue Mar 26 03:25:26 2013
@@ -55,7 +55,7 @@ import org.apache.hadoop.yarn.server.res
*/
@Private
@Unstable
-abstract class Schedulable {
+public abstract class Schedulable {
/** Fair share assigned to this Schedulable */
private Resource fairShare = Resources.createResource(0);
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingMode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingMode.java?rev=1460961&r1=1460960&r2=1460961&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingMode.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingMode.java Tue Mar 26 03:25:26 2013
@@ -15,17 +15,104 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FairSchedulingMode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FifoSchedulingMode;
-/**
- * Internal scheduling modes for queues.
- */
-@Private
+@Public
@Unstable
-public enum SchedulingMode {
- FAIR, FIFO
+public abstract class SchedulingMode {
+ private static final ConcurrentHashMap<Class<? extends SchedulingMode>, SchedulingMode> instances =
+ new ConcurrentHashMap<Class<? extends SchedulingMode>, SchedulingMode>();
+
+ private static SchedulingMode DEFAULT_MODE =
+ getInstance(FairSchedulingMode.class);
+
+ public static SchedulingMode getDefault() {
+ return DEFAULT_MODE;
+ }
+
+ public static void setDefault(String className)
+ throws AllocationConfigurationException {
+ DEFAULT_MODE = parse(className);
+ }
+
+ /**
+ * Returns a {@link SchedulingMode} instance corresponding to the passed clazz
+ */
+ public static SchedulingMode getInstance(Class<? extends SchedulingMode> clazz) {
+ SchedulingMode mode = instances.get(clazz);
+ if (mode == null) {
+ mode = ReflectionUtils.newInstance(clazz, null);
+ instances.put(clazz, mode);
+ }
+ return mode;
+ }
+
+ /**
+ * Returns {@link SchedulingMode} instance corresponding to the
+ * {@link SchedulingMode} passed as a string. The mode can be "fair" for
+ * FairSchedulingMode or "fifo" for FifoSchedulingMode. For custom
+ * {@link SchedulingMode}s in the RM classpath, the mode should be the
+ * canonical class name of the {@link SchedulingMode}.
+ *
+ * @param mode canonical class name or "fair" or "fifo"
+ * @throws AllocationConfigurationException
+ */
+ @SuppressWarnings("unchecked")
+ public static SchedulingMode parse(String mode)
+ throws AllocationConfigurationException {
+ @SuppressWarnings("rawtypes")
+ Class clazz;
+ String text = mode.toLowerCase();
+ if (text.equals("fair")) {
+ clazz = FairSchedulingMode.class;
+ } else if (text.equals("fifo")) {
+ clazz = FifoSchedulingMode.class;
+ } else {
+ try {
+ clazz = Class.forName(mode);
+ } catch (ClassNotFoundException cnfe) {
+ throw new AllocationConfigurationException(mode
+ + " SchedulingMode class not found!");
+ }
+ }
+ if (!SchedulingMode.class.isAssignableFrom(clazz)) {
+ throw new AllocationConfigurationException(mode
+ + " does not extend SchedulingMode");
+ }
+ return getInstance(clazz);
+ }
+
+ /**
+ * @return the name of this SchedulingMode
+ */
+ public abstract String getName();
+
+ /**
+ * The comparator returned by this method is to be used for sorting the
+ * {@link Schedulable}s in that queue.
+ *
+ * @return the comparator to sort by
+ */
+ public abstract Comparator<Schedulable> getComparator();
+
+ /**
+ * Computes and updates the shares of {@link Schedulable}s as per the
+ * SchedulingMode, to be used later at schedule time.
+ *
+ * @param schedulables {@link Schedulable}s whose shares are to be updated
+ * @param totalResources Total {@link Resource}s in the cluster
+ */
+ public abstract void computeShares(
+ Collection<? extends Schedulable> schedulables, Resource totalResources);
}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/modes/FairSchedulingMode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/modes/FairSchedulingMode.java?rev=1460961&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/modes/FairSchedulingMode.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/modes/FairSchedulingMode.java Tue Mar 26 03:25:26 2013
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Comparator;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingMode;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class FairSchedulingMode extends SchedulingMode {
+ @VisibleForTesting
+ public static final String NAME = "FairShare";
+ private FairShareComparator comparator = new FairShareComparator();
+
+ @Override
+ public String getName() {
+ return NAME;
+ }
+
+ /**
+ * Compare Schedulables via weighted fair sharing. In addition, Schedulables
+ * below their min share get priority over those whose min share is met.
+ *
+ * Schedulables below their min share are compared by how far below it they
+ * are as a ratio. For example, if job A has 8 out of a min share of 10 tasks
+ * and job B has 50 out of a min share of 100, then job B is scheduled next,
+ * because B is at 50% of its min share and A is at 80% of its min share.
+ *
+ * Schedulables above their min share are compared by (runningTasks / weight).
+ * If all weights are equal, slots are given to the job with the fewest tasks;
+ * otherwise, jobs with more weight get proportionally more slots.
+ */
+ private static class FairShareComparator implements Comparator<Schedulable>,
+ Serializable {
+ private static final long serialVersionUID = 5564969375856699313L;
+
+ @Override
+ public int compare(Schedulable s1, Schedulable s2) {
+ double minShareRatio1, minShareRatio2;
+ double useToWeightRatio1, useToWeightRatio2;
+ Resource minShare1 = Resources.min(s1.getMinShare(), s1.getDemand());
+ Resource minShare2 = Resources.min(s2.getMinShare(), s2.getDemand());
+ boolean s1Needy = Resources.lessThan(s1.getResourceUsage(), minShare1);
+ boolean s2Needy = Resources.lessThan(s2.getResourceUsage(), minShare2);
+ Resource one = Resources.createResource(1);
+ minShareRatio1 = (double) s1.getResourceUsage().getMemory()
+ / Resources.max(minShare1, one).getMemory();
+ minShareRatio2 = (double) s2.getResourceUsage().getMemory()
+ / Resources.max(minShare2, one).getMemory();
+ useToWeightRatio1 = s1.getResourceUsage().getMemory() / s1.getWeight();
+ useToWeightRatio2 = s2.getResourceUsage().getMemory() / s2.getWeight();
+ int res = 0;
+ if (s1Needy && !s2Needy)
+ res = -1;
+ else if (s2Needy && !s1Needy)
+ res = 1;
+ else if (s1Needy && s2Needy)
+ res = (int) Math.signum(minShareRatio1 - minShareRatio2);
+ else
+ // Neither schedulable is needy
+ res = (int) Math.signum(useToWeightRatio1 - useToWeightRatio2);
+ if (res == 0) {
+ // Apps are tied in fairness ratio. Break the tie by submit time and job
+ // name to get a deterministic ordering, which is useful for unit tests.
+ res = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
+ if (res == 0)
+ res = s1.getName().compareTo(s2.getName());
+ }
+ return res;
+ }
+ }
+
+ @Override
+ public Comparator<Schedulable> getComparator() {
+ return comparator;
+ }
+
+ @Override
+ public void computeShares(Collection<? extends Schedulable> schedulables,
+ Resource totalResources) {
+ computeFairShares(schedulables, totalResources);
+ }
+
+ /**
+ * Number of iterations for the binary search in computeFairShares. This is
+ * equivalent to the number of bits of precision in the output. 25 iterations
+ * gives precision better than 0.1 slots in clusters with one million slots.
+ */
+ private static final int COMPUTE_FAIR_SHARES_ITERATIONS = 25;
+
+ /**
+ * Given a set of Schedulables and a number of slots, compute their weighted
+ * fair shares. The min shares and demands of the Schedulables are assumed to
+ * be set beforehand. We compute the fairest possible allocation of shares to
+ * the Schedulables that respects their min shares and demands.
+ *
+ * To understand what this method does, we must first define what weighted
+ * fair sharing means in the presence of minimum shares and demands. If there
+ * were no minimum shares and every Schedulable had an infinite demand (i.e.
+ * could launch infinitely many tasks), then weighted fair sharing would be
+ * achieved if the ratio of slotsAssigned / weight was equal for each
+ * Schedulable and all slots were assigned. Minimum shares and demands add two
+ * further twists: - Some Schedulables may not have enough tasks to fill all
+ * their share. - Some Schedulables may have a min share higher than their
+ * assigned share.
+ *
+ * To deal with these possibilities, we define an assignment of slots as being
+ * fair if there exists a ratio R such that: - Schedulables S where S.demand <
+ * R * S.weight are assigned share S.demand - Schedulables S where S.minShare
+ * > R * S.weight are given share S.minShare - All other Schedulables S are
+ * assigned share R * S.weight - The sum of all the shares is totalSlots.
+ *
+ * We call R the weight-to-slots ratio because it converts a Schedulable's
+ * weight to the number of slots it is assigned.
+ *
+ * We compute a fair allocation by finding a suitable weight-to-slot ratio R.
+ * To do this, we use binary search. Given a ratio R, we compute the number of
+ * slots that would be used in total with this ratio (the sum of the shares
+ * computed using the conditions above). If this number of slots is less than
+ * totalSlots, then R is too small and more slots could be assigned. If the
+ * number of slots is more than totalSlots, then R is too large.
+ *
+ * We begin the binary search with a lower bound on R of 0 (which means that
+ * all Schedulables are only given their minShare) and an upper bound computed
+ * to be large enough that too many slots are given (by doubling R until we
+ * either use more than totalSlots slots or we fulfill all jobs' demands). The
+ * helper method slotsUsedWithWeightToSlotRatio computes the total number of
+ * slots used with a given value of R.
+ *
+ * The running time of this algorithm is linear in the number of Schedulables,
+ * because slotsUsedWithWeightToSlotRatio is linear-time and the number of
+ * iterations of binary search is a constant (dependent on desired precision).
+ */
+ public static void computeFairShares(
+ Collection<? extends Schedulable> schedulables, Resource totalResources) {
+ // Find an upper bound on R that we can use in our binary search. We start
+ // at R = 1 and double it until we have either used totalSlots slots or we
+ // have met all Schedulables' demands (if total demand < totalSlots).
+ Resource totalDemand = Resources.createResource(0);
+ for (Schedulable sched : schedulables) {
+ Resources.addTo(totalDemand, sched.getDemand());
+ }
+ Resource cap = Resources.min(totalDemand, totalResources);
+ double rMax = 1.0;
+ while (Resources.lessThan(resUsedWithWeightToResRatio(rMax, schedulables),
+ cap)) {
+ rMax *= 2.0;
+ }
+ // Perform the binary search for up to COMPUTE_FAIR_SHARES_ITERATIONS steps
+ double left = 0;
+ double right = rMax;
+ for (int i = 0; i < COMPUTE_FAIR_SHARES_ITERATIONS; i++) {
+ double mid = (left + right) / 2.0;
+ if (Resources.lessThan(resUsedWithWeightToResRatio(mid, schedulables),
+ cap)) {
+ left = mid;
+ } else {
+ right = mid;
+ }
+ }
+ // Set the fair shares based on the value of R we've converged to
+ for (Schedulable sched : schedulables) {
+ sched.setFairShare(computeShare(sched, right));
+ }
+ }
+
+ /**
+ * Compute the number of slots that would be used given a weight-to-slot ratio
+ * w2sRatio, for use in the computeFairShares algorithm as described in
+ * {@link #computeFairShares(Collection, Resource)}.
+ */
+ private static Resource resUsedWithWeightToResRatio(double w2sRatio,
+ Collection<? extends Schedulable> schedulables) {
+ Resource slotsTaken = Resources.createResource(0);
+ for (Schedulable sched : schedulables) {
+ Resource share = computeShare(sched, w2sRatio);
+ Resources.addTo(slotsTaken, share);
+ }
+ return slotsTaken;
+ }
+
+ /**
+ * Compute the resources assigned to a Schedulable given a particular
+ * weight-to-resource ratio r2sRatio, for use in computeFairShares as described in
+ * {@link #computeFairShares(Collection, Resource)}.
+ */
+ private static Resource computeShare(Schedulable sched, double r2sRatio) {
+ double share = sched.getWeight() * r2sRatio;
+ share = Math.max(share, sched.getMinShare().getMemory());
+ share = Math.min(share, sched.getDemand().getMemory());
+ return Resources.createResource((int) share);
+ }
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/modes/FifoSchedulingMode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/modes/FifoSchedulingMode.java?rev=1460961&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/modes/FifoSchedulingMode.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/modes/FifoSchedulingMode.java Tue Mar 26 03:25:26 2013
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Comparator;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingMode;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class FifoSchedulingMode extends SchedulingMode {
+ @VisibleForTesting
+ public static final String NAME = "FIFO";
+ private FifoComparator comparator = new FifoComparator();
+
+ @Override
+ public String getName() {
+ return NAME;
+ }
+
+ /**
+ * Compare Schedulables in order of priority and then submission time, as in
+ * the default FIFO scheduler in Hadoop.
+ */
+ static class FifoComparator implements Comparator<Schedulable>, Serializable {
+ private static final long serialVersionUID = -5905036205491177060L;
+
+ @Override
+ public int compare(Schedulable s1, Schedulable s2) {
+ int res = s1.getPriority().compareTo(s2.getPriority());
+ if (res == 0) {
+ res = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
+ }
+ if (res == 0) {
+ // In the rare case where jobs were submitted at the exact same time,
+ // compare them by name (which will be the JobID) to get a deterministic
+ // ordering, so we don't alternately launch tasks from different jobs.
+ res = s1.getName().compareTo(s2.getName());
+ }
+ return res;
+ }
+ }
+
+ @Override
+ public Comparator<Schedulable> getComparator() {
+ return comparator;
+ }
+
+ @Override
+ public void computeShares(Collection<? extends Schedulable> schedulables,
+ Resource totalResources) {
+ for (Schedulable sched : schedulables) {
+ sched.setFairShare(Resources.createResource(0));
+ }
+ }
+}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java?rev=1460961&r1=1460960&r2=1460961&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java Tue Mar 26 03:25:26 2013
@@ -24,6 +24,7 @@ import java.util.List;
import junit.framework.Assert;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FairSchedulingMode;
import org.junit.Before;
import org.junit.Test;
@@ -32,10 +33,12 @@ import org.junit.Test;
*/
public class TestComputeFairShares {
private List<Schedulable> scheds;
+ private SchedulingMode schedulingMode;
@Before
public void setUp() throws Exception {
scheds = new ArrayList<Schedulable>();
+ schedulingMode = new FairSchedulingMode();
}
/**
@@ -48,7 +51,8 @@ public class TestComputeFairShares {
scheds.add(new FakeSchedulable(50));
scheds.add(new FakeSchedulable(30));
scheds.add(new FakeSchedulable(20));
- SchedulingAlgorithms.computeFairShares(scheds, Resources.createResource(40));
+ schedulingMode.computeShares(scheds,
+ Resources.createResource(40));
verifyShares(10, 10, 10, 10);
}
@@ -65,7 +69,8 @@ public class TestComputeFairShares {
scheds.add(new FakeSchedulable(50));
scheds.add(new FakeSchedulable(11));
scheds.add(new FakeSchedulable(3));
- SchedulingAlgorithms.computeFairShares(scheds, Resources.createResource(40));
+ schedulingMode.computeShares(scheds,
+ Resources.createResource(40));
verifyShares(13, 13, 11, 3);
}
@@ -83,7 +88,8 @@ public class TestComputeFairShares {
scheds.add(new FakeSchedulable(10, 20));
scheds.add(new FakeSchedulable(10, 0));
scheds.add(new FakeSchedulable(3, 2));
- SchedulingAlgorithms.computeFairShares(scheds, Resources.createResource(40));
+ schedulingMode.computeShares(scheds,
+ Resources.createResource(40));
verifyShares(20, 10, 7, 3);
}
@@ -97,7 +103,8 @@ public class TestComputeFairShares {
scheds.add(new FakeSchedulable(50, 0, 1.0));
scheds.add(new FakeSchedulable(30, 0, 1.0));
scheds.add(new FakeSchedulable(20, 0, 0.5));
- SchedulingAlgorithms.computeFairShares(scheds, Resources.createResource(45));
+ schedulingMode.computeShares(scheds,
+ Resources.createResource(45));
verifyShares(20, 10, 10, 5);
}
@@ -114,7 +121,8 @@ public class TestComputeFairShares {
scheds.add(new FakeSchedulable(11, 0, 1.0));
scheds.add(new FakeSchedulable(30, 0, 1.0));
scheds.add(new FakeSchedulable(20, 0, 0.5));
- SchedulingAlgorithms.computeFairShares(scheds, Resources.createResource(45));
+ schedulingMode.computeShares(scheds,
+ Resources.createResource(45));
verifyShares(10, 11, 16, 8);
}
@@ -131,7 +139,8 @@ public class TestComputeFairShares {
scheds.add(new FakeSchedulable(11, 0, 1.0));
scheds.add(new FakeSchedulable(30, 5, 1.0));
scheds.add(new FakeSchedulable(20, 15, 0.5));
- SchedulingAlgorithms.computeFairShares(scheds, Resources.createResource(45));
+ schedulingMode.computeShares(scheds,
+ Resources.createResource(45));
verifyShares(10, 10, 10, 15);
}
@@ -146,7 +155,9 @@ public class TestComputeFairShares {
scheds.add(new FakeSchedulable(50 * million));
scheds.add(new FakeSchedulable(30 * million));
scheds.add(new FakeSchedulable(20 * million));
- SchedulingAlgorithms.computeFairShares(scheds, Resources.createResource(40 * million));
+ schedulingMode
+ .computeShares(scheds,
+ Resources.createResource(40 * million));
verifyShares(10 * million, 10 * million, 10 * million, 10 * million);
}
@@ -159,7 +170,8 @@ public class TestComputeFairShares {
scheds.add(new FakeSchedulable(50));
scheds.add(new FakeSchedulable(30));
scheds.add(new FakeSchedulable(0));
- SchedulingAlgorithms.computeFairShares(scheds, Resources.createResource(30));
+ schedulingMode.computeShares(scheds,
+ Resources.createResource(30));
verifyShares(10, 10, 10, 0);
}
@@ -168,7 +180,8 @@ public class TestComputeFairShares {
*/
@Test
public void testEmptyList() {
- SchedulingAlgorithms.computeFairShares(scheds, Resources.createResource(40));
+ schedulingMode.computeShares(scheds,
+ Resources.createResource(40));
verifyShares();
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1460961&r1=1460960&r2=1460961&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Tue Mar 26 03:25:26 2013
@@ -72,6 +72,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FifoSchedulingMode;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -1325,7 +1326,7 @@ public class TestFairScheduler {
FSSchedulerApp app2 = scheduler.applications.get(attId2);
FSLeafQueue queue1 = scheduler.getQueueManager().getLeafQueue("queue1");
- queue1.setSchedulingMode(SchedulingMode.FIFO);
+ queue1.setSchedulingMode(new FifoSchedulingMode());
scheduler.update();
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingMode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingMode.java?rev=1460961&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingMode.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingMode.java Tue Mar 26 03:25:26 2013
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FairSchedulingMode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FifoSchedulingMode;
+import org.junit.Test;
+
+ public class TestSchedulingMode {
+
+   /**
+    * Resolves a scheduling mode from a class name, a canonical class name, a
+    * class object, and the short names "fair" and "fifo", checking after each
+    * lookup that the returned mode reports the expected NAME constant.
+    */
+   @Test(timeout = 1000)
+   public void testParseSchedulingMode() throws AllocationConfigurationException {
+     final Class<FairSchedulingMode> fairClass = FairSchedulingMode.class;
+
+     // Class name
+     assertModeName(FairSchedulingMode.NAME,
+         SchedulingMode.parse(fairClass.getName()));
+
+     // Canonical name
+     assertModeName(FairSchedulingMode.NAME,
+         SchedulingMode.parse(fairClass.getCanonicalName()));
+
+     // Class
+     assertModeName(FairSchedulingMode.NAME,
+         SchedulingMode.getInstance(fairClass));
+
+     // Shortname - fair
+     assertModeName(FairSchedulingMode.NAME, SchedulingMode.parse("fair"));
+
+     // Shortname - fifo
+     assertModeName(FifoSchedulingMode.NAME, SchedulingMode.parse("fifo"));
+   }
+
+   /**
+    * Fails with "Invalid scheduler name" unless the mode's getName() equals
+    * the expected name.
+    */
+   private static void assertModeName(String expectedName,
+       SchedulingMode mode) {
+     assertTrue("Invalid scheduler name",
+         mode.getName().equals(expectedName));
+   }
+ }