You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by aw...@apache.org on 2015/07/21 22:12:05 UTC
[11/16] hadoop git commit: YARN-2003. Support for Application
priority : Changes in RM and Capacity Scheduler. (Sunil G via wangda)
YARN-2003. Support for Application priority : Changes in RM and Capacity Scheduler. (Sunil G via wangda)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c39ca541
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c39ca541
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c39ca541
Branch: refs/heads/HADOOP-12111
Commit: c39ca541f498712133890961598bbff50d89d68b
Parents: da2d1ac
Author: Wangda Tan <wa...@apache.org>
Authored: Tue Jul 21 09:56:59 2015 -0700
Committer: Wangda Tan <wa...@apache.org>
Committed: Tue Jul 21 09:57:23 2015 -0700
----------------------------------------------------------------------
.../sls/scheduler/ResourceSchedulerWrapper.java | 10 +
hadoop-yarn-project/CHANGES.txt | 3 +
.../hadoop/yarn/conf/YarnConfiguration.java | 5 +
.../server/resourcemanager/RMAppManager.java | 20 +-
.../server/resourcemanager/rmapp/RMAppImpl.java | 15 +-
.../scheduler/AbstractYarnScheduler.java | 10 +
.../server/resourcemanager/scheduler/Queue.java | 8 +
.../scheduler/SchedulerApplication.java | 22 ++
.../scheduler/SchedulerApplicationAttempt.java | 15 +-
.../scheduler/YarnScheduler.java | 20 ++
.../scheduler/capacity/AbstractCSQueue.java | 7 +
.../scheduler/capacity/CapacityScheduler.java | 73 +++-
.../CapacitySchedulerConfiguration.java | 13 +
.../scheduler/capacity/LeafQueue.java | 19 +-
.../scheduler/common/fica/FiCaSchedulerApp.java | 8 +
.../scheduler/event/AppAddedSchedulerEvent.java | 28 +-
.../resourcemanager/scheduler/fair/FSQueue.java | 6 +
.../scheduler/fifo/FifoScheduler.java | 6 +
.../scheduler/policy/FifoComparator.java | 11 +-
.../scheduler/policy/SchedulableEntity.java | 5 +
.../yarn/server/resourcemanager/MockRM.java | 31 +-
.../server/resourcemanager/TestAppManager.java | 1 +
.../TestWorkPreservingRMRestart.java | 2 +-
...pacityPreemptionPolicyForNodePartitions.java | 1 +
.../capacity/TestApplicationLimits.java | 5 +-
.../capacity/TestApplicationPriority.java | 345 +++++++++++++++++++
.../capacity/TestCapacityScheduler.java | 5 +
.../scheduler/policy/MockSchedulableEntity.java | 13 +-
.../security/TestDelegationTokenRenewer.java | 10 +-
.../TestRMWebServicesAppsModification.java | 2 +-
30 files changed, 664 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c39ca541/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
index 08cb1e6..14e2645 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
@@ -949,4 +950,13 @@ final public class ResourceSchedulerWrapper
ContainerStatus containerStatus, RMContainerEventType event) {
// do nothing
}
+
+ @Override
+ public Priority checkAndGetApplicationPriority(Priority priority,
+ String user, String queueName, ApplicationId applicationId)
+ throws YarnException {
+ // TODO Dummy implementation.
+ return Priority.newInstance(0);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c39ca541/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index d0829c1..7259cf2 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -139,6 +139,9 @@ Release 2.8.0 - UNRELEASED
YARN-3116. RM notifies NM whether a container is an AM container or normal
task container. (Giovanni Matteo Fumarola via zjshen)
+ YARN-2003. Support for Application priority : Changes in RM and Capacity
+ Scheduler. (Sunil G via wangda)
+
IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c39ca541/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 6b660f7..060635f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1928,6 +1928,11 @@ public class YarnConfiguration extends Configuration {
public static final String DEFAULT_NODELABEL_CONFIGURATION_TYPE =
CENTALIZED_NODELABEL_CONFIGURATION_TYPE;
+ public static final String MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY =
+ YARN_PREFIX + "cluster.max-application-priority";
+
+ public static final int DEFAULT_CLUSTER_LEVEL_APPLICATION_PRIORITY = 0;
+
@Private
public static boolean isDistributedNodeLabelConfiguration(Configuration conf) {
return DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE.equals(conf.get(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c39ca541/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index 2d9431d..6fd1838 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -329,14 +330,19 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
ResourceRequest amReq =
validateAndCreateResourceRequest(submissionContext, isRecovery);
+ // Verify and get the update application priority and set back to
+ // submissionContext
+ Priority appPriority = rmContext.getScheduler()
+ .checkAndGetApplicationPriority(submissionContext.getPriority(), user,
+ submissionContext.getQueue(), applicationId);
+ submissionContext.setPriority(appPriority);
+
// Create RMApp
- RMAppImpl application =
- new RMAppImpl(applicationId, rmContext, this.conf,
- submissionContext.getApplicationName(), user,
- submissionContext.getQueue(),
- submissionContext, this.scheduler, this.masterService,
- submitTime, submissionContext.getApplicationType(),
- submissionContext.getApplicationTags(), amReq);
+ RMAppImpl application = new RMAppImpl(applicationId, rmContext, this.conf,
+ submissionContext.getApplicationName(), user,
+ submissionContext.getQueue(), submissionContext, this.scheduler,
+ this.masterService, submitTime, submissionContext.getApplicationType(),
+ submissionContext.getApplicationTags(), amReq);
// Concurrent app submissions with same applicationId will fail here
// Concurrent app submissions with different applicationIds will not
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c39ca541/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 62d5555..d480c24 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -924,17 +924,15 @@ public class RMAppImpl implements RMApp, Recoverable {
// No existent attempts means the attempt associated with this app was not
// started or started but not yet saved.
if (app.attempts.isEmpty()) {
- app.scheduler.handle(new AppAddedSchedulerEvent(app.applicationId,
- app.submissionContext.getQueue(), app.user,
- app.submissionContext.getReservationID()));
+ app.scheduler.handle(new AppAddedSchedulerEvent(app.user,
+ app.submissionContext, false));
return RMAppState.SUBMITTED;
}
// Add application to scheduler synchronously to guarantee scheduler
// knows applications before AM or NM re-registers.
- app.scheduler.handle(new AppAddedSchedulerEvent(app.applicationId,
- app.submissionContext.getQueue(), app.user, true,
- app.submissionContext.getReservationID()));
+ app.scheduler.handle(new AppAddedSchedulerEvent(app.user,
+ app.submissionContext, true));
// recover attempts
app.recoverAppAttempts();
@@ -960,9 +958,8 @@ public class RMAppImpl implements RMApp, Recoverable {
RMAppTransition {
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
- app.handler.handle(new AppAddedSchedulerEvent(app.applicationId,
- app.submissionContext.getQueue(), app.user,
- app.submissionContext.getReservationID()));
+ app.handler.handle(new AppAddedSchedulerEvent(app.user,
+ app.submissionContext, false));
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c39ca541/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index aad76fd..094f77d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -691,4 +692,13 @@ public abstract class AbstractYarnScheduler
}
return null;
}
+
+ @Override
+ public Priority checkAndGetApplicationPriority(Priority priorityFromContext,
+ String user, String queueName, ApplicationId applicationId)
+ throws YarnException {
+ // Dummy Implementation till Application Priority changes are done in
+ // specific scheduler.
+ return Priority.newInstance(0);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c39ca541/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
index 02003c1..8646381 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
@@ -24,6 +24,7 @@ import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
@@ -110,4 +111,11 @@ public interface Queue {
* new resource asked
*/
public void decPendingResource(String nodeLabel, Resource resourceToDec);
+
+ /**
+ * Get the Default Application Priority for this queue
+ *
+ * @return default application priority
+ */
+ public Priority getDefaultApplicationPriority();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c39ca541/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
index 2c788aa..519de98 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@Private
@@ -28,10 +29,18 @@ public class SchedulerApplication<T extends SchedulerApplicationAttempt> {
private Queue queue;
private final String user;
private T currentAttempt;
+ private volatile Priority priority;
public SchedulerApplication(Queue queue, String user) {
this.queue = queue;
this.user = user;
+ this.priority = null;
+ }
+
+ public SchedulerApplication(Queue queue, String user, Priority priority) {
+ this.queue = queue;
+ this.user = user;
+ this.priority = priority;
}
public Queue getQueue() {
@@ -58,4 +67,17 @@ public class SchedulerApplication<T extends SchedulerApplicationAttempt> {
queue.getMetrics().finishApp(user, rmAppFinalState);
}
+ public Priority getPriority() {
+ return priority;
+ }
+
+ public void setPriority(Priority priority) {
+ this.priority = priority;
+
+ // Also set priority in current running attempt
+ if (null != currentAttempt) {
+ currentAttempt.setPriority(priority);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c39ca541/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 475f2c7..cf543bd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -97,7 +97,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
private boolean unmanagedAM = true;
private boolean amRunning = false;
private LogAggregationContext logAggregationContext;
-
+
+ private Priority appPriority = null;
+
protected ResourceUsage attemptResourceUsage = new ResourceUsage();
private AtomicLong firstAllocationRequestSentTime = new AtomicLong(0);
private AtomicLong firstContainerAllocatedTime = new AtomicLong(0);
@@ -726,7 +728,16 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
public ResourceUsage getAppAttemptResourceUsage() {
return this.attemptResourceUsage;
}
-
+
+ @Override
+ public Priority getPriority() {
+ return appPriority;
+ }
+
+ public void setPriority(Priority appPriority) {
+ this.appPriority = appPriority;
+ }
+
@Override
public String getId() {
return getApplicationId().toString();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c39ca541/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
index b99b217..f629579 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
@@ -286,4 +287,23 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
* @return an EnumSet containing the resource types
*/
public EnumSet<SchedulerResourceTypes> getSchedulingResourceTypes();
+
+ /**
+ *
+ * Verify whether a submitted application priority is valid as per configured
+ * Queue
+ *
+ * @param priorityFromContext
+ * Submitted Application priority.
+ * @param user
+ * User who submitted the Application
+ * @param queueName
+ * Name of the Queue
+ * @param applicationId
+ * Application ID
+ * @return Updated Priority from scheduler
+ */
+ public Priority checkAndGetApplicationPriority(Priority priorityFromContext,
+ String user, String queueName, ApplicationId applicationId)
+ throws YarnException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c39ca541/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index cd5bd8d..7f8e164 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -29,6 +29,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
@@ -574,4 +575,10 @@ public abstract class AbstractCSQueue implements CSQueue {
// sorry, you cannot access
return false;
}
+
+ @Override
+ public Priority getDefaultApplicationPriority() {
+ // TODO add dummy implementation
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c39ca541/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 559dfc6..5a20f8b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
@@ -159,6 +160,9 @@ public class CapacityScheduler extends
new Comparator<FiCaSchedulerApp>() {
@Override
public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {
+ if (!a1.getPriority().equals(a2.getPriority())) {
+ return a1.getPriority().compareTo(a2.getPriority());
+ }
return a1.getApplicationId().compareTo(a2.getApplicationId());
}
};
@@ -226,6 +230,7 @@ public class CapacityScheduler extends
private RMNodeLabelsManager labelManager;
private SchedulerHealth schedulerHealth = new SchedulerHealth();
long lastNodeUpdateTime;
+ private Priority maxClusterLevelAppPriority;
/**
* EXPERT
*/
@@ -326,6 +331,9 @@ public class CapacityScheduler extends
if (scheduleAsynchronously) {
asyncSchedulerThread = new AsyncScheduleThread(this);
}
+ maxClusterLevelAppPriority = Priority.newInstance(yarnConf.getInt(
+ YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY,
+ YarnConfiguration.DEFAULT_CLUSTER_LEVEL_APPLICATION_PRIORITY));
LOG.info("Initialized CapacityScheduler with " +
"calculator=" + getResourceCalculator().getClass() + ", " +
@@ -692,7 +700,7 @@ public class CapacityScheduler extends
}
private synchronized void addApplication(ApplicationId applicationId,
- String queueName, String user, boolean isAppRecovering) {
+ String queueName, String user, boolean isAppRecovering, Priority priority) {
if (mappings != null && mappings.size() > 0) {
try {
@@ -761,7 +769,7 @@ public class CapacityScheduler extends
// update the metrics
queue.getMetrics().submitApp(user);
SchedulerApplication<FiCaSchedulerApp> application =
- new SchedulerApplication<FiCaSchedulerApp>(queue, user);
+ new SchedulerApplication<FiCaSchedulerApp>(queue, user, priority);
applications.put(applicationId, application);
LOG.info("Accepted application " + applicationId + " from user: " + user
+ ", in queue: " + queueName);
@@ -783,9 +791,9 @@ public class CapacityScheduler extends
applications.get(applicationAttemptId.getApplicationId());
CSQueue queue = (CSQueue) application.getQueue();
- FiCaSchedulerApp attempt =
- new FiCaSchedulerApp(applicationAttemptId, application.getUser(),
- queue, queue.getActiveUsersManager(), rmContext);
+ FiCaSchedulerApp attempt = new FiCaSchedulerApp(applicationAttemptId,
+ application.getUser(), queue, queue.getActiveUsersManager(), rmContext,
+ application.getPriority());
if (transferStateFromPreviousAttempt) {
attempt.transferStateFromPreviousAttempt(application
.getCurrentAppAttempt());
@@ -1307,7 +1315,8 @@ public class CapacityScheduler extends
addApplication(appAddedEvent.getApplicationId(),
queueName,
appAddedEvent.getUser(),
- appAddedEvent.getIsAppRecovering());
+ appAddedEvent.getIsAppRecovering(),
+ appAddedEvent.getApplicatonPriority());
}
}
break;
@@ -1833,4 +1842,56 @@ public class CapacityScheduler extends
private synchronized void setLastNodeUpdateTime(long time) {
this.lastNodeUpdateTime = time;
}
+
+ @Override
+ public Priority checkAndGetApplicationPriority(Priority priorityFromContext,
+ String user, String queueName, ApplicationId applicationId)
+ throws YarnException {
+ Priority appPriority = null;
+
+ // ToDo: Verify against priority ACLs
+
+ // Verify the scenario where priority is null from submissionContext.
+ if (null == priorityFromContext) {
+ // Get the default priority for the Queue. If Queue is non-existent, then
+ // use default priority
+ priorityFromContext = getDefaultPriorityForQueue(queueName);
+
+ LOG.info("Application '" + applicationId
+ + "' is submitted without priority "
+ + "hence considering default queue/cluster priority:"
+ + priorityFromContext.getPriority());
+ }
+
+ // Verify whether submitted priority is lesser than max priority
+ // in the cluster. If it is out of found, defining a max cap.
+ if (priorityFromContext.compareTo(getMaxClusterLevelAppPriority()) < 0) {
+ priorityFromContext = Priority
+ .newInstance(getMaxClusterLevelAppPriority().getPriority());
+ }
+
+ appPriority = priorityFromContext;
+
+ LOG.info("Priority '" + appPriority.getPriority()
+ + "' is acceptable in queue :" + queueName + "for application:"
+ + applicationId + "for the user: " + user);
+
+ return appPriority;
+ }
+
+ private Priority getDefaultPriorityForQueue(String queueName) {
+ Queue queue = getQueue(queueName);
+ if (null == queue) {
+ // Return with default application priority
+ return Priority.newInstance(CapacitySchedulerConfiguration
+ .DEFAULT_CONFIGURATION_APPLICATION_PRIORITY);
+ }
+
+ return Priority.newInstance(queue.getDefaultApplicationPriority()
+ .getPriority());
+ }
+
+ public Priority getMaxClusterLevelAppPriority() {
+ return maxClusterLevelAppPriority;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c39ca541/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 563643c..be5e6dd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -207,6 +207,12 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
public static final String QUEUE_PREEMPTION_DISABLED = "disable_preemption";
@Private
+ public static final String DEFAULT_APPLICATION_PRIORITY = "default-application-priority";
+
+ @Private
+ public static final Integer DEFAULT_CONFIGURATION_APPLICATION_PRIORITY = 0;
+
+ @Private
public static class QueueMapping {
public enum MappingType {
@@ -947,4 +953,11 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
return configuredNodeLabels;
}
+
+ public Integer getDefaultApplicationPriorityConfPerQueue(String queue) {
+ Integer defaultPriority = getInt(getQueuePrefix(queue)
+ + DEFAULT_APPLICATION_PRIORITY,
+ DEFAULT_CONFIGURATION_APPLICATION_PRIORITY);
+ return defaultPriority;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c39ca541/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 598f279..0ce4d68 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -95,9 +95,11 @@ public class LeafQueue extends AbstractCSQueue {
private int nodeLocalityDelay;
- Map<ApplicationAttemptId, FiCaSchedulerApp> applicationAttemptMap =
+ Map<ApplicationAttemptId, FiCaSchedulerApp> applicationAttemptMap =
new HashMap<ApplicationAttemptId, FiCaSchedulerApp>();
-
+
+ private Priority defaultAppPriorityPerQueue;
+
Set<FiCaSchedulerApp> pendingApplications;
private float minimumAllocationFactor;
@@ -220,6 +222,9 @@ public class LeafQueue extends AbstractCSQueue {
}
}
+ defaultAppPriorityPerQueue = Priority.newInstance(conf
+ .getDefaultApplicationPriorityConfPerQueue(getQueuePath()));
+
LOG.info("Initializing " + queueName + "\n" +
"capacity = " + queueCapacities.getCapacity() +
" [= (float) configuredCapacity / 100 ]" + "\n" +
@@ -265,7 +270,8 @@ public class LeafQueue extends AbstractCSQueue {
"nodeLocalityDelay = " + nodeLocalityDelay + "\n" +
"reservationsContinueLooking = " +
reservationsContinueLooking + "\n" +
- "preemptionDisabled = " + getPreemptionDisabled() + "\n");
+ "preemptionDisabled = " + getPreemptionDisabled() + "\n" +
+ "defaultAppPriorityPerQueue = " + defaultAppPriorityPerQueue);
}
@Override
@@ -2060,7 +2066,12 @@ public class LeafQueue extends AbstractCSQueue {
);
this.orderingPolicy = orderingPolicy;
}
-
+
+ @Override
+ public Priority getDefaultApplicationPriority() {
+ return defaultAppPriorityPerQueue;
+ }
+
/*
* Holds shared values used by all applications in
* the queue to calculate headroom on demand
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c39ca541/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 3085d93..dfeb30f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -72,6 +72,13 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager,
RMContext rmContext) {
+ this(applicationAttemptId, user, queue, activeUsersManager, rmContext,
+ Priority.newInstance(0));
+ }
+
+ public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
+ String user, Queue queue, ActiveUsersManager activeUsersManager,
+ RMContext rmContext, Priority appPriority) {
super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
RMApp rmApp = rmContext.getRMApps().get(getApplicationId());
@@ -87,6 +94,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
}
setAMResource(amResource);
+ setPriority(appPriority);
}
synchronized public boolean containerCompleted(RMContainer rmContainer,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c39ca541/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
index a54e4bf..89d2f66 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId;
public class AppAddedSchedulerEvent extends SchedulerEvent {
@@ -28,25 +30,35 @@ public class AppAddedSchedulerEvent extends SchedulerEvent {
private final String user;
private final ReservationId reservationID;
private final boolean isAppRecovering;
+ private final Priority appPriority;
- public AppAddedSchedulerEvent(
- ApplicationId applicationId, String queue, String user) {
- this(applicationId, queue, user, false, null);
+ public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
+ String user) {
+ this(applicationId, queue, user, false, null, Priority.newInstance(0));
}
public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
- String user, ReservationId reservationID) {
- this(applicationId, queue, user, false, reservationID);
+ String user, ReservationId reservationID, Priority appPriority) {
+ this(applicationId, queue, user, false, reservationID, appPriority);
+ }
+
+ public AppAddedSchedulerEvent(String user,
+ ApplicationSubmissionContext submissionContext, boolean isAppRecovering) {
+ this(submissionContext.getApplicationId(), submissionContext.getQueue(),
+ user, isAppRecovering, submissionContext.getReservationID(),
+ submissionContext.getPriority());
}
public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
- String user, boolean isAppRecovering, ReservationId reservationID) {
+ String user, boolean isAppRecovering, ReservationId reservationID,
+ Priority appPriority) {
super(SchedulerEventType.APP_ADDED);
this.applicationId = applicationId;
this.queue = queue;
this.user = user;
this.reservationID = reservationID;
this.isAppRecovering = isAppRecovering;
+ this.appPriority = appPriority;
}
public ApplicationId getApplicationId() {
@@ -68,4 +80,8 @@ public class AppAddedSchedulerEvent extends SchedulerEvent {
public ReservationId getReservationID() {
return reservationID;
}
+
+ public Priority getApplicatonPriority() {
+ return appPriority;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c39ca541/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
index e488c76..713bdca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
@@ -331,6 +331,12 @@ public abstract class FSQueue implements Queue, Schedulable {
public void decPendingResource(String nodeLabel, Resource resourceToDec) {
}
+ @Override
+ public Priority getDefaultApplicationPriority() {
+ // TODO add implementation for FSParentQueue
+ return null;
+ }
+
public boolean fitsInMaxShare(Resource additionalResource) {
Resource usagePlusAddition =
Resources.add(getResourceUsage(), additionalResource);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c39ca541/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index e66c02c..6b77ceb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -210,6 +210,12 @@ public class FifoScheduler extends
@Override
public void decPendingResource(String nodeLabel, Resource resourceToDec) {
}
+
+ @Override
+ public Priority getDefaultApplicationPriority() {
+ // TODO add implementation for FIFO scheduler
+ return null;
+ }
};
public FifoScheduler() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c39ca541/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoComparator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoComparator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoComparator.java
index b92b264..1045386 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoComparator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoComparator.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
import java.util.*;
+
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
@@ -29,9 +30,13 @@ public class FifoComparator
implements Comparator<SchedulableEntity> {
@Override
- public int compare(SchedulableEntity r1, SchedulableEntity r2) {
- int res = r1.compareInputOrderTo(r2);
- return res;
+ public int compare(SchedulableEntity r1, SchedulableEntity r2) {
+ if (r1.getPriority() != null
+ && !r1.getPriority().equals(r2.getPriority())) {
+ return r1.getPriority().compareTo(r2.getPriority());
}
+ int res = r1.compareInputOrderTo(r2);
+ return res;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c39ca541/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulableEntity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulableEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulableEntity.java
index 9b9d73d..2ccb1cd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulableEntity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulableEntity.java
@@ -48,4 +48,9 @@ public interface SchedulableEntity {
*/
public ResourceUsage getSchedulingResourceUsage();
+ /**
+ * Get the priority of the application
+ */
+ public Priority getPriority();
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c39ca541/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index d068a94..5080355 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -289,6 +290,15 @@ public class MockRM extends ResourceManager {
return submitApp(masterMemory, false);
}
+ public RMApp submitApp(int masterMemory, Priority priority) throws Exception {
+ Resource resource = Resource.newInstance(masterMemory, 0);
+ return submitApp(resource, "", UserGroupInformation.getCurrentUser()
+ .getShortUserName(), null, false, null,
+ super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true,
+ false, false, null, 0, null, true, priority);
+ }
+
public RMApp submitApp(int masterMemory, boolean unmanaged)
throws Exception {
return submitApp(masterMemory, "", UserGroupInformation.getCurrentUser()
@@ -327,7 +337,7 @@ public class MockRM extends ResourceManager {
return submitApp(resource, name, user, acls, false, queue,
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null,
- true, false, false, null, 0, null, true);
+ true, false, false, null, 0, null, true, null);
}
public RMApp submitApp(int masterMemory, String name, String user,
@@ -370,18 +380,19 @@ public class MockRM extends ResourceManager {
resource.setMemory(masterMemory);
return submitApp(resource, name, user, acls, unmanaged, queue,
maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
- false, null, 0, null, true);
+ false, null, 0, null, true, Priority.newInstance(0));
}
public RMApp submitApp(int masterMemory, long attemptFailuresValidityInterval)
throws Exception {
Resource resource = Records.newRecord(Resource.class);
resource.setMemory(masterMemory);
+ Priority priority = Priority.newInstance(0);
return submitApp(resource, "", UserGroupInformation.getCurrentUser()
.getShortUserName(), null, false, null,
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false,
- false, null, attemptFailuresValidityInterval, null, true);
+ false, null, attemptFailuresValidityInterval, null, true, priority);
}
public RMApp submitApp(int masterMemory, String name, String user,
@@ -391,20 +402,22 @@ public class MockRM extends ResourceManager {
ApplicationId applicationId) throws Exception {
Resource resource = Records.newRecord(Resource.class);
resource.setMemory(masterMemory);
+ Priority priority = Priority.newInstance(0);
return submitApp(resource, name, user, acls, unmanaged, queue,
maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
- isAppIdProvided, applicationId, 0, null, true);
+ isAppIdProvided, applicationId, 0, null, true, priority);
}
public RMApp submitApp(int masterMemory,
LogAggregationContext logAggregationContext) throws Exception {
Resource resource = Records.newRecord(Resource.class);
resource.setMemory(masterMemory);
+ Priority priority = Priority.newInstance(0);
return submitApp(resource, "", UserGroupInformation.getCurrentUser()
.getShortUserName(), null, false, null,
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false,
- false, null, 0, logAggregationContext, true);
+ false, null, 0, logAggregationContext, true, priority);
}
public RMApp submitApp(Resource capability, String name, String user,
@@ -412,7 +425,8 @@ public class MockRM extends ResourceManager {
int maxAppAttempts, Credentials ts, String appType,
boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
ApplicationId applicationId, long attemptFailuresValidityInterval,
- LogAggregationContext logAggregationContext, boolean cancelTokensWhenComplete)
+ LogAggregationContext logAggregationContext,
+ boolean cancelTokensWhenComplete, Priority priority)
throws Exception {
ApplicationId appId = isAppIdProvided ? applicationId : null;
ApplicationClientProtocol client = getClientRMService();
@@ -429,12 +443,15 @@ public class MockRM extends ResourceManager {
sub.setApplicationId(appId);
sub.setApplicationName(name);
sub.setMaxAppAttempts(maxAppAttempts);
- if(unmanaged) {
+ if (unmanaged) {
sub.setUnmanagedAM(true);
}
if (queue != null) {
sub.setQueue(queue);
}
+ if (priority != null) {
+ sub.setPriority(priority);
+ }
sub.setApplicationType(appType);
ContainerLaunchContext clc = Records
.newRecord(ContainerLaunchContext.class);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c39ca541/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
index 3db8b7c..f073763 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
@@ -219,6 +219,7 @@ public class TestAppManager{
rmContext = mockRMContext(1, now - 10);
ResourceScheduler scheduler = mockResourceScheduler();
+ ((RMContextImpl)rmContext).setScheduler(scheduler);
Configuration conf = new Configuration();
ApplicationMasterService masterService =
new ApplicationMasterService(rmContext, scheduler);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c39ca541/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
index 32743c9..b556335 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
@@ -1056,7 +1056,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
RMApp app0 = rm1.submitApp(resource, "", UserGroupInformation
.getCurrentUser().getShortUserName(), null, false, null,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, null, null, true, true,
- false, null, 0, null, true);
+ false, null, 0, null, true, null);
MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1);
am0.allocate("127.0.0.1", 1000, 2, new ArrayList<ContainerId>());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c39ca541/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
index b3ac79b..d6f64bf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
@@ -1025,6 +1025,7 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
when(app.getReservedContainers()).thenReturn(reservedContainers);
when(app.getApplicationAttemptId()).thenReturn(appAttemptId);
when(app.getApplicationId()).thenReturn(appId);
+ when(app.getPriority()).thenReturn(Priority.newInstance(0));
// add to LeafQueue
LeafQueue queue = (LeafQueue) nameToCSQueues.get(queueName);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c39ca541/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
index 484090d..1afebb6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
@@ -155,7 +155,8 @@ public class TestApplicationLimits {
doReturn(applicationAttemptId). when(application).getApplicationAttemptId();
doReturn(user).when(application).getUser();
doReturn(amResource).when(application).getAMResource();
- when(application.compareInputOrderTo(any(FiCaSchedulerApp.class))).thenCallRealMethod();
+ doReturn(Priority.newInstance(0)).when(application).getPriority();
+ when(application.compareInputOrderTo(any(FiCaSchedulerApp.class))).thenCallRealMethod();
return application;
}
@@ -175,7 +176,7 @@ public class TestApplicationLimits {
ActiveUsersManager activeUsersManager = mock(ActiveUsersManager.class);
when(queue.getActiveUsersManager()).thenReturn(activeUsersManager);
-
+
assertEquals(Resource.newInstance(8 * GB, 1), queue.getAMResourceLimit());
assertEquals(Resource.newInstance(4 * GB, 1),
queue.getUserAMResourceLimit());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c39ca541/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
new file mode 100644
index 0000000..80eff06
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
@@ -0,0 +1,345 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationIdNotProvidedException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestApplicationPriority {
+ private static final Log LOG = LogFactory
+ .getLog(TestApplicationPriority.class);
+ private final int GB = 1024;
+
+ private YarnConfiguration conf;
+
+ @Before
+ public void setUp() throws Exception {
+ conf = new YarnConfiguration();
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ }
+
+ @Test
+ public void testApplicationOrderingWithPriority() throws Exception {
+
+ Configuration conf = new Configuration();
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ MockRM rm = new MockRM(conf);
+ rm.start();
+ CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+ LeafQueue q = (LeafQueue) cs.getQueue("default");
+ Assert.assertNotNull(q);
+
+ String host = "127.0.0.1";
+ RMNode node = MockNodes.newNodeInfo(0, MockNodes.newResource(16 * GB), 1,
+ host);
+ cs.handle(new NodeAddedSchedulerEvent(node));
+
+ // add app 1 start
+ ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1);
+ ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(
+ appId1, 1);
+
+ RMAppAttemptMetrics attemptMetric1 = new RMAppAttemptMetrics(appAttemptId1,
+ rm.getRMContext());
+ RMAppImpl app1 = mock(RMAppImpl.class);
+ when(app1.getApplicationId()).thenReturn(appId1);
+ RMAppAttemptImpl attempt1 = mock(RMAppAttemptImpl.class);
+ when(attempt1.getAppAttemptId()).thenReturn(appAttemptId1);
+ when(attempt1.getRMAppAttemptMetrics()).thenReturn(attemptMetric1);
+ when(app1.getCurrentAppAttempt()).thenReturn(attempt1);
+
+ rm.getRMContext().getRMApps().put(appId1, app1);
+
+ SchedulerEvent addAppEvent1 = new AppAddedSchedulerEvent(appId1, "default",
+ "user", null, Priority.newInstance(5));
+ cs.handle(addAppEvent1);
+ SchedulerEvent addAttemptEvent1 = new AppAttemptAddedSchedulerEvent(
+ appAttemptId1, false);
+ cs.handle(addAttemptEvent1);
+ // add app1 end
+
+ // add app2 begin
+ ApplicationId appId2 = BuilderUtils.newApplicationId(100, 2);
+ ApplicationAttemptId appAttemptId2 = BuilderUtils.newApplicationAttemptId(
+ appId2, 1);
+
+ RMAppAttemptMetrics attemptMetric2 = new RMAppAttemptMetrics(appAttemptId2,
+ rm.getRMContext());
+ RMAppImpl app2 = mock(RMAppImpl.class);
+ when(app2.getApplicationId()).thenReturn(appId2);
+ RMAppAttemptImpl attempt2 = mock(RMAppAttemptImpl.class);
+ when(attempt2.getAppAttemptId()).thenReturn(appAttemptId2);
+ when(attempt2.getRMAppAttemptMetrics()).thenReturn(attemptMetric2);
+ when(app2.getCurrentAppAttempt()).thenReturn(attempt2);
+
+ rm.getRMContext().getRMApps().put(appId2, app2);
+
+ SchedulerEvent addAppEvent2 = new AppAddedSchedulerEvent(appId2, "default",
+ "user", null, Priority.newInstance(8));
+ cs.handle(addAppEvent2);
+ SchedulerEvent addAttemptEvent2 = new AppAttemptAddedSchedulerEvent(
+ appAttemptId2, false);
+ cs.handle(addAttemptEvent2);
+ // add app end
+
+ // Now, the first assignment will be for app2 since app2 is of highest
+ // priority
+ assertEquals(q.getApplications().size(), 2);
+ assertEquals(q.getApplications().iterator().next()
+ .getApplicationAttemptId(), appAttemptId2);
+
+ rm.stop();
+ }
+
+ @Test
+ public void testApplicationPriorityAllocation() throws Exception {
+
+ Configuration conf = new Configuration();
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ // Set Max Application Priority as 10
+ conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10);
+ MockRM rm = new MockRM(conf);
+ rm.start();
+
+ Priority appPriority1 = Priority.newInstance(5);
+ MockNM nm1 = rm.registerNode("127.0.0.1:1234", 16 * GB);
+ RMApp app1 = rm.submitApp(1 * GB, appPriority1);
+
+ // kick the scheduler, 1 GB given to AM1, remaining 15GB on nm1
+ MockAM am1 = MockRM.launchAM(app1, rm, nm1);
+ am1.registerAppAttempt();
+
+ // add request for containers
+ am1.addRequests(new String[]{"127.0.0.1", "127.0.0.2"}, 2 * GB, 1, 7);
+ AllocateResponse alloc1Response = am1.schedule(); // send the request
+
+ // kick the scheduler, 7 containers will be allocated for App1
+ nm1.nodeHeartbeat(true);
+ while (alloc1Response.getAllocatedContainers().size() < 1) {
+ LOG.info("Waiting for containers to be created for app 1...");
+ Thread.sleep(100);
+ alloc1Response = am1.schedule();
+ }
+
+ List<Container> allocated1 = alloc1Response.getAllocatedContainers();
+ Assert.assertEquals(7, allocated1.size());
+ Assert.assertEquals(2 * GB, allocated1.get(0).getResource().getMemory());
+
+ // check node report, 15 GB used (1 AM and 7 containers) and 1 GB available
+ SchedulerNodeReport report_nm1 = rm.getResourceScheduler().getNodeReport(
+ nm1.getNodeId());
+ Assert.assertEquals(15 * GB, report_nm1.getUsedResource().getMemory());
+ Assert.assertEquals(1 * GB, report_nm1.getAvailableResource().getMemory());
+
+ // Submit the second app App2 with priority 8 (Higher than App1)
+ Priority appPriority2 = Priority.newInstance(8);
+ RMApp app2 = rm.submitApp(1 * GB, appPriority2);
+
+ // kick the scheduler, 1 GB which was free is given to AM of App2
+ nm1.nodeHeartbeat(true);
+ MockAM am2 = rm.sendAMLaunched(app2.getCurrentAppAttempt()
+ .getAppAttemptId());
+ am2.registerAppAttempt();
+
+ // check node report, 16 GB used and 0 GB available
+ report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
+ Assert.assertEquals(16 * GB, report_nm1.getUsedResource().getMemory());
+ Assert.assertEquals(0 * GB, report_nm1.getAvailableResource().getMemory());
+
+ // get scheduler
+ CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+ // get scheduler app
+ FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications()
+ .get(app1.getApplicationId()).getCurrentAppAttempt();
+
+ // kill 2 containers to free up some space
+ int counter = 0;
+ for (Container c : allocated1) {
+ if (++counter > 2) {
+ break;
+ }
+ cs.killContainer(schedulerAppAttempt.getRMContainer(c.getId()));
+ }
+
+ // check node report, 12 GB used and 4 GB available
+ report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
+ Assert.assertEquals(12 * GB, report_nm1.getUsedResource().getMemory());
+ Assert.assertEquals(4 * GB, report_nm1.getAvailableResource().getMemory());
+
+ // add request for containers App1
+ am1.addRequests(new String[]{"127.0.0.1", "127.0.0.2"}, 2 * GB, 1, 10);
+ am1.schedule(); // send the request for App1
+
+ // add request for containers App2
+ am2.addRequests(new String[]{"127.0.0.1", "127.0.0.2"}, 2 * GB, 1, 3);
+ AllocateResponse alloc1Response4 = am2.schedule(); // send the request
+
+ // kick the scheduler, since App2 priority is more than App1, it will get
+ // remaining cluster space.
+ nm1.nodeHeartbeat(true);
+ while (alloc1Response4.getAllocatedContainers().size() < 1) {
+ LOG.info("Waiting for containers to be created for app 2...");
+ Thread.sleep(100);
+ alloc1Response4 = am2.schedule();
+ }
+
+ // check node report, 16 GB used and 0 GB available
+ report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
+ Assert.assertEquals(16 * GB, report_nm1.getUsedResource().getMemory());
+ Assert.assertEquals(0 * GB, report_nm1.getAvailableResource().getMemory());
+
+ rm.stop();
+ }
+
+ @Test
+ public void testPriorityWithPendingApplications() throws Exception {
+
+ Configuration conf = new Configuration();
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ // Set Max Application Priority as 10
+ conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10);
+ MockRM rm = new MockRM(conf);
+ rm.start();
+
+ Priority appPriority1 = Priority.newInstance(5);
+ MockNM nm1 = rm.registerNode("127.0.0.1:1234", 8 * GB);
+ RMApp app1 = rm.submitApp(1 * GB, appPriority1);
+
+ // kick the scheduler, 1 GB given to AM1, remaining 7GB on nm1
+ MockAM am1 = MockRM.launchAM(app1, rm, nm1);
+ am1.registerAppAttempt();
+
+ // add request for containers
+ am1.addRequests(new String[]{"127.0.0.1", "127.0.0.2"}, 1 * GB, 1, 7);
+ AllocateResponse alloc1Response = am1.schedule(); // send the request
+
+ // kick the scheduler, 7 containers will be allocated for App1
+ nm1.nodeHeartbeat(true);
+ while (alloc1Response.getAllocatedContainers().size() < 1) {
+ LOG.info("Waiting for containers to be created for app 1...");
+ Thread.sleep(100);
+ alloc1Response = am1.schedule();
+ }
+
+ List<Container> allocated1 = alloc1Response.getAllocatedContainers();
+ Assert.assertEquals(7, allocated1.size());
+ Assert.assertEquals(1 * GB, allocated1.get(0).getResource().getMemory());
+
+ // check node report, 8 GB used (1 AM and 7 containers) and 0 GB available
+ SchedulerNodeReport report_nm1 = rm.getResourceScheduler().getNodeReport(
+ nm1.getNodeId());
+ Assert.assertEquals(8 * GB, report_nm1.getUsedResource().getMemory());
+ Assert.assertEquals(0 * GB, report_nm1.getAvailableResource().getMemory());
+
+ // Submit the second app App2 with priority 7
+ Priority appPriority2 = Priority.newInstance(7);
+ RMApp app2 = rm.submitApp(1 * GB, appPriority2);
+
+ // Submit the third app App3 with priority 8
+ Priority appPriority3 = Priority.newInstance(8);
+ RMApp app3 = rm.submitApp(1 * GB, appPriority3);
+
+ // Submit the second app App4 with priority 6
+ Priority appPriority4 = Priority.newInstance(6);
+ RMApp app4 = rm.submitApp(1 * GB, appPriority4);
+
+ // Only one app can run as AM resource limit restricts it. Kill app1,
+ // If app3 (highest priority among rest) gets active, it indicates that
+ // priority is working with pendingApplications.
+ rm.killApp(app1.getApplicationId());
+
+ // kick the scheduler, app3 (high among pending) gets free space
+ nm1.nodeHeartbeat(true);
+ MockAM am3 = rm.sendAMLaunched(app3.getCurrentAppAttempt()
+ .getAppAttemptId());
+ am3.registerAppAttempt();
+
+ // check node report, 1 GB used and 7 GB available
+ report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
+ Assert.assertEquals(1 * GB, report_nm1.getUsedResource().getMemory());
+ Assert.assertEquals(7 * GB, report_nm1.getAvailableResource().getMemory());
+
+ rm.stop();
+ }
+
+ @Test
+ public void testMaxPriorityValidation() throws Exception {
+
+ Configuration conf = new Configuration();
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ // Set Max Application Priority as 10
+ conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10);
+ Priority maxPriority = Priority.newInstance(10);
+ MockRM rm = new MockRM(conf);
+ rm.start();
+
+ Priority appPriority1 = Priority.newInstance(15);
+ rm.registerNode("127.0.0.1:1234", 8 * GB);
+ RMApp app1 = rm.submitApp(1 * GB, appPriority1);
+
+ // Application submission should be successful and verify priority
+ Assert.assertEquals(app1.getApplicationSubmissionContext().getPriority(),
+ maxPriority);
+ rm.stop();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c39ca541/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index e8afab2..a8bbac3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -129,6 +129,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedule
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfoList;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
@@ -898,13 +899,17 @@ public class TestCapacityScheduler {
ApplicationId id1 = ApplicationId.newInstance(1, 1);
ApplicationId id2 = ApplicationId.newInstance(1, 2);
ApplicationId id3 = ApplicationId.newInstance(2, 1);
+ Priority priority = Priority.newInstance(0);
//same clusterId
FiCaSchedulerApp app1 = Mockito.mock(FiCaSchedulerApp.class);
when(app1.getApplicationId()).thenReturn(id1);
+ when(app1.getPriority()).thenReturn(priority);
FiCaSchedulerApp app2 = Mockito.mock(FiCaSchedulerApp.class);
when(app2.getApplicationId()).thenReturn(id2);
+ when(app2.getPriority()).thenReturn(priority);
FiCaSchedulerApp app3 = Mockito.mock(FiCaSchedulerApp.class);
when(app3.getApplicationId()).thenReturn(id3);
+ when(app3.getPriority()).thenReturn(priority);
assertTrue(appComparator.compare(app1, app2) < 0);
//different clusterId
assertTrue(appComparator.compare(app1, app3) < 0);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c39ca541/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulableEntity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulableEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulableEntity.java
index fe8c455..bf4c98a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulableEntity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulableEntity.java
@@ -31,7 +31,8 @@ public class MockSchedulableEntity implements SchedulableEntity {
private String id;
private long serial = 0;
-
+ private Priority priority;
+
public MockSchedulableEntity() { }
public void setId(String id) {
@@ -74,5 +75,13 @@ public class MockSchedulableEntity implements SchedulableEntity {
}
return 1;//let other types go before this, if any
}
-
+
+ @Override
+ public Priority getPriority() {
+ return priority;
+ }
+
+ public void setApplicationPriority(Priority priority) {
+ this.priority = priority;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c39ca541/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
index 49c7bf9..d85e928 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
@@ -1048,13 +1048,13 @@ public class TestDelegationTokenRenewer {
Resource resource = Records.newRecord(Resource.class);
resource.setMemory(200);
RMApp app1 = rm.submitApp(resource, "name", "user", null, false, null, 2,
- credentials, null, true, false, false, null, 0, null, false);
+ credentials, null, true, false, false, null, 0, null, false, null);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
rm.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
// submit app2 with the same token, set cancelTokenWhenComplete to true;
RMApp app2 = rm.submitApp(resource, "name", "user", null, false, null, 2,
- credentials, null, true, false, false, null, 0, null, true);
+ credentials, null, true, false, false, null, 0, null, true, null);
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);
rm.waitForState(app2.getApplicationId(), RMAppState.RUNNING);
MockRM.finishAMAndVerifyAppState(app2, rm, nm1, am2);
@@ -1114,7 +1114,7 @@ public class TestDelegationTokenRenewer {
resource.setMemory(200);
RMApp app1 =
rm.submitApp(resource, "name", "user", null, false, null, 2, credentials,
- null, true, false, false, null, 0, null, true);
+ null, true, false, false, null, 0, null, true, null);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
rm.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
@@ -1122,7 +1122,7 @@ public class TestDelegationTokenRenewer {
Assert.assertNotNull(dttr);
Assert.assertTrue(dttr.referringAppIds.contains(app1.getApplicationId()));
RMApp app2 = rm.submitApp(resource, "name", "user", null, false, null, 2,
- credentials, null, true, false, false, null, 0, null, true);
+ credentials, null, true, false, false, null, 0, null, true, null);
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);
rm.waitForState(app2.getApplicationId(), RMAppState.RUNNING);
Assert.assertTrue(renewer.getAllTokens().containsKey(token1));
@@ -1139,7 +1139,7 @@ public class TestDelegationTokenRenewer {
Assert.assertFalse(Renewer.cancelled);
RMApp app3 = rm.submitApp(resource, "name", "user", null, false, null, 2,
- credentials, null, true, false, false, null, 0, null, true);
+ credentials, null, true, false, false, null, 0, null, true, null);
MockAM am3 = MockRM.launchAndRegisterAM(app3, rm, nm1);
rm.waitForState(app3.getApplicationId(), RMAppState.RUNNING);
Assert.assertTrue(renewer.getAllTokens().containsKey(token1));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c39ca541/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesAppsModification.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesAppsModification.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesAppsModification.java
index 8e5e601..de4d116 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesAppsModification.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesAppsModification.java
@@ -759,10 +759,10 @@ public class TestRMWebServicesAppsModification extends JerseyTestBase {
ApplicationSubmissionContextInfo appInfo = new ApplicationSubmissionContextInfo();
appInfo.setApplicationId(appId);
appInfo.setApplicationName(appName);
- appInfo.setPriority(3);
appInfo.setMaxAppAttempts(2);
appInfo.setQueue(queueName);
appInfo.setApplicationType(appType);
+ appInfo.setPriority(0);
HashMap<String, LocalResourceInfo> lr = new HashMap<>();
LocalResourceInfo y = new LocalResourceInfo();
y.setUrl(new URI("http://www.test.com/file.txt"));