You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:51:01 UTC
svn commit: r1619012 [15/26] - in
/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project: ./
hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/dev-support/
hadoop-yarn/hadoop-yarn-api/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api...
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java Tue Aug 19 23:49:39 2014
@@ -68,7 +68,9 @@ public class AllocationFileLoaderService
* (this is done to prevent loading a file that hasn't been fully written).
*/
public static final long ALLOC_RELOAD_WAIT_MS = 5 * 1000;
-
+
+ public static final long THREAD_JOIN_TIMEOUT_MS = 1000;
+
private final Clock clock;
private long lastSuccessfulReload; // Last time we successfully reloaded queues
@@ -96,58 +98,69 @@ public class AllocationFileLoaderService
}
@Override
- public void init(Configuration conf) {
+ public void serviceInit(Configuration conf) throws Exception {
this.allocFile = getAllocationFile(conf);
- super.init(conf);
- }
-
- @Override
- public void start() {
- if (allocFile == null) {
- return;
- }
- reloadThread = new Thread() {
- public void run() {
- while (running) {
- long time = clock.getTime();
- long lastModified = allocFile.lastModified();
- if (lastModified > lastSuccessfulReload &&
- time > lastModified + ALLOC_RELOAD_WAIT_MS) {
- try {
- reloadAllocations();
- } catch (Exception ex) {
+ if (allocFile != null) {
+ reloadThread = new Thread() {
+ @Override
+ public void run() {
+ while (running) {
+ long time = clock.getTime();
+ long lastModified = allocFile.lastModified();
+ if (lastModified > lastSuccessfulReload &&
+ time > lastModified + ALLOC_RELOAD_WAIT_MS) {
+ try {
+ reloadAllocations();
+ } catch (Exception ex) {
+ if (!lastReloadAttemptFailed) {
+ LOG.error("Failed to reload fair scheduler config file - " +
+ "will use existing allocations.", ex);
+ }
+ lastReloadAttemptFailed = true;
+ }
+ } else if (lastModified == 0l) {
if (!lastReloadAttemptFailed) {
- LOG.error("Failed to reload fair scheduler config file - " +
- "will use existing allocations.", ex);
+ LOG.warn("Failed to reload fair scheduler config file because" +
+ " last modified returned 0. File exists: "
+ + allocFile.exists());
}
lastReloadAttemptFailed = true;
}
- } else if (lastModified == 0l) {
- if (!lastReloadAttemptFailed) {
- LOG.warn("Failed to reload fair scheduler config file because" +
- " last modified returned 0. File exists: " + allocFile.exists());
+ try {
+ Thread.sleep(reloadIntervalMs);
+ } catch (InterruptedException ex) {
+ LOG.info(
+ "Interrupted while waiting to reload alloc configuration");
}
- lastReloadAttemptFailed = true;
- }
- try {
- Thread.sleep(reloadIntervalMs);
- } catch (InterruptedException ex) {
- LOG.info("Interrupted while waiting to reload alloc configuration");
}
}
- }
- };
- reloadThread.setName("AllocationFileReloader");
- reloadThread.setDaemon(true);
- reloadThread.start();
- super.start();
+ };
+ reloadThread.setName("AllocationFileReloader");
+ reloadThread.setDaemon(true);
+ }
+ super.serviceInit(conf);
}
@Override
- public void stop() {
+ public void serviceStart() throws Exception {
+ if (reloadThread != null) {
+ reloadThread.start();
+ }
+ super.serviceStart();
+ }
+
+ @Override
+ public void serviceStop() throws Exception {
running = false;
- reloadThread.interrupt();
- super.stop();
+ if (reloadThread != null) {
+ reloadThread.interrupt();
+ try {
+ reloadThread.join(THREAD_JOIN_TIMEOUT_MS);
+ } catch (InterruptedException e) {
+ LOG.warn("reloadThread fails to join.");
+ }
+ }
+ super.serviceStop();
}
/**
@@ -200,6 +213,7 @@ public class AllocationFileLoaderService
Map<String, Resource> maxQueueResources = new HashMap<String, Resource>();
Map<String, Integer> queueMaxApps = new HashMap<String, Integer>();
Map<String, Integer> userMaxApps = new HashMap<String, Integer>();
+ Map<String, Float> queueMaxAMShares = new HashMap<String, Float>();
Map<String, ResourceWeights> queueWeights = new HashMap<String, ResourceWeights>();
Map<String, SchedulingPolicy> queuePolicies = new HashMap<String, SchedulingPolicy>();
Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>();
@@ -207,6 +221,7 @@ public class AllocationFileLoaderService
new HashMap<String, Map<QueueACL, AccessControlList>>();
int userMaxAppsDefault = Integer.MAX_VALUE;
int queueMaxAppsDefault = Integer.MAX_VALUE;
+ float queueMaxAMShareDefault = -1.0f;
long fairSharePreemptionTimeout = Long.MAX_VALUE;
long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.DEFAULT_POLICY;
@@ -214,8 +229,15 @@ public class AllocationFileLoaderService
QueuePlacementPolicy newPlacementPolicy = null;
// Remember all queue names so we can display them on web UI, etc.
- Set<String> queueNamesInAllocFile = new HashSet<String>();
-
+ // configuredQueues is segregated based on whether it is a leaf queue
+ // or a parent queue. This information is used for creating queues
+ // and also for making queue placement decisions (QueuePlacementRule.java).
+ Map<FSQueueType, Set<String>> configuredQueues =
+ new HashMap<FSQueueType, Set<String>>();
+ for (FSQueueType queueType : FSQueueType.values()) {
+ configuredQueues.put(queueType, new HashSet<String>());
+ }
+
// Read and parse the allocations file.
DocumentBuilderFactory docBuilderFactory =
DocumentBuilderFactory.newInstance();
@@ -266,6 +288,11 @@ public class AllocationFileLoaderService
String text = ((Text)element.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
queueMaxAppsDefault = val;
+ } else if ("queueMaxAMShareDefault".equals(element.getTagName())) {
+ String text = ((Text)element.getFirstChild()).getData().trim();
+ float val = Float.parseFloat(text);
+ val = Math.min(val, 1.0f);
+ queueMaxAMShareDefault = val;
} else if ("defaultQueueSchedulingPolicy".equals(element.getTagName())
|| "defaultQueueSchedulingMode".equals(element.getTagName())) {
String text = ((Text)element.getFirstChild()).getData().trim();
@@ -289,26 +316,27 @@ public class AllocationFileLoaderService
}
parent = null;
}
- loadQueue(parent, element, minQueueResources, maxQueueResources, queueMaxApps,
- userMaxApps, queueWeights, queuePolicies, minSharePreemptionTimeouts,
- queueAcls, queueNamesInAllocFile);
+ loadQueue(parent, element, minQueueResources, maxQueueResources,
+ queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
+ queuePolicies, minSharePreemptionTimeouts, queueAcls,
+ configuredQueues);
}
// Load placement policy and pass it configured queues
Configuration conf = getConfig();
if (placementPolicyElement != null) {
newPlacementPolicy = QueuePlacementPolicy.fromXml(placementPolicyElement,
- queueNamesInAllocFile, conf);
+ configuredQueues, conf);
} else {
newPlacementPolicy = QueuePlacementPolicy.fromConfiguration(conf,
- queueNamesInAllocFile);
+ configuredQueues);
}
AllocationConfiguration info = new AllocationConfiguration(minQueueResources, maxQueueResources,
- queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault,
- queueMaxAppsDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts,
+ queueMaxApps, userMaxApps, queueWeights, queueMaxAMShares, userMaxAppsDefault,
+ queueMaxAppsDefault, queueMaxAMShareDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts,
queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout,
- newPlacementPolicy, queueNamesInAllocFile);
+ newPlacementPolicy, configuredQueues);
lastSuccessfulReload = clock.getTime();
lastReloadAttemptFailed = false;
@@ -321,10 +349,12 @@ public class AllocationFileLoaderService
*/
private void loadQueue(String parentName, Element element, Map<String, Resource> minQueueResources,
Map<String, Resource> maxQueueResources, Map<String, Integer> queueMaxApps,
- Map<String, Integer> userMaxApps, Map<String, ResourceWeights> queueWeights,
+ Map<String, Integer> userMaxApps, Map<String, Float> queueMaxAMShares,
+ Map<String, ResourceWeights> queueWeights,
Map<String, SchedulingPolicy> queuePolicies,
Map<String, Long> minSharePreemptionTimeouts,
- Map<String, Map<QueueACL, AccessControlList>> queueAcls, Set<String> queueNamesInAllocFile)
+ Map<String, Map<QueueACL, AccessControlList>> queueAcls,
+ Map<FSQueueType, Set<String>> configuredQueues)
throws AllocationConfigurationException {
String queueName = element.getAttribute("name");
if (parentName != null) {
@@ -352,6 +382,11 @@ public class AllocationFileLoaderService
String text = ((Text)field.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
queueMaxApps.put(queueName, val);
+ } else if ("maxAMShare".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ float val = Float.parseFloat(text);
+ val = Math.min(val, 1.0f);
+ queueMaxAMShares.put(queueName, val);
} else if ("weight".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
double val = Double.parseDouble(text);
@@ -374,14 +409,21 @@ public class AllocationFileLoaderService
} else if ("queue".endsWith(field.getTagName()) ||
"pool".equals(field.getTagName())) {
loadQueue(queueName, field, minQueueResources, maxQueueResources,
- queueMaxApps, userMaxApps, queueWeights, queuePolicies,
- minSharePreemptionTimeouts,
- queueAcls, queueNamesInAllocFile);
+ queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
+ queuePolicies, minSharePreemptionTimeouts, queueAcls,
+ configuredQueues);
+ configuredQueues.get(FSQueueType.PARENT).add(queueName);
isLeaf = false;
}
}
if (isLeaf) {
- queueNamesInAllocFile.add(queueName);
+ // if a leaf in the alloc file is marked as type='parent'
+ // then store it under 'parent'
+ if ("parent".equals(element.getAttribute("type"))) {
+ configuredQueues.get(FSQueueType.PARENT).add(queueName);
+ } else {
+ configuredQueues.get(FSQueueType.LEAF).add(queueName);
+ }
}
queueAcls.put(queueName, acls);
if (maxQueueResources.containsKey(queueName) && minQueueResources.containsKey(queueName)
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java Tue Aug 19 23:49:39 2014
@@ -33,21 +33,22 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.util.resource.Resources;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
@Private
@Unstable
public class FSLeafQueue extends FSQueue {
private static final Log LOG = LogFactory.getLog(
FSLeafQueue.class.getName());
-
- private final List<AppSchedulable> runnableAppScheds = // apps that are runnable
- new ArrayList<AppSchedulable>();
- private final List<AppSchedulable> nonRunnableAppScheds =
- new ArrayList<AppSchedulable>();
+
+ private final List<FSAppAttempt> runnableApps = // apps that are runnable
+ new ArrayList<FSAppAttempt>();
+ private final List<FSAppAttempt> nonRunnableApps =
+ new ArrayList<FSAppAttempt>();
private Resource demand = Resources.createResource(0);
@@ -55,6 +56,9 @@ public class FSLeafQueue extends FSQueue
private long lastTimeAtMinShare;
private long lastTimeAtHalfFairShare;
+ // Track the AM resource usage for this queue
+ private Resource amResourceUsage;
+
private final ActiveUsersManager activeUsersManager;
public FSLeafQueue(String name, FairScheduler scheduler,
@@ -63,31 +67,34 @@ public class FSLeafQueue extends FSQueue
this.lastTimeAtMinShare = scheduler.getClock().getTime();
this.lastTimeAtHalfFairShare = scheduler.getClock().getTime();
activeUsersManager = new ActiveUsersManager(getMetrics());
+ amResourceUsage = Resource.newInstance(0, 0);
}
- public void addApp(FSSchedulerApp app, boolean runnable) {
- AppSchedulable appSchedulable = new AppSchedulable(scheduler, app, this);
- app.setAppSchedulable(appSchedulable);
+ public void addApp(FSAppAttempt app, boolean runnable) {
if (runnable) {
- runnableAppScheds.add(appSchedulable);
+ runnableApps.add(app);
} else {
- nonRunnableAppScheds.add(appSchedulable);
+ nonRunnableApps.add(app);
}
}
// for testing
- void addAppSchedulable(AppSchedulable appSched) {
- runnableAppScheds.add(appSched);
+ void addAppSchedulable(FSAppAttempt appSched) {
+ runnableApps.add(appSched);
}
/**
* Removes the given app from this queue.
* @return whether or not the app was runnable
*/
- public boolean removeApp(FSSchedulerApp app) {
- if (runnableAppScheds.remove(app.getAppSchedulable())) {
+ public boolean removeApp(FSAppAttempt app) {
+ if (runnableApps.remove(app)) {
+ // Update AM resource usage
+ if (app.isAmRunning() && app.getAMResource() != null) {
+ Resources.subtractFrom(amResourceUsage, app.getAMResource());
+ }
return true;
- } else if (nonRunnableAppScheds.remove(app.getAppSchedulable())) {
+ } else if (nonRunnableApps.remove(app)) {
return false;
} else {
throw new IllegalStateException("Given app to remove " + app +
@@ -95,22 +102,22 @@ public class FSLeafQueue extends FSQueue
}
}
- public Collection<AppSchedulable> getRunnableAppSchedulables() {
- return runnableAppScheds;
+ public Collection<FSAppAttempt> getRunnableAppSchedulables() {
+ return runnableApps;
}
- public List<AppSchedulable> getNonRunnableAppSchedulables() {
- return nonRunnableAppScheds;
+ public List<FSAppAttempt> getNonRunnableAppSchedulables() {
+ return nonRunnableApps;
}
@Override
public void collectSchedulerApplications(
Collection<ApplicationAttemptId> apps) {
- for (AppSchedulable appSched : runnableAppScheds) {
- apps.add(appSched.getApp().getApplicationAttemptId());
+ for (FSAppAttempt appSched : runnableApps) {
+ apps.add(appSched.getApplicationAttemptId());
}
- for (AppSchedulable appSched : nonRunnableAppScheds) {
- apps.add(appSched.getApp().getApplicationAttemptId());
+ for (FSAppAttempt appSched : nonRunnableApps) {
+ apps.add(appSched.getApplicationAttemptId());
}
}
@@ -136,15 +143,19 @@ public class FSLeafQueue extends FSQueue
@Override
public Resource getResourceUsage() {
Resource usage = Resources.createResource(0);
- for (AppSchedulable app : runnableAppScheds) {
+ for (FSAppAttempt app : runnableApps) {
Resources.addTo(usage, app.getResourceUsage());
}
- for (AppSchedulable app : nonRunnableAppScheds) {
+ for (FSAppAttempt app : nonRunnableApps) {
Resources.addTo(usage, app.getResourceUsage());
}
return usage;
}
+ public Resource getAmResourceUsage() {
+ return amResourceUsage;
+ }
+
@Override
public void updateDemand() {
// Compute demand by iterating through apps in the queue
@@ -152,13 +163,13 @@ public class FSLeafQueue extends FSQueue
Resource maxRes = scheduler.getAllocationConfiguration()
.getMaxResources(getName());
demand = Resources.createResource(0);
- for (AppSchedulable sched : runnableAppScheds) {
+ for (FSAppAttempt sched : runnableApps) {
if (Resources.equals(demand, maxRes)) {
break;
}
updateDemandForApp(sched, maxRes);
}
- for (AppSchedulable sched : nonRunnableAppScheds) {
+ for (FSAppAttempt sched : nonRunnableApps) {
if (Resources.equals(demand, maxRes)) {
break;
}
@@ -170,7 +181,7 @@ public class FSLeafQueue extends FSQueue
}
}
- private void updateDemandForApp(AppSchedulable sched, Resource maxRes) {
+ private void updateDemandForApp(FSAppAttempt sched, Resource maxRes) {
sched.updateDemand();
Resource toAdd = sched.getDemand();
if (LOG.isDebugEnabled()) {
@@ -194,9 +205,9 @@ public class FSLeafQueue extends FSQueue
}
Comparator<Schedulable> comparator = policy.getComparator();
- Collections.sort(runnableAppScheds, comparator);
- for (AppSchedulable sched : runnableAppScheds) {
- if (SchedulerAppUtils.isBlacklisted(sched.getApp(), node, LOG)) {
+ Collections.sort(runnableApps, comparator);
+ for (FSAppAttempt sched : runnableApps) {
+ if (SchedulerAppUtils.isBlacklisted(sched, node, LOG)) {
continue;
}
@@ -209,6 +220,37 @@ public class FSLeafQueue extends FSQueue
}
@Override
+ public RMContainer preemptContainer() {
+ RMContainer toBePreempted = null;
+
+ // If this queue is not over its fair share, reject
+ if (!preemptContainerPreCheck()) {
+ return toBePreempted;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Queue " + getName() + " is going to preempt a container " +
+ "from its applications.");
+ }
+
+ // Choose the app that is most over fair share
+ Comparator<Schedulable> comparator = policy.getComparator();
+ FSAppAttempt candidateSched = null;
+ for (FSAppAttempt sched : runnableApps) {
+ if (candidateSched == null ||
+ comparator.compare(sched, candidateSched) > 0) {
+ candidateSched = sched;
+ }
+ }
+
+ // Preempt from the selected app
+ if (candidateSched != null) {
+ toBePreempted = candidateSched.preemptContainer();
+ }
+ return toBePreempted;
+ }
+
+ @Override
public List<FSQueue> getChildQueues() {
return new ArrayList<FSQueue>(1);
}
@@ -247,11 +289,52 @@ public class FSLeafQueue extends FSQueue
@Override
public int getNumRunnableApps() {
- return runnableAppScheds.size();
+ return runnableApps.size();
}
@Override
public ActiveUsersManager getActiveUsersManager() {
return activeUsersManager;
}
+
+ /**
+ * Check whether this queue can run this application master under the
+ * maxAMShare limit
+ *
+ * @param amResource the AM resource to add to this queue's current AM usage
+ * @return true if this queue can run the application master
+ */
+ public boolean canRunAppAM(Resource amResource) {
+ float maxAMShare =
+ scheduler.getAllocationConfiguration().getQueueMaxAMShare(getName());
+ if (Math.abs(maxAMShare - -1.0f) < 0.0001) {
+ return true;
+ }
+ Resource maxAMResource = Resources.multiply(getFairShare(), maxAMShare);
+ Resource ifRunAMResource = Resources.add(amResourceUsage, amResource);
+ return !policy
+ .checkIfAMResourceUsageOverLimit(ifRunAMResource, maxAMResource);
+ }
+
+ public void addAMResourceUsage(Resource amResource) {
+ if (amResource != null) {
+ Resources.addTo(amResourceUsage, amResource);
+ }
+ }
+
+ @Override
+ public void recoverContainer(Resource clusterResource,
+ SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) {
+ // TODO Auto-generated method stub
+ }
+
+ /**
+ * Helper method to check if the queue should preempt containers
+ *
+ * @return true if check passes (can preempt) or false otherwise
+ */
+ private boolean preemptContainerPreCheck() {
+ return parent.getPolicy().checkIfUsageOverFairShare(getResourceUsage(),
+ getFairShare());
+ }
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java Tue Aug 19 23:49:39 2014
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.re
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -32,8 +33,11 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@Private
@Unstable
@@ -157,6 +161,27 @@ public class FSParentQueue extends FSQue
}
@Override
+ public RMContainer preemptContainer() {
+ RMContainer toBePreempted = null;
+
+ // Find the childQueue which is most over fair share
+ FSQueue candidateQueue = null;
+ Comparator<Schedulable> comparator = policy.getComparator();
+ for (FSQueue queue : childQueues) {
+ if (candidateQueue == null ||
+ comparator.compare(queue, candidateQueue) > 0) {
+ candidateQueue = queue;
+ }
+ }
+
+ // Let the selected queue choose which of its containers to preempt
+ if (candidateQueue != null) {
+ toBePreempted = candidateQueue.preemptContainer();
+ }
+ return toBePreempted;
+ }
+
+ @Override
public List<FSQueue> getChildQueues() {
return childQueues;
}
@@ -200,4 +225,11 @@ public class FSParentQueue extends FSQue
// Should never be called since all applications are submitted to LeafQueues
return null;
}
+
+ @Override
+ public void recoverContainer(Resource clusterResource,
+ SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) {
+ // TODO Auto-generated method stub
+
+ }
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java Tue Aug 19 23:49:39 2014
@@ -39,7 +39,8 @@ import org.apache.hadoop.yarn.util.resou
@Private
@Unstable
-public abstract class FSQueue extends Schedulable implements Queue {
+public abstract class FSQueue implements Queue, Schedulable {
+ private Resource fairShare = Resources.createResource(0, 0);
private final String name;
protected final FairScheduler scheduler;
private final FSQueueMetrics metrics;
@@ -119,9 +120,9 @@ public abstract class FSQueue extends Sc
// TODO: we might change these queue metrics around a little bit
// to match the semantics of the fair scheduler.
queueInfo.setCapacity((float) getFairShare().getMemory() /
- scheduler.getClusterCapacity().getMemory());
+ scheduler.getClusterResource().getMemory());
queueInfo.setCapacity((float) getResourceUsage().getMemory() /
- scheduler.getClusterCapacity().getMemory());
+ scheduler.getClusterResource().getMemory());
ArrayList<QueueInfo> childQueueInfos = new ArrayList<QueueInfo>();
if (includeChildQueues) {
@@ -139,10 +140,15 @@ public abstract class FSQueue extends Sc
public FSQueueMetrics getMetrics() {
return metrics;
}
-
+
+ /** Get the fair share assigned to this Schedulable. */
+ public Resource getFairShare() {
+ return fairShare;
+ }
+
@Override
public void setFairShare(Resource fairShare) {
- super.setFairShare(fairShare);
+ this.fairShare = fairShare;
metrics.setFairShare(fairShare);
}
@@ -187,4 +193,16 @@ public abstract class FSQueue extends Sc
}
return true;
}
+
+ @Override
+ public boolean isActive() {
+ return getNumRunnableApps() > 0;
+ }
+
+ /** Convenient toString implementation for debugging. */
+ @Override
+ public String toString() {
+ return String.format("[%s, demand=%s, running=%s, share=%s, w=%s]",
+ getName(), getDemand(), getResourceUsage(), fairShare, getWeights());
+ }
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java Tue Aug 19 23:49:39 2014
@@ -18,28 +18,16 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import org.apache.hadoop.yarn.util.resource.Resources;
@Private
@Unstable
@@ -47,208 +35,56 @@ public class FSSchedulerNode extends Sch
private static final Log LOG = LogFactory.getLog(FSSchedulerNode.class);
- private static final RecordFactory recordFactory = RecordFactoryProvider
- .getRecordFactory(null);
-
- private Resource availableResource;
- private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
- private Resource totalResourceCapability;
-
- private volatile int numContainers;
-
- private RMContainer reservedContainer;
- private AppSchedulable reservedAppSchedulable;
-
- /* set of containers that are allocated containers */
- private final Map<ContainerId, RMContainer> launchedContainers =
- new HashMap<ContainerId, RMContainer>();
-
- private final RMNode rmNode;
- private final String nodeName;
+ private FSAppAttempt reservedAppSchedulable;
public FSSchedulerNode(RMNode node, boolean usePortForNodeName) {
- this.rmNode = node;
- this.availableResource = Resources.clone(node.getTotalCapability());
- totalResourceCapability =
- Resource.newInstance(node.getTotalCapability().getMemory(), node
- .getTotalCapability().getVirtualCores());
- if (usePortForNodeName) {
- nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort();
- } else {
- nodeName = rmNode.getHostName();
- }
- }
-
- public RMNode getRMNode() {
- return rmNode;
- }
-
- public NodeId getNodeID() {
- return rmNode.getNodeID();
- }
-
- public String getHttpAddress() {
- return rmNode.getHttpAddress();
- }
-
- @Override
- public String getNodeName() {
- return nodeName;
+ super(node, usePortForNodeName);
}
@Override
- public String getRackName() {
- return rmNode.getRackName();
- }
-
- /**
- * The Scheduler has allocated containers on this node to the
- * given application.
- *
- * @param applicationId application
- * @param rmContainer allocated container
- */
- public synchronized void allocateContainer(ApplicationId applicationId,
- RMContainer rmContainer) {
- Container container = rmContainer.getContainer();
- deductAvailableResource(container.getResource());
- ++numContainers;
-
- launchedContainers.put(container.getId(), rmContainer);
-
- LOG.info("Assigned container " + container.getId() +
- " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() +
- ", which currently has " + numContainers + " containers, " +
- getUsedResource() + " used and " +
- getAvailableResource() + " available");
- }
-
- @Override
- public synchronized Resource getAvailableResource() {
- return availableResource;
- }
-
- @Override
- public synchronized Resource getUsedResource() {
- return usedResource;
- }
-
- private synchronized boolean isValidContainer(Container c) {
- if (launchedContainers.containsKey(c.getId())) {
- return true;
- }
- return false;
- }
-
- private synchronized void updateResource(Container container) {
- addAvailableResource(container.getResource());
- --numContainers;
- }
-
- /**
- * Release an allocated container on this node.
- * @param container container to be released
- */
- public synchronized void releaseContainer(Container container) {
- if (!isValidContainer(container)) {
- LOG.error("Invalid container released " + container);
- return;
- }
-
- /* remove the containers from the nodemanger */
- launchedContainers.remove(container.getId());
- updateResource(container);
-
- LOG.info("Released container " + container.getId() +
- " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() +
- ", which currently has " + numContainers + " containers, " +
- getUsedResource() + " used and " + getAvailableResource()
- + " available" + ", release resources=" + true);
- }
-
-
- private synchronized void addAvailableResource(Resource resource) {
- if (resource == null) {
- LOG.error("Invalid resource addition of null resource for "
- + rmNode.getNodeAddress());
- return;
- }
- Resources.addTo(availableResource, resource);
- Resources.subtractFrom(usedResource, resource);
- }
-
- @Override
- public Resource getTotalResource() {
- return this.totalResourceCapability;
- }
-
- private synchronized void deductAvailableResource(Resource resource) {
- if (resource == null) {
- LOG.error("Invalid deduction of null resource for "
- + rmNode.getNodeAddress());
- return;
- }
- Resources.subtractFrom(availableResource, resource);
- Resources.addTo(usedResource, resource);
- }
-
- @Override
- public String toString() {
- return "host: " + rmNode.getNodeAddress() + " #containers=" + getNumContainers() +
- " available=" + getAvailableResource() +
- " used=" + getUsedResource();
- }
-
- @Override
- public int getNumContainers() {
- return numContainers;
- }
-
- public synchronized List<RMContainer> getRunningContainers() {
- return new ArrayList<RMContainer>(launchedContainers.values());
- }
-
public synchronized void reserveResource(
- FSSchedulerApp application, Priority priority,
- RMContainer reservedContainer) {
+ SchedulerApplicationAttempt application, Priority priority,
+ RMContainer container) {
// Check if it's already reserved
- if (this.reservedContainer != null) {
+ RMContainer reservedContainer = getReservedContainer();
+ if (reservedContainer != null) {
// Sanity check
- if (!reservedContainer.getContainer().getNodeId().equals(getNodeID())) {
+ if (!container.getContainer().getNodeId().equals(getNodeID())) {
throw new IllegalStateException("Trying to reserve" +
- " container " + reservedContainer +
- " on node " + reservedContainer.getReservedNode() +
- " when currently" + " reserved resource " + this.reservedContainer +
- " on node " + this.reservedContainer.getReservedNode());
+ " container " + container +
+ " on node " + container.getReservedNode() +
+ " when currently" + " reserved resource " + reservedContainer +
+ " on node " + reservedContainer.getReservedNode());
}
// Cannot reserve more than one application on a given node!
- if (!this.reservedContainer.getContainer().getId().getApplicationAttemptId().equals(
- reservedContainer.getContainer().getId().getApplicationAttemptId())) {
+ if (!reservedContainer.getContainer().getId().getApplicationAttemptId()
+ .equals(container.getContainer().getId().getApplicationAttemptId())) {
throw new IllegalStateException("Trying to reserve" +
- " container " + reservedContainer +
+ " container " + container +
" for application " + application.getApplicationId() +
" when currently" +
- " reserved container " + this.reservedContainer +
+ " reserved container " + reservedContainer +
" on node " + this);
}
LOG.info("Updated reserved container " +
- reservedContainer.getContainer().getId() + " on node " +
+ container.getContainer().getId() + " on node " +
this + " for application " + application);
} else {
- LOG.info("Reserved container " + reservedContainer.getContainer().getId() +
+ LOG.info("Reserved container " + container.getContainer().getId() +
" on node " + this + " for application " + application);
}
- this.reservedContainer = reservedContainer;
- this.reservedAppSchedulable = application.getAppSchedulable();
+ setReservedContainer(container);
+ this.reservedAppSchedulable = (FSAppAttempt) application;
}
+ @Override
public synchronized void unreserveResource(
- FSSchedulerApp application) {
+ SchedulerApplicationAttempt application) {
// Cannot unreserve for wrong application...
ApplicationAttemptId reservedApplication =
- reservedContainer.getContainer().getId().getApplicationAttemptId();
+ getReservedContainer().getContainer().getId().getApplicationAttemptId();
if (!reservedApplication.equals(
application.getApplicationAttemptId())) {
throw new IllegalStateException("Trying to unreserve " +
@@ -258,22 +94,11 @@ public class FSSchedulerNode extends Sch
" on node " + this);
}
- this.reservedContainer = null;
+ setReservedContainer(null);
this.reservedAppSchedulable = null;
}
- public synchronized RMContainer getReservedContainer() {
- return reservedContainer;
- }
-
- public synchronized AppSchedulable getReservedAppSchedulable() {
+ public synchronized FSAppAttempt getReservedAppSchedulable() {
return reservedAppSchedulable;
}
-
- @Override
- public synchronized void applyDeltaOnAvailableResource(Resource deltaResource) {
- // we can only adjust available resource if total resource is changed.
- Resources.addTo(this.availableResource, deltaResource);
- }
-
}