You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by yu...@apache.org on 2019/03/26 05:49:32 UTC
[hadoop] branch trunk updated: YARN-8967. Change FairScheduler to
use PlacementRule interface. Contributed by Wilfred Spiegelenburg.
This is an automated email from the ASF dual-hosted git repository.
yufei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5257f50 YARN-8967. Change FairScheduler to use PlacementRule interface. Contributed by Wilfred Spiegelenburg.
5257f50 is described below
commit 5257f50abb71905ef3068fd45541d00ce9e8f355
Author: yufei <yu...@apache.org>
AuthorDate: Mon Mar 25 22:47:24 2019 -0700
YARN-8967. Change FairScheduler to use PlacementRule interface. Contributed by Wilfred Spiegelenburg.
---
.../yarn/server/resourcemanager/RMAppManager.java | 117 +++--
.../resourcemanager/placement/FSPlacementRule.java | 19 +-
.../placement/QueuePlacementRuleUtils.java | 2 +-
.../scheduler/fair/AllocationConfiguration.java | 30 +-
.../fair/AllocationFileLoaderService.java | 28 +-
.../scheduler/fair/FairScheduler.java | 144 ++----
.../scheduler/fair/QueueManager.java | 15 +-
.../scheduler/fair/QueuePlacementPolicy.java | 344 ++++++++++----
.../scheduler/fair/QueuePlacementRule.java | 366 ---------------
.../scheduler/fair/allocation/QueueProperties.java | 3 +-
.../TestAppManagerWithFairScheduler.java | 289 ++++++++----
.../resourcemanager/TestApplicationACLs.java | 2 +-
.../reservation/TestSchedulerPlanFollowerBase.java | 12 +-
.../scheduler/TestSchedulerUtils.java | 10 +-
.../scheduler/fair/FairSchedulerTestBase.java | 21 +-
.../fair/TestAllocationFileLoaderService.java | 152 +++---
.../scheduler/fair/TestAppRunnability.java | 19 +-
.../scheduler/fair/TestContinuousScheduling.java | 30 +-
.../scheduler/fair/TestFSAppAttempt.java | 5 +-
.../scheduler/fair/TestFSParentQueue.java | 20 +-
.../scheduler/fair/TestFairScheduler.java | 295 ++++--------
.../TestFairSchedulerWithMultiResourceTypes.java | 9 +
.../scheduler/fair/TestMaxRunningAppsEnforcer.java | 25 +-
.../scheduler/fair/TestQueueManager.java | 45 +-
.../scheduler/fair/TestQueuePlacementPolicy.java | 515 +++++++++++++++------
.../scheduler/fair/TestSchedulingPolicy.java | 9 +
.../webapp/dao/TestFairSchedulerQueueInfo.java | 24 +-
27 files changed, 1336 insertions(+), 1214 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index b92c7f6..6e6c8d7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -68,6 +68,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Times;
@@ -418,7 +420,8 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
// We only replace the queue when it's a new application
if (!isRecovery) {
- replaceQueueFromPlacementContext(placementContext, submissionContext);
+ copyPlacementQueueToSubmissionContext(placementContext,
+ submissionContext);
// fail the submission if configured application timeout value is invalid
RMServerUtils.validateApplicationTimeouts(
@@ -443,38 +446,60 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
submissionContext.setPriority(appPriority);
}
- // Since FairScheduler queue mapping is done inside scheduler,
- // if FairScheduler is used and the queue doesn't exist, we should not
- // fail here because queue will be created inside FS. Ideally, FS queue
- // mapping should be done outside scheduler too like CS.
- // For now, exclude FS for the acl check.
- if (!isRecovery && YarnConfiguration.isAclEnabled(conf)
- && scheduler instanceof CapacityScheduler) {
- String queueName = submissionContext.getQueue();
- String appName = submissionContext.getApplicationName();
- CSQueue csqueue = ((CapacityScheduler) scheduler).getQueue(queueName);
-
- if (csqueue == null && placementContext != null) {
- //could be an auto created queue through queue mapping. Validate
- // parent queue exists and has valid acls
- String parentQueueName = placementContext.getParentQueue();
- csqueue = ((CapacityScheduler) scheduler).getQueue(parentQueueName);
- }
+ if (!isRecovery && YarnConfiguration.isAclEnabled(conf)) {
+ if (scheduler instanceof CapacityScheduler) {
+ String queueName = submissionContext.getQueue();
+ String appName = submissionContext.getApplicationName();
+ CSQueue csqueue = ((CapacityScheduler) scheduler).getQueue(queueName);
+
+ if (csqueue == null && placementContext != null) {
+ //could be an auto created queue through queue mapping. Validate
+ // parent queue exists and has valid acls
+ String parentQueueName = placementContext.getParentQueue();
+ csqueue = ((CapacityScheduler) scheduler).getQueue(parentQueueName);
+ }
- if (csqueue != null
- && !authorizer.checkPermission(
- new AccessRequest(csqueue.getPrivilegedEntity(), userUgi,
- SchedulerUtils.toAccessType(QueueACL.SUBMIT_APPLICATIONS),
- applicationId.toString(), appName, Server.getRemoteAddress(),
- null))
- && !authorizer.checkPermission(
- new AccessRequest(csqueue.getPrivilegedEntity(), userUgi,
- SchedulerUtils.toAccessType(QueueACL.ADMINISTER_QUEUE),
- applicationId.toString(), appName, Server.getRemoteAddress(),
- null))) {
- throw RPCUtil.getRemoteException(new AccessControlException(
- "User " + user + " does not have permission to submit "
- + applicationId + " to queue " + submissionContext.getQueue()));
+ if (csqueue != null
+ && !authorizer.checkPermission(
+ new AccessRequest(csqueue.getPrivilegedEntity(), userUgi,
+ SchedulerUtils.toAccessType(QueueACL.SUBMIT_APPLICATIONS),
+ applicationId.toString(), appName, Server.getRemoteAddress(),
+ null))
+ && !authorizer.checkPermission(
+ new AccessRequest(csqueue.getPrivilegedEntity(), userUgi,
+ SchedulerUtils.toAccessType(QueueACL.ADMINISTER_QUEUE),
+ applicationId.toString(), appName, Server.getRemoteAddress(),
+ null))) {
+ throw RPCUtil.getRemoteException(new AccessControlException(
+ "User " + user + " does not have permission to submit "
+ + applicationId + " to queue "
+ + submissionContext.getQueue()));
+ }
+ }
+ if (scheduler instanceof FairScheduler) {
+ // if we have not placed the app just skip this, the submit will be
+ // rejected in the scheduler.
+ if (placementContext != null) {
+ // The queue might not be created yet. Walk up the tree to check the
+ // parent ACL. The queueName is assured root which always exists
+ String queueName = submissionContext.getQueue();
+ FSQueue queue = ((FairScheduler) scheduler).getQueueManager().
+ getQueue(queueName);
+ while (queue == null) {
+ int sepIndex = queueName.lastIndexOf(".");
+ queueName = queueName.substring(0, sepIndex);
+ queue = ((FairScheduler) scheduler).getQueueManager().
+ getQueue(queueName);
+ }
+ if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi) &&
+ !queue.hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) {
+ throw RPCUtil.getRemoteException(new AccessControlException(
+ "User " + user + " does not have permission to submit " +
+ applicationId + " to queue " +
+ submissionContext.getQueue() +
+ " denied by ACL for queue " + queueName));
+ }
+ }
}
}
@@ -841,34 +866,38 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
// Placement could also fail if the user doesn't exist in system
// skip if the user is not found during recovery.
if (isRecovery) {
- LOG.warn("PlaceApplication failed,skipping on recovery of rm");
+ LOG.warn("Application placement failed for user " + user +
+ " and application " + context.getApplicationId() +
+ ", skipping placement on recovery of rm", e);
return placementContext;
}
throw e;
}
}
- if (placementContext == null && (context.getQueue() == null) || context
- .getQueue().isEmpty()) {
+ // The submission context when created often has a queue set. In case of
+ // the FairScheduler a null placement context is still considered as a
+ // failure, even when a queue is provided on submit. This case handled in
+ // the scheduler.
+ if (placementContext == null && (context.getQueue() == null) ||
+ context.getQueue().isEmpty()) {
String msg = "Failed to place application " + context.getApplicationId()
- + " to queue and specified " + "queue is invalid : " + context
- .getQueue();
+ + " in a queue and submit context queue is null or empty";
LOG.error(msg);
throw new YarnException(msg);
}
return placementContext;
}
- void replaceQueueFromPlacementContext(
+ private void copyPlacementQueueToSubmissionContext(
ApplicationPlacementContext placementContext,
ApplicationSubmissionContext context) {
- // Set it to ApplicationSubmissionContext
- //apply queue mapping only to new application submissions
+ // Set the queue from the placement in the ApplicationSubmissionContext
+ // Placement rule are only considered for new applications
if (placementContext != null && !StringUtils.equalsIgnoreCase(
context.getQueue(), placementContext.getQueue())) {
- LOG.info("Placed application=" + context.getApplicationId() +
- " to queue=" + placementContext.getQueue() + ", original queue="
- + context
- .getQueue());
+ LOG.info("Placed application with ID " + context.getApplicationId() +
+ " in queue: " + placementContext.getQueue() +
+ ", original submission queue was: " + context.getQueue());
context.setQueue(placementContext.getQueue());
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/FSPlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/FSPlacementRule.java
index 7e9e6ef..7ceb374 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/FSPlacementRule.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/FSPlacementRule.java
@@ -58,10 +58,12 @@ public abstract class FSPlacementRule extends PlacementRule {
}
/**
- * Set a rule to generate the parent queue dynamically.
+ * Set a rule to generate the parent queue dynamically. The parent rule
+ * should only be called on rule creation when the policy is read from the
+ * configuration.
* @param parent A PlacementRule
*/
- void setParentRule(PlacementRule parent) {
+ public void setParentRule(PlacementRule parent) {
this.parentRule = parent;
}
@@ -69,7 +71,8 @@ public abstract class FSPlacementRule extends PlacementRule {
* Get the rule that is set to generate the parent queue dynamically.
* @return The rule set or <code>null</code> if not set.
*/
- PlacementRule getParentRule() {
+ @VisibleForTesting
+ public PlacementRule getParentRule() {
return parentRule;
}
@@ -150,6 +153,14 @@ public abstract class FSPlacementRule extends PlacementRule {
}
/**
+ * Get the create flag as set during the config setup.
+ * @return The value of the {@link #createQueue} flag
+ */
+ public boolean getCreateFlag() {
+ return createQueue;
+ }
+
+ /**
* Get the create flag from the xml configuration element.
* @param conf The FS configuration element for the queue
* @return <code>false</code> only if the flag is set in the configuration to
@@ -159,7 +170,7 @@ public abstract class FSPlacementRule extends PlacementRule {
boolean getCreateFlag(Element conf) {
if (conf != null) {
String create = conf.getAttribute("create");
- return Boolean.parseBoolean(create);
+ return create.isEmpty() || Boolean.parseBoolean(create);
}
return true;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/QueuePlacementRuleUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/QueuePlacementRuleUtils.java
index 0b5fe2e..adee5d7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/QueuePlacementRuleUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/QueuePlacementRuleUtils.java
@@ -28,7 +28,7 @@ import java.io.IOException;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT;
/**
- * Utility class for QueuePlacementRule.
+ * Utility class for Capacity Scheduler queue PlacementRules.
*/
public final class QueuePlacementRuleUtils {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
index 826d9f5..7f06521 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
@@ -23,7 +23,6 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ReservationACL;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -94,10 +93,6 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
//Map for maximum container resource allocation per queues by queue name
private final Map<String, Resource> queueMaxContainerAllocationMap;
- // Policy for mapping apps to queues
- @VisibleForTesting
- QueuePlacementPolicy placementPolicy;
-
//Configured queues in the alloc xml
@VisibleForTesting
Map<FSQueueType, Set<String>> configuredQueues;
@@ -107,9 +102,16 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
private final Set<String> nonPreemptableQueues;
+ /**
+ * Create a fully initialised configuration for the scheduler.
+ * @param queueProperties The list of queues and their properties from the
+ * configuration.
+ * @param allocationFileParser The allocation file parser
+ * @param globalReservationQueueConfig The reservation queue config
+ * @throws AllocationConfigurationException
+ */
public AllocationConfiguration(QueueProperties queueProperties,
AllocationFileParser allocationFileParser,
- QueuePlacementPolicy newPlacementPolicy,
ReservationQueueConfiguration globalReservationQueueConfig)
throws AllocationConfigurationException {
this.minQueueResources = queueProperties.getMinQueueResources();
@@ -138,14 +140,19 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
this.resAcls = queueProperties.getReservationAcls();
this.reservableQueues = queueProperties.getReservableQueues();
this.globalReservationQueueConfig = globalReservationQueueConfig;
- this.placementPolicy = newPlacementPolicy;
this.configuredQueues = queueProperties.getConfiguredQueues();
this.nonPreemptableQueues = queueProperties.getNonPreemptableQueues();
this.queueMaxContainerAllocationMap =
queueProperties.getMaxContainerAllocation();
}
- public AllocationConfiguration(Configuration conf) {
+ /**
+ * Create a base scheduler configuration with just the defaults set.
+ * Should only be called to init a basic setup on scheduler init.
+ * @param scheduler The {@link FairScheduler} to create and initialise the
+ * placement policy.
+ */
+ public AllocationConfiguration(FairScheduler scheduler) {
minQueueResources = new HashMap<>();
maxChildQueueResources = new HashMap<>();
maxQueueResources = new HashMap<>();
@@ -169,8 +176,7 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
for (FSQueueType queueType : FSQueueType.values()) {
configuredQueues.put(queueType, new HashSet<>());
}
- placementPolicy =
- QueuePlacementPolicy.fromConfiguration(conf, configuredQueues);
+ QueuePlacementPolicy.fromConfiguration(scheduler);
nonPreemptableQueues = new HashSet<>();
queueMaxContainerAllocationMap = new HashMap<>();
}
@@ -309,10 +315,6 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
return configuredQueues;
}
- public QueuePlacementPolicy getPlacementPolicy() {
- return placementPolicy;
- }
-
@Override
public boolean isReservable(String queue) {
return reservableQueues.contains(queue);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
index b92bd5d..829188d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
@@ -78,6 +78,7 @@ public class AllocationFileLoaderService extends AbstractService {
"(?i)(hdfs)|(file)|(s3a)|(viewfs)";
private final Clock clock;
+ private final FairScheduler scheduler;
// Last time we successfully reloaded queues
private volatile long lastSuccessfulReload;
@@ -95,14 +96,15 @@ public class AllocationFileLoaderService extends AbstractService {
private Thread reloadThread;
private volatile boolean running = true;
- public AllocationFileLoaderService() {
- this(SystemClock.getInstance());
+ AllocationFileLoaderService(FairScheduler scheduler) {
+ this(SystemClock.getInstance(), scheduler);
}
private List<Permission> defaultPermissions;
- public AllocationFileLoaderService(Clock clock) {
+ AllocationFileLoaderService(Clock clock, FairScheduler scheduler) {
super(AllocationFileLoaderService.class.getName());
+ this.scheduler = scheduler;
this.clock = clock;
}
@@ -254,17 +256,15 @@ public class AllocationFileLoaderService extends AbstractService {
new AllocationFileQueueParser(allocationFileParser.getQueueElements());
QueueProperties queueProperties = queueParser.parse();
- // Load placement policy and pass it configured queues
- Configuration conf = getConfig();
- QueuePlacementPolicy newPlacementPolicy =
- getQueuePlacementPolicy(allocationFileParser, queueProperties, conf);
+ // Load placement policy
+ getQueuePlacementPolicy(allocationFileParser);
setupRootQueueProperties(allocationFileParser, queueProperties);
ReservationQueueConfiguration globalReservationQueueConfig =
createReservationQueueConfig(allocationFileParser);
AllocationConfiguration info = new AllocationConfiguration(queueProperties,
- allocationFileParser, newPlacementPolicy, globalReservationQueueConfig);
+ allocationFileParser, globalReservationQueueConfig);
lastSuccessfulReload = clock.getTime();
lastReloadAttemptFailed = false;
@@ -272,17 +272,15 @@ public class AllocationFileLoaderService extends AbstractService {
reloadListener.onReload(info);
}
- private QueuePlacementPolicy getQueuePlacementPolicy(
- AllocationFileParser allocationFileParser,
- QueueProperties queueProperties, Configuration conf)
+ private void getQueuePlacementPolicy(
+ AllocationFileParser allocationFileParser)
throws AllocationConfigurationException {
if (allocationFileParser.getQueuePlacementPolicy().isPresent()) {
- return QueuePlacementPolicy.fromXml(
+ QueuePlacementPolicy.fromXml(
allocationFileParser.getQueuePlacementPolicy().get(),
- queueProperties.getConfiguredQueues(), conf);
+ scheduler);
} else {
- return QueuePlacementPolicy.fromConfiguration(conf,
- queueProperties.getConfiguredQueues());
+ QueuePlacementPolicy.fromConfiguration(scheduler);
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 8324f8d..166bb48 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMCriticalThreadUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -214,7 +215,7 @@ public class FairScheduler extends
public FairScheduler() {
super(FairScheduler.class.getName());
context = new FSContext(this);
- allocsLoader = new AllocationFileLoaderService();
+ allocsLoader = new AllocationFileLoaderService(this);
queueMgr = new QueueManager(this);
maxRunningEnforcer = new MaxRunningAppsEnforcer(this);
}
@@ -223,6 +224,10 @@ public class FairScheduler extends
return context;
}
+ public RMContext getRMContext() {
+ return rmContext;
+ }
+
public boolean isAtLeastReservationThreshold(
ResourceCalculator resourceCalculator, Resource resource) {
return Resources.greaterThanOrEqual(resourceCalculator,
@@ -455,32 +460,52 @@ public class FairScheduler extends
* configured limits, but the app will not be marked as runnable.
*/
protected void addApplication(ApplicationId applicationId,
- String queueName, String user, boolean isAppRecovering) {
- if (queueName == null || queueName.isEmpty()) {
- String message =
- "Reject application " + applicationId + " submitted by user " + user
- + " with an empty queue name.";
- rejectApplicationWithMessage(applicationId, message);
- return;
- }
-
- if (queueName.startsWith(".") || queueName.endsWith(".")) {
- String message =
- "Reject application " + applicationId + " submitted by user " + user
- + " with an illegal queue name " + queueName + ". "
- + "The queue name cannot start/end with period.";
+ String queueName, String user, boolean isAppRecovering,
+ ApplicationPlacementContext placementContext) {
+ // If the placement was rejected the placementContext will be null.
+ // We ignore placement rules on recovery.
+ if (!isAppRecovering && placementContext == null) {
+ String message = "Reject application " + applicationId +
+ " submitted by user " + user +
+ " application rejected by placement rules.";
rejectApplicationWithMessage(applicationId, message);
return;
}
writeLock.lock();
try {
- RMApp rmApp = rmContext.getRMApps().get(applicationId);
- FSLeafQueue queue = assignToQueue(rmApp, queueName, user, applicationId);
+ // Assign the app to the queue creating and prevent queue delete.
+ FSLeafQueue queue = queueMgr.getLeafQueue(queueName, true,
+ applicationId);
if (queue == null) {
+ rejectApplicationWithMessage(applicationId,
+ queueName + " is not a leaf queue");
+ return;
+ }
+
+ // Enforce ACLs: 2nd check, there could be a time laps between the app
+ // creation in the RMAppManager and getting here. That means we could
+ // have a configuration change (prevent race condition)
+ UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(
+ user);
+
+ if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi) &&
+ !queue.hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) {
+ String msg = "User " + user + " does not have permission to submit " +
+ applicationId + " to queue " + queueName;
+ rejectApplicationWithMessage(applicationId, msg);
+ queue.removeAssignedApp(applicationId);
return;
}
+ RMApp rmApp = rmContext.getRMApps().get(applicationId);
+ if (rmApp != null) {
+ rmApp.setQueue(queueName);
+ } else {
+ LOG.error("Couldn't find RM app for " + applicationId +
+ " to set queue name on");
+ }
+
if (rmApp != null && rmApp.getAMResourceRequests() != null) {
// Resources.fitsIn would always return false when queueMaxShare is 0
// for any resource, but only using Resources.fitsIn is not enough
@@ -499,7 +524,7 @@ public class FairScheduler extends
+ "it has zero amount of resource for a requested "
+ "resource! Invalid requested AM resources: %s, "
+ "maximum queue resources: %s",
- applicationId, queue.getName(),
+ applicationId, queueName,
invalidAMResourceRequests, queue.getMaxShare());
rejectApplicationWithMessage(applicationId, msg);
queue.removeAssignedApp(applicationId);
@@ -507,27 +532,13 @@ public class FairScheduler extends
}
}
- // Enforce ACLs
- UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(
- user);
-
- if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi) && !queue
- .hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) {
- String msg = "User " + userUgi.getUserName()
- + " cannot submit applications to queue " + queue.getName()
- + "(requested queuename is " + queueName + ")";
- rejectApplicationWithMessage(applicationId, msg);
- queue.removeAssignedApp(applicationId);
- return;
- }
-
SchedulerApplication<FSAppAttempt> application =
- new SchedulerApplication<FSAppAttempt>(queue, user);
+ new SchedulerApplication<>(queue, user);
applications.put(applicationId, application);
queue.getMetrics().submitApp(user);
LOG.info("Accepted application " + applicationId + " from user: " + user
- + ", in queue: " + queue.getName()
+ + ", in queue: " + queueName
+ ", currently num of applications: " + applications.size());
if (isAppRecovering) {
LOG.debug("{} is recovering. Skip notifying APP_ACCEPTED",
@@ -596,60 +607,6 @@ public class FairScheduler extends
}
}
- /**
- * Helper method for the tests to assign the app to a queue.
- */
- @VisibleForTesting
- FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user) {
- return assignToQueue(rmApp, queueName, user, null);
- }
-
- /**
- * Helper method that attempts to assign the app to a queue. The method is
- * responsible to call the appropriate event-handler if the app is rejected.
- */
- private FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user,
- ApplicationId applicationId) {
- FSLeafQueue queue = null;
- String appRejectMsg = null;
-
- try {
- QueuePlacementPolicy placementPolicy = allocConf.getPlacementPolicy();
- queueName = placementPolicy.assignAppToQueue(queueName, user);
- if (queueName == null) {
- appRejectMsg = "Application rejected by queue placement policy";
- } else {
- queue = queueMgr.getLeafQueue(queueName, true, applicationId);
- if (queue == null) {
- appRejectMsg = queueName + " is not a leaf queue";
- }
- }
- } catch (IllegalStateException se) {
- appRejectMsg = "Unable to match app " + rmApp.getApplicationId() +
- " to a queue placement policy, and no valid terminal queue " +
- " placement rule is configured. Please contact an administrator " +
- " to confirm that the fair scheduler configuration contains a " +
- " valid terminal queue placement rule.";
- } catch (InvalidQueueNameException qne) {
- appRejectMsg = qne.getMessage();
- } catch (IOException ioe) {
- // IOException should only happen for a user without groups
- appRejectMsg = "Error assigning app to a queue: " + ioe.getMessage();
- }
-
- if (appRejectMsg != null && rmApp != null) {
- rejectApplicationWithMessage(rmApp.getApplicationId(), appRejectMsg);
- return null;
- }
-
- if (rmApp != null) {
- rmApp.setQueue(queue.getName());
- } else {
- LOG.error("Couldn't find RM app to set queue name on");
- }
- return queue;
- }
-
private void removeApplication(ApplicationId applicationId,
RMAppState finalState) {
SchedulerApplication<FSAppAttempt> application = applications.remove(
@@ -1265,7 +1222,8 @@ public class FairScheduler extends
if (queueName != null) {
addApplication(appAddedEvent.getApplicationId(),
queueName, appAddedEvent.getUser(),
- appAddedEvent.getIsAppRecovering());
+ appAddedEvent.getIsAppRecovering(),
+ appAddedEvent.getPlacementContext());
}
break;
case APP_REMOVED:
@@ -1442,12 +1400,8 @@ public class FairScheduler extends
// This stores per-application scheduling information
this.applications = new ConcurrentHashMap<>();
- allocConf = new AllocationConfiguration(conf);
- try {
- queueMgr.initialize(conf);
- } catch (Exception e) {
- throw new IOException("Failed to start FairScheduler", e);
- }
+ allocConf = new AllocationConfiguration(this);
+ queueMgr.initialize();
if (continuousSchedulingEnabled) {
// Continuous scheduling is deprecated log it on startup
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
index 6a1f953..4fe38d5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
@@ -18,34 +18,28 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
-import javax.xml.parsers.ParserConfigurationException;
-
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
+import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
-import org.xml.sax.SAXException;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.util.Iterator;
-import java.util.Set;
/**
* Maintains a list of queues as well as scheduling parameters for each queue,
@@ -106,8 +100,7 @@ public class QueueManager {
return rootQueue;
}
- public void initialize(Configuration conf) throws IOException,
- SAXException, AllocationConfigurationException, ParserConfigurationException {
+ public void initialize() {
// Policies of root and default queue are set to
// SchedulingPolicy.DEFAULT_POLICY since the allocation file hasn't been
// loaded yet.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java
index 30ea213..904fb2f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java
@@ -23,85 +23,261 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.Groups;
-import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.DefaultPlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.FSPlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PrimaryGroupPlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.RejectPlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.SecondaryGroupExistingPlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.SpecifiedPlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.UserPlacementRule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
+import static org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementFactory.getPlacementRule;
+
+/**
+ * The FairScheduler rules based policy for placing an application in a queue.
+ * It parses the configuration and updates the {@link
+ * org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager}
+ * with a list of {@link PlacementRule}s to execute in order.
+ */
@Private
@Unstable
-public class QueuePlacementPolicy {
- private static final Map<String, Class<? extends QueuePlacementRule>> ruleClasses;
+final class QueuePlacementPolicy {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(QueuePlacementPolicy.class);
+
+ // Simple private class to make the rule mapping simpler.
+ private static final class RuleMap {
+ private final Class<? extends PlacementRule> ruleClass;
+ private final String terminal;
+
+ private RuleMap(Class<? extends PlacementRule> clazz, String terminate) {
+ this.ruleClass = clazz;
+ this.terminal = terminate;
+ }
+ }
+
+ // The list of known rules:
+ // key to the map is the name in the configuration.
+ // for each name the mapping contains the class name of the implementation
+ // and a flag (true, false or create) which describes the terminal state
+ // see the method getTerminal() for more comments.
+ private static final Map<String, RuleMap> RULES;
static {
- Map<String, Class<? extends QueuePlacementRule>> map =
- new HashMap<String, Class<? extends QueuePlacementRule>>();
- map.put("user", QueuePlacementRule.User.class);
- map.put("primaryGroup", QueuePlacementRule.PrimaryGroup.class);
+ Map<String, RuleMap> map = new HashMap<>();
+ map.put("user", new RuleMap(UserPlacementRule.class, "create"));
+ map.put("primaryGroup",
+ new RuleMap(PrimaryGroupPlacementRule.class, "create"));
map.put("secondaryGroupExistingQueue",
- QueuePlacementRule.SecondaryGroupExistingQueue.class);
- map.put("specified", QueuePlacementRule.Specified.class);
- map.put("nestedUserQueue",
- QueuePlacementRule.NestedUserQueue.class);
- map.put("default", QueuePlacementRule.Default.class);
- map.put("reject", QueuePlacementRule.Reject.class);
- ruleClasses = Collections.unmodifiableMap(map);
+ new RuleMap(SecondaryGroupExistingPlacementRule.class, "false"));
+ map.put("specified", new RuleMap(SpecifiedPlacementRule.class, "false"));
+ map.put("nestedUserQueue", new RuleMap(UserPlacementRule.class, "create"));
+ map.put("default", new RuleMap(DefaultPlacementRule.class, "create"));
+ map.put("reject", new RuleMap(RejectPlacementRule.class, "true"));
+ RULES = Collections.unmodifiableMap(map);
+ }
+
+ private QueuePlacementPolicy() {
}
-
- private final List<QueuePlacementRule> rules;
- private final Map<FSQueueType, Set<String>> configuredQueues;
- private final Groups groups;
-
- public QueuePlacementPolicy(List<QueuePlacementRule> rules,
- Map<FSQueueType, Set<String>> configuredQueues, Configuration conf)
+
+ /**
+ * Update the rules in the manager based on this placement policy.
+ * @param newRules The new list of rules to set in the manager.
+ * @param newTerminalState The list of terminal states for this set of rules.
+ * @param fs the reference to the scheduler needed in the rule on init.
+ * @throws AllocationConfigurationException for any errors
+ */
+ private static void updateRuleSet(List<PlacementRule> newRules,
+ List<Boolean> newTerminalState,
+ FairScheduler fs)
throws AllocationConfigurationException {
- for (int i = 0; i < rules.size()-1; i++) {
- if (rules.get(i).isTerminal()) {
+ if (newRules.isEmpty()) {
+ LOG.debug("Empty rule set defined, ignoring update");
+ return;
+ }
+ LOG.debug("Placement rule order check");
+ for (int i = 0; i < newTerminalState.size()-1; i++) {
+ if (newTerminalState.get(i)) {
throw new AllocationConfigurationException("Rules after rule "
- + i + " in queue placement policy can never be reached");
+ + (i+1) + " in queue placement policy can never be reached");
}
}
- if (!rules.get(rules.size()-1).isTerminal()) {
+ if (!newTerminalState.get(newTerminalState.size()-1)) {
throw new AllocationConfigurationException(
"Could get past last queue placement rule without assigning");
}
- this.rules = rules;
- this.configuredQueues = configuredQueues;
- groups = new Groups(conf);
+ // Set the scheduler in the rule to get queues etc
+ LOG.debug("Initialising new rule set");
+ try {
+ for (PlacementRule rule: newRules){
+ rule.initialize(fs);
+ }
+ } catch (IOException ioe) {
+ // We should never throw as we pass in a FS object, however we still
+ // should consider any exception here a config error.
+ throw new AllocationConfigurationException(
+ "Rule initialisation failed with exception", ioe);
+ }
+ // Update the placement manager with the new rule list.
+ // We only get here when all rules are OK.
+ fs.getRMContext().getQueuePlacementManager().updateRules(newRules);
+ LOG.debug("PlacementManager active with new rule set");
}
-
+
/**
- * Builds a QueuePlacementPolicy from an xml element.
+ * Builds a QueuePlacementPolicy from a xml element.
+ * @param confElement the placement policy xml snippet from the
+ * {@link FairSchedulerConfiguration}
+ * @param fs the reference to the scheduler needed in the rule on init.
+ * @throws AllocationConfigurationException for any errors
*/
- public static QueuePlacementPolicy fromXml(Element el,
- Map<FSQueueType, Set<String>> configuredQueues, Configuration conf)
+ static void fromXml(Element confElement, FairScheduler fs)
throws AllocationConfigurationException {
- List<QueuePlacementRule> rules = new ArrayList<QueuePlacementRule>();
- NodeList elements = el.getChildNodes();
+ LOG.debug("Reloading placement policy from allocation config");
+ if (confElement == null || !confElement.hasChildNodes()) {
+ throw new AllocationConfigurationException(
+ "Empty configuration for QueuePlacementPolicy is not allowed");
+ }
+ List<PlacementRule> newRules = new ArrayList<>();
+ List<Boolean> newTerminalState = new ArrayList<>();
+ NodeList elements = confElement.getChildNodes();
for (int i = 0; i < elements.getLength(); i++) {
Node node = elements.item(i);
- if (node instanceof Element) {
- QueuePlacementRule rule = createAndInitializeRule(node);
- rules.add(rule);
+ if (node instanceof Element &&
+ node.getNodeName().equalsIgnoreCase("rule")) {
+ String name = ((Element) node).getAttribute("name");
+ LOG.debug("Creating new rule: {}", name);
+ PlacementRule rule = createRule((Element)node);
+
+ // The only child node that we currently know is a parent rule
+ PlacementRule parentRule = null;
+ String parentName = null;
+ Element child = getParentRuleElement(node);
+ if (child != null) {
+ parentName = child.getAttribute("name");
+ parentRule = getParentRule(child, fs);
+ }
+ // Need to make sure that the nestedUserQueue has a parent for
+ // backwards compatibility
+ if (name.equalsIgnoreCase("nestedUserQueue") && parentRule == null) {
+ throw new AllocationConfigurationException("Rule '" + name
+ + "' must have a parent rule set");
+ }
+ newRules.add(rule);
+ if (parentRule == null) {
+ newTerminalState.add(
+ getTerminal(RULES.get(name).terminal, rule));
+ } else {
+ ((FSPlacementRule)rule).setParentRule(parentRule);
+ newTerminalState.add(
+ getTerminal(RULES.get(name).terminal, rule) &&
+ getTerminal(RULES.get(parentName).terminal, parentRule));
+ }
+ }
+ }
+ updateRuleSet(newRules, newTerminalState, fs);
+ }
+
+ /**
+ * Find the element that defines the parent rule.
+ * @param node the xml node to check for a parent rule
+ * @return {@link Element} that describes the parent rule or
+ * <code>null</code> if none is found
+ */
+ private static Element getParentRuleElement(Node node)
+ throws AllocationConfigurationException {
+ Element parent = null;
+ // walk over the node list
+ if (node.hasChildNodes()) {
+ NodeList childList = node.getChildNodes();
+ for (int j = 0; j < childList.getLength(); j++) {
+ Node child = childList.item(j);
+ if (child instanceof Element &&
+ child.getNodeName().equalsIgnoreCase("rule")) {
+ if (parent != null) {
+ LOG.warn("Rule '{}' has multiple parent rules defined, only the " +
+ "last parent rule will be used",
+ ((Element) node).getAttribute("name"));
+ }
+ parent = ((Element) child);
+ }
+ }
+ }
+ // sanity check the rule that is configured
+ if (parent != null) {
+ String parentName = parent.getAttribute("name");
+ if (parentName.equals("reject") ||
+ parentName.equals("nestedUserQueue")) {
+ throw new AllocationConfigurationException("Rule '"
+ + parentName
+ + "' is not allowed as a parent rule for any rule");
}
}
- return new QueuePlacementPolicy(rules, configuredQueues, conf);
+ return parent;
}
-
+
/**
- * Create and initialize a rule given a xml node
- * @param node
- * @return QueuePlacementPolicy
- * @throws AllocationConfigurationException
+ * Retrieve the configured parent rule from the xml config.
+ * @param parent the xml element that contains the name of the rule to add.
+ * @param fs the reference to the scheduler needed in the rule on init.
+ * @return {@link PlacementRule} to set as a parent
+ * @throws AllocationConfigurationException for any error
*/
- public static QueuePlacementRule createAndInitializeRule(Node node)
+ private static PlacementRule getParentRule(Element parent,
+ FairScheduler fs)
+ throws AllocationConfigurationException {
+ LOG.debug("Creating new parent rule: {}", parent.getAttribute("name"));
+ PlacementRule parentRule = createRule(parent);
+ // Init the rule, we do not want to add it to the list of the
+ // placement manager
+ try {
+ parentRule.initialize(fs);
+ } catch (IOException ioe) {
+ // We should never throw as we pass in a FS object, however we
+ // still should consider any exception here a config error.
+ throw new AllocationConfigurationException(
+ "Parent Rule initialisation failed with exception", ioe);
+ }
+ return parentRule;
+ }
+
+ /**
+ * Returns the terminal status of the rule based on the definition and the
+ * create flag set in the rule.
+ * @param terminal The definition of the terminal flag
+ * @param rule The rule to check
+ * @return <code>true</code> if the rule is terminal <code>false</code> in
+ * all other cases.
+ */
+ private static Boolean getTerminal(String terminal, PlacementRule rule) {
+ switch (terminal) {
+ case "true": // rule is always terminal
+ return true;
+ case "false": // rule is never terminal
+ return false;
+ default: // rule is terminal based on the create flag
+ return ((FSPlacementRule)rule).getCreateFlag();
+ }
+ }
+
+ /**
+ * Create a rule from a given a xml node.
+ * @param element the xml element to create the rule from
+ * @return PlacementRule
+ * @throws AllocationConfigurationException for any error
+ */
+ @SuppressWarnings("unchecked")
+ private static PlacementRule createRule(Element element)
throws AllocationConfigurationException {
- Element element = (Element) node;
String ruleName = element.getAttribute("name");
if ("".equals(ruleName)) {
@@ -109,72 +285,54 @@ public class QueuePlacementPolicy {
+ "rule element");
}
- Class<? extends QueuePlacementRule> clazz = ruleClasses.get(ruleName);
- if (clazz == null) {
+ Class<? extends PlacementRule> ruleClass = null;
+ if (RULES.containsKey(ruleName)) {
+ ruleClass = RULES.get(ruleName).ruleClass;
+ }
+ if (ruleClass == null) {
throw new AllocationConfigurationException("No rule class found for "
+ ruleName);
}
- QueuePlacementRule rule = ReflectionUtils.newInstance(clazz, null);
- rule.initializeFromXml(element);
- return rule;
+ return getPlacementRule(ruleClass, element);
}
/**
- * Build a simple queue placement policy from the allow-undeclared-pools and
- * user-as-default-queue configuration options.
+ * Build a simple queue placement policy from the configuration options
+ * {@link FairSchedulerConfiguration#ALLOW_UNDECLARED_POOLS} and
+ * {@link FairSchedulerConfiguration#USER_AS_DEFAULT_QUEUE}.
+ * @param fs the reference to the scheduler needed in the rule on init.
*/
- public static QueuePlacementPolicy fromConfiguration(Configuration conf,
- Map<FSQueueType, Set<String>> configuredQueues) {
+ static void fromConfiguration(FairScheduler fs) {
+ LOG.debug("Creating base placement policy from config");
+ Configuration conf = fs.getConfig();
+
boolean create = conf.getBoolean(
FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS,
FairSchedulerConfiguration.DEFAULT_ALLOW_UNDECLARED_POOLS);
boolean userAsDefaultQueue = conf.getBoolean(
FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE,
FairSchedulerConfiguration.DEFAULT_USER_AS_DEFAULT_QUEUE);
- List<QueuePlacementRule> rules = new ArrayList<QueuePlacementRule>();
- rules.add(new QueuePlacementRule.Specified().initialize(create, null));
+ List<PlacementRule> newRules = new ArrayList<>();
+ List<Boolean> newTerminalState = new ArrayList<>();
+ Class<? extends PlacementRule> clazz =
+ RULES.get("specified").ruleClass;
+ newRules.add(getPlacementRule(clazz, create));
+ newTerminalState.add(false);
if (userAsDefaultQueue) {
- rules.add(new QueuePlacementRule.User().initialize(create, null));
+ clazz = RULES.get("user").ruleClass;
+ newRules.add(getPlacementRule(clazz, create));
+ newTerminalState.add(create);
}
if (!userAsDefaultQueue || !create) {
- rules.add(new QueuePlacementRule.Default().initialize(true, null));
+ clazz = RULES.get("default").ruleClass;
+ newRules.add(getPlacementRule(clazz, true));
+ newTerminalState.add(true);
}
try {
- return new QueuePlacementPolicy(rules, configuredQueues, conf);
+ updateRuleSet(newRules, newTerminalState, fs);
} catch (AllocationConfigurationException ex) {
throw new RuntimeException("Should never hit exception when loading" +
- "placement policy from conf", ex);
+ "placement policy from conf", ex);
}
}
-
- /**
- * Applies this rule to an app with the given requested queue and user/group
- * information.
- *
- * @param requestedQueue
- * The queue specified in the ApplicationSubmissionContext
- * @param user
- * The user submitting the app
- * @return
- * The name of the queue to assign the app to. Or null if the app should
- * be rejected.
- * @throws IOException
- * If an exception is encountered while getting the user's groups
- */
- public String assignAppToQueue(String requestedQueue, String user)
- throws IOException {
- for (QueuePlacementRule rule : rules) {
- String queue = rule.assignAppToQueue(requestedQueue, user, groups,
- configuredQueues);
- if (queue == null || !queue.isEmpty()) {
- return queue;
- }
- }
- throw new IllegalStateException("Should have applied a rule before " +
- "reaching here");
- }
-
- public List<QueuePlacementRule> getRules() {
- return rules;
- }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java
deleted file mode 100644
index 1d48a21..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java
+++ /dev/null
@@ -1,366 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.security.Groups;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.w3c.dom.Element;
-import org.w3c.dom.NamedNodeMap;
-import org.w3c.dom.Node;
-import org.w3c.dom.NodeList;
-
-import com.google.common.annotations.VisibleForTesting;
-
-@Private
-@Unstable
-public abstract class QueuePlacementRule {
- protected boolean create;
- public static final Logger LOG =
- LoggerFactory.getLogger(QueuePlacementRule.class.getName());
-
- /**
- * Initializes the rule with any arguments.
- *
- * @param args
- * Additional attributes of the rule's xml element other than create.
- */
- public QueuePlacementRule initialize(boolean create, Map<String, String> args) {
- this.create = create;
- return this;
- }
-
- /**
- *
- * @param requestedQueue
- * The queue explicitly requested.
- * @param user
- * The user submitting the app.
- * @param groups
- * The groups of the user submitting the app.
- * @param configuredQueues
- * The queues specified in the scheduler configuration.
- * @return
- * The queue to place the app into. An empty string indicates that we should
- * continue to the next rule, and null indicates that the app should be rejected.
- */
- public String assignAppToQueue(String requestedQueue, String user,
- Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
- throws IOException {
- String queue = getQueueForApp(requestedQueue, user, groups,
- configuredQueues);
- if (create || configuredQueues.get(FSQueueType.LEAF).contains(queue)
- || configuredQueues.get(FSQueueType.PARENT).contains(queue)) {
- return queue;
- } else {
- return "";
- }
- }
-
- public void initializeFromXml(Element el)
- throws AllocationConfigurationException {
- boolean create = true;
- NamedNodeMap attributes = el.getAttributes();
- Map<String, String> args = new HashMap<String, String>();
- for (int i = 0; i < attributes.getLength(); i++) {
- Node node = attributes.item(i);
- String key = node.getNodeName();
- String value = node.getNodeValue();
- if (key.equals("create")) {
- create = Boolean.parseBoolean(value);
- } else {
- args.put(key, value);
- }
- }
- initialize(create, args);
- }
-
- /**
- * Returns true if this rule never tells the policy to continue.
- */
- public abstract boolean isTerminal();
-
- /**
- * Applies this rule to an app with the given requested queue and user/group
- * information.
- *
- * @param requestedQueue
- * The queue specified in the ApplicationSubmissionContext
- * @param user
- * The user submitting the app.
- * @param groups
- * The groups of the user submitting the app.
- * @return
- * The name of the queue to assign the app to, or null to empty string
- * continue to the next rule.
- */
- protected abstract String getQueueForApp(String requestedQueue, String user,
- Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
- throws IOException;
-
- /**
- * Places apps in queues by username of the submitter
- */
- public static class User extends QueuePlacementRule {
- @Override
- protected String getQueueForApp(String requestedQueue, String user,
- Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
- return "root." + cleanName(user);
- }
-
- @Override
- public boolean isTerminal() {
- return create;
- }
- }
-
- /**
- * Places apps in queues by primary group of the submitter
- */
- public static class PrimaryGroup extends QueuePlacementRule {
- @Override
- protected String getQueueForApp(String requestedQueue, String user,
- Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
- throws IOException {
- final List<String> groupList = groups.getGroups(user);
- if (groupList.isEmpty()) {
- throw new IOException("No groups returned for user " + user);
- }
- return "root." + cleanName(groupList.get(0));
- }
-
- @Override
- public boolean isTerminal() {
- return create;
- }
- }
-
- /**
- * Places apps in queues by secondary group of the submitter
- *
- * Match will be made on first secondary group that exist in
- * queues
- */
- public static class SecondaryGroupExistingQueue extends QueuePlacementRule {
- @Override
- protected String getQueueForApp(String requestedQueue, String user,
- Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
- throws IOException {
- List<String> groupNames = groups.getGroups(user);
- for (int i = 1; i < groupNames.size(); i++) {
- String group = cleanName(groupNames.get(i));
- if (configuredQueues.get(FSQueueType.LEAF).contains("root." + group)
- || configuredQueues.get(FSQueueType.PARENT).contains(
- "root." + group)) {
- return "root." + group;
- }
- }
-
- return "";
- }
-
- @Override
- public boolean isTerminal() {
- return false;
- }
- }
-
- /**
- * Places apps in queues with name of the submitter under the queue
- * returned by the nested rule.
- */
- public static class NestedUserQueue extends QueuePlacementRule {
- @VisibleForTesting
- QueuePlacementRule nestedRule;
-
- /**
- * Parse xml and instantiate the nested rule
- */
- @Override
- public void initializeFromXml(Element el)
- throws AllocationConfigurationException {
- NodeList elements = el.getChildNodes();
-
- for (int i = 0; i < elements.getLength(); i++) {
- Node node = elements.item(i);
- if (node instanceof Element) {
- Element element = (Element) node;
- if ("rule".equals(element.getTagName())) {
- QueuePlacementRule rule = QueuePlacementPolicy
- .createAndInitializeRule(node);
- if (rule == null) {
- throw new AllocationConfigurationException(
- "Unable to create nested rule in nestedUserQueue rule");
- }
- this.nestedRule = rule;
- break;
- } else {
- continue;
- }
- }
- }
-
- if (this.nestedRule == null) {
- throw new AllocationConfigurationException(
- "No nested rule specified in <nestedUserQueue> rule");
- }
- super.initializeFromXml(el);
- }
-
- @Override
- protected String getQueueForApp(String requestedQueue, String user,
- Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
- throws IOException {
- // Apply the nested rule
- String queueName = nestedRule.assignAppToQueue(requestedQueue, user,
- groups, configuredQueues);
-
- if (queueName != null && queueName.length() != 0) {
- if (!queueName.startsWith("root.")) {
- queueName = "root." + queueName;
- }
-
- // Verify if the queue returned by the nested rule is an configured leaf queue,
- // if yes then skip to next rule in the queue placement policy
- if (configuredQueues.get(FSQueueType.LEAF).contains(queueName)) {
- return "";
- }
- return queueName + "." + cleanName(user);
- }
- return queueName;
- }
-
- @Override
- public boolean isTerminal() {
- return false;
- }
- }
-
- /**
- * Places apps in queues by requested queue of the submitter
- */
- public static class Specified extends QueuePlacementRule {
- @Override
- protected String getQueueForApp(String requestedQueue, String user,
- Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
- if (requestedQueue.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)) {
- return "";
- } else {
- if (!requestedQueue.startsWith("root.")) {
- requestedQueue = "root." + requestedQueue;
- }
- return requestedQueue;
- }
- }
-
- @Override
- public boolean isTerminal() {
- return false;
- }
- }
-
- /**
- * Places apps in the specified default queue. If no default queue is
- * specified the app is placed in root.default queue.
- */
- public static class Default extends QueuePlacementRule {
- @VisibleForTesting
- String defaultQueueName;
-
- @Override
- public QueuePlacementRule initialize(boolean create,
- Map<String, String> args) {
- if (defaultQueueName == null) {
- defaultQueueName = "root." + YarnConfiguration.DEFAULT_QUEUE_NAME;
- }
- return super.initialize(create, args);
- }
-
- @Override
- public void initializeFromXml(Element el)
- throws AllocationConfigurationException {
- defaultQueueName = el.getAttribute("queue");
- if (defaultQueueName != null && !defaultQueueName.isEmpty()) {
- if (!defaultQueueName.startsWith("root.")) {
- defaultQueueName = "root." + defaultQueueName;
- }
- } else {
- defaultQueueName = "root." + YarnConfiguration.DEFAULT_QUEUE_NAME;
- }
- super.initializeFromXml(el);
- }
-
- @Override
- protected String getQueueForApp(String requestedQueue, String user,
- Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
- return defaultQueueName;
- }
-
- @Override
- public boolean isTerminal() {
- return true;
- }
- }
-
- /**
- * Rejects all apps
- */
- public static class Reject extends QueuePlacementRule {
- @Override
- public String assignAppToQueue(String requestedQueue, String user,
- Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
- return null;
- }
-
- @Override
- protected String getQueueForApp(String requestedQueue, String user,
- Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean isTerminal() {
- return true;
- }
- }
-
- /**
- * Replace the periods in the username or groupname with "_dot_" and
- * remove trailing and leading whitespace.
- */
- protected String cleanName(String name) {
- name = name.trim();
- if (name.contains(".")) {
- String converted = name.replaceAll("\\.", "_dot_");
- LOG.warn("Name " + name + " is converted to " + converted
- + " when it is used as a queue name.");
- return converted;
- } else {
- return name;
- }
- }
-}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/QueueProperties.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/QueueProperties.java
index badf05f..c2ee8c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/QueueProperties.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/QueueProperties.java
@@ -169,8 +169,7 @@ public class QueueProperties {
private Set<String> nonPreemptableQueues = new HashSet<>();
// Remember all queue names so we can display them on web UI, etc.
// configuredQueues is segregated based on whether it is a leaf queue
- // or a parent queue. This information is used for creating queues
- // and also for making queue placement decisions(QueuePlacementRule.java).
+ // or a parent queue. This information is used for creating queues.
private Map<FSQueueType, Set<String>> configuredQueues = new HashMap<>();
Builder() {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManagerWithFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManagerWithFairScheduler.java
index e30c2a6..d0dcae0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManagerWithFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManagerWithFairScheduler.java
@@ -18,32 +18,30 @@
package org.apache.hadoop.yarn.server.resourcemanager;
+import static junit.framework.TestCase.assertTrue;
import static org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException.InvalidResourceType.GREATER_THEN_MAX_ALLOCATION;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.matches;
-import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
-import java.util.HashMap;
+import java.util.Collections;
+import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.MockApps;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -51,39 +49,34 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedule
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.resource.Resources;
-import org.junit.AfterClass;
+import org.junit.After;
import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.junit.Before;
import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
/**
- * Testing applications being retired from RM with fair scheduler.
- *
+ * Testing RMAppManager application submission with fair scheduler.
*/
-public class TestAppManagerWithFairScheduler extends AppManagerTestBase{
+public class TestAppManagerWithFairScheduler extends AppManagerTestBase {
private static final String TEST_FOLDER = "test-queues";
private static YarnConfiguration conf = new YarnConfiguration();
+ private PlacementManager placementMgr;
+ private TestRMAppManager rmAppManager;
+ private RMContext rmContext;
+ private static String allocFileName =
+ GenericTestUtils.getTestDir(TEST_FOLDER).getAbsolutePath();
- @BeforeClass
- public static void setup() throws IOException {
- String allocFile =
- GenericTestUtils.getTestDir(TEST_FOLDER).getAbsolutePath();
-
- int queueMaxAllocation = 512;
-
- PrintWriter out = new PrintWriter(new FileWriter(allocFile));
+ @Before
+ public void setup() throws IOException {
+ // Basic config with one queue (override in test if needed)
+ PrintWriter out = new PrintWriter(new FileWriter(allocFileName));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
- out.println(" <queue name=\"queueA\">");
- out.println(" <maxContainerAllocation>" + queueMaxAllocation
- + " mb 1 vcores" + "</maxContainerAllocation>");
- out.println(" </queue>");
- out.println(" <queue name=\"queueB\">");
+ out.println(" <queue name=\"test\">");
out.println(" </queue>");
out.println("</allocations>");
out.close();
@@ -91,87 +84,205 @@ public class TestAppManagerWithFairScheduler extends AppManagerTestBase{
conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
ResourceScheduler.class);
- conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, allocFile);
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, allocFileName);
+ placementMgr = mock(PlacementManager.class);
+
+ MockRM mockRM = new MockRM(conf);
+ rmContext = mockRM.getRMContext();
+ rmContext.setQueuePlacementManager(placementMgr);
+ ApplicationMasterService masterService = new ApplicationMasterService(
+ rmContext, rmContext.getScheduler());
+
+ rmAppManager = new TestRMAppManager(rmContext,
+ new ClientToAMTokenSecretManagerInRM(), rmContext.getScheduler(),
+ masterService, new ApplicationACLsManager(conf), conf);
}
- @AfterClass
- public static void teardown(){
+ @After
+ public void teardown(){
File allocFile = GenericTestUtils.getTestDir(TEST_FOLDER);
allocFile.delete();
}
@Test
public void testQueueSubmitWithHighQueueContainerSize()
- throws YarnException {
+ throws YarnException, IOException {
+ int maxAlloc =
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB;
- ApplicationId appId = MockApps.newAppID(1);
- RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-
- Resource resource = Resources.createResource(
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
-
- ApplicationSubmissionContext asContext =
- recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
- asContext.setApplicationId(appId);
- asContext.setResource(resource);
- asContext.setPriority(Priority.newInstance(0));
- asContext.setAMContainerSpec(mockContainerLaunchContext(recordFactory));
- asContext.setQueue("queueA");
- QueueInfo mockDefaultQueueInfo = mock(QueueInfo.class);
-
- // Setup a PlacementManager returns a new queue
- PlacementManager placementMgr = mock(PlacementManager.class);
- doAnswer(new Answer<ApplicationPlacementContext>() {
-
- @Override
- public ApplicationPlacementContext answer(InvocationOnMock invocation)
- throws Throwable {
- return new ApplicationPlacementContext("queueA");
- }
-
- }).when(placementMgr).placeApplication(
- any(ApplicationSubmissionContext.class), matches("test1"));
- doAnswer(new Answer<ApplicationPlacementContext>() {
-
- @Override
- public ApplicationPlacementContext answer(InvocationOnMock invocation)
- throws Throwable {
- return new ApplicationPlacementContext("queueB");
- }
-
- }).when(placementMgr).placeApplication(
- any(ApplicationSubmissionContext.class), matches("test2"));
-
- MockRM newMockRM = new MockRM(conf);
- RMContext newMockRMContext = newMockRM.getRMContext();
- newMockRMContext.setQueuePlacementManager(placementMgr);
- ApplicationMasterService masterService = new ApplicationMasterService(
- newMockRMContext, newMockRMContext.getScheduler());
-
- TestRMAppManager newAppMonitor = new TestRMAppManager(newMockRMContext,
- new ClientToAMTokenSecretManagerInRM(), newMockRMContext.getScheduler(),
- masterService, new ApplicationACLsManager(conf), conf);
+ // scheduler config with a limited queue
+ PrintWriter out = new PrintWriter(new FileWriter(allocFileName));
+ out.println("<?xml version=\"1.0\"?>");
+ out.println("<allocations>");
+ out.println(" <queue name=\"root\">");
+ out.println(" <queue name=\"limited\">");
+ out.println(" <maxContainerAllocation>" + maxAlloc + " mb 1 vcores");
+ out.println(" </maxContainerAllocation>");
+ out.println(" </queue>");
+ out.println(" <queue name=\"unlimited\">");
+ out.println(" </queue>");
+ out.println(" </queue>");
+ out.println("</allocations>");
+ out.close();
+ rmContext.getScheduler().reinitialize(conf, rmContext);
- // only user test has permission to submit to 'test' queue
+ ApplicationId appId = MockApps.newAppID(1);
+ Resource res = Resources.createResource(maxAlloc + 1);
+ ApplicationSubmissionContext asContext = createAppSubmitCtx(appId, res);
+ // Submit to limited queue
+ when(placementMgr.placeApplication(any(), any()))
+ .thenReturn(new ApplicationPlacementContext("limited"));
try {
- newAppMonitor.submitApplication(asContext, "test1");
+ rmAppManager.submitApplication(asContext, "test");
Assert.fail("Test should fail on too high allocation!");
} catch (InvalidResourceRequestException e) {
Assert.assertEquals(GREATER_THEN_MAX_ALLOCATION,
e.getInvalidResourceType());
}
- // Should not throw exception
- newAppMonitor.submitApplication(asContext, "test2");
+ // submit same app but now place it in the unlimited queue
+ when(placementMgr.placeApplication(any(), any()))
+ .thenReturn(new ApplicationPlacementContext("root.unlimited"));
+ rmAppManager.submitApplication(asContext, "test");
}
- private static ContainerLaunchContext mockContainerLaunchContext(
- RecordFactory recordFactory) {
- ContainerLaunchContext amContainer = recordFactory.newRecordInstance(
- ContainerLaunchContext.class);
- amContainer
- .setApplicationACLs(new HashMap<ApplicationAccessType, String>());
- return amContainer;
+ @Test
+ public void testQueueSubmitWithPermissionLimits()
+ throws YarnException, IOException {
+
+ conf.set(YarnConfiguration.YARN_ACL_ENABLE, "true");
+
+ PrintWriter out = new PrintWriter(new FileWriter(allocFileName));
+ out.println("<?xml version=\"1.0\"?>");
+ out.println("<allocations>");
+ out.println(" <queue name=\"root\">");
+ out.println(" <aclSubmitApps> </aclSubmitApps>");
+ out.println(" <aclAdministerApps> </aclAdministerApps>");
+ out.println(" <queue name=\"noaccess\">");
+ out.println(" </queue>");
+ out.println(" <queue name=\"submitonly\">");
+ out.println(" <aclSubmitApps>test </aclSubmitApps>");
+ out.println(" <aclAdministerApps> </aclAdministerApps>");
+ out.println(" </queue>");
+ out.println(" <queue name=\"adminonly\">");
+ out.println(" <aclSubmitApps> </aclSubmitApps>");
+ out.println(" <aclAdministerApps>test </aclAdministerApps>");
+ out.println(" </queue>");
+ out.println(" </queue>");
+ out.println("</allocations>");
+ out.close();
+ rmContext.getScheduler().reinitialize(conf, rmContext);
+
+ ApplicationId appId = MockApps.newAppID(1);
+ Resource res = Resources.createResource(1024, 1);
+ ApplicationSubmissionContext asContext = createAppSubmitCtx(appId, res);
+
+ // Submit to no access queue
+ when(placementMgr.placeApplication(any(), any()))
+ .thenReturn(new ApplicationPlacementContext("noaccess"));
+ try {
+ rmAppManager.submitApplication(asContext, "test");
+ Assert.fail("Test should have failed with access denied");
+ } catch (YarnException e) {
+ assertTrue("Access exception not found",
+ e.getCause() instanceof AccessControlException);
+ }
+ // Submit to submit access queue
+ when(placementMgr.placeApplication(any(), any()))
+ .thenReturn(new ApplicationPlacementContext("submitonly"));
+ rmAppManager.submitApplication(asContext, "test");
+ // Submit second app to admin access queue
+ appId = MockApps.newAppID(2);
+ asContext = createAppSubmitCtx(appId, res);
+ when(placementMgr.placeApplication(any(), any()))
+ .thenReturn(new ApplicationPlacementContext("adminonly"));
+ rmAppManager.submitApplication(asContext, "test");
+ }
+
+ @Test
+ public void testQueueSubmitWithRootPermission()
+ throws YarnException, IOException {
+
+ conf.set(YarnConfiguration.YARN_ACL_ENABLE, "true");
+
+ PrintWriter out = new PrintWriter(new FileWriter(allocFileName));
+ out.println("<?xml version=\"1.0\"?>");
+ out.println("<allocations>");
+ out.println(" <queue name=\"root\">");
+ out.println(" <queue name=\"noaccess\">");
+ out.println(" <aclSubmitApps> </aclSubmitApps>");
+ out.println(" <aclAdministerApps> </aclAdministerApps>");
+ out.println(" </queue>");
+ out.println(" </queue>");
+ out.println("</allocations>");
+ out.close();
+ rmContext.getScheduler().reinitialize(conf, rmContext);
+
+ ApplicationId appId = MockApps.newAppID(1);
+ Resource res = Resources.createResource(1024, 1);
+ ApplicationSubmissionContext asContext = createAppSubmitCtx(appId, res);
+
+ // Submit to noaccess queue should be allowed by root ACL
+ when(placementMgr.placeApplication(any(), any()))
+ .thenReturn(new ApplicationPlacementContext("noaccess"));
+ rmAppManager.submitApplication(asContext, "test");
+ }
+
+ @Test
+ public void testQueueSubmitWithAutoCreateQueue()
+ throws YarnException, IOException {
+
+ conf.set(YarnConfiguration.YARN_ACL_ENABLE, "true");
+
+ PrintWriter out = new PrintWriter(new FileWriter(allocFileName));
+ out.println("<?xml version=\"1.0\"?>");
+ out.println("<allocations>");
+ out.println(" <queue name=\"root\">");
+ out.println(" <aclSubmitApps> </aclSubmitApps>");
+ out.println(" <aclAdministerApps> </aclAdministerApps>");
+ out.println(" <queue name=\"noaccess\" type=\"parent\">");
+ out.println(" </queue>");
+ out.println(" <queue name=\"submitonly\" type=\"parent\">");
+ out.println(" <aclSubmitApps>test </aclSubmitApps>");
+ out.println(" </queue>");
+ out.println(" </queue>");
+ out.println("</allocations>");
+ out.close();
+ rmContext.getScheduler().reinitialize(conf, rmContext);
+
+ ApplicationId appId = MockApps.newAppID(1);
+ Resource res = Resources.createResource(1024, 1);
+ ApplicationSubmissionContext asContext = createAppSubmitCtx(appId, res);
+
+ // Submit to noaccess parent with non existent child queue
+ when(placementMgr.placeApplication(any(), any()))
+ .thenReturn(new ApplicationPlacementContext("root.noaccess.child"));
+ try {
+ rmAppManager.submitApplication(asContext, "test");
+ Assert.fail("Test should have failed with access denied");
+ } catch (YarnException e) {
+ assertTrue("Access exception not found",
+ e.getCause() instanceof AccessControlException);
+ }
+ // Submit to submitonly parent with non existent child queue
+ when(placementMgr.placeApplication(any(), any()))
+ .thenReturn(new ApplicationPlacementContext("root.submitonly.child"));
+ rmAppManager.submitApplication(asContext, "test");
+ }
+
+ private ApplicationSubmissionContext createAppSubmitCtx(ApplicationId appId,
+ Resource res) {
+ ApplicationSubmissionContext asContext =
+ Records.newRecord(ApplicationSubmissionContext.class);
+ asContext.setApplicationId(appId);
+ ResourceRequest resReg =
+ ResourceRequest.newInstance(Priority.newInstance(0),
+ ResourceRequest.ANY, res, 1);
+ asContext.setAMContainerResourceRequests(
+ Collections.singletonList(resReg));
+ asContext.setAMContainerSpec(mock(ContainerLaunchContext.class));
+ asContext.setQueue("default");
+ return asContext;
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationACLs.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationACLs.java
index 667d089..e952c50 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationACLs.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationACLs.java
@@ -466,7 +466,7 @@ public class TestApplicationACLs extends ParameterizedSchedulerTestBase {
if (conf.get(YarnConfiguration.RM_SCHEDULER)
.equals(FairScheduler.class.getName())) {
Assert.assertTrue(appReport.getDiagnostics()
- .contains("Application rejected by queue placement policy"));
+ .contains("user owner application rejected by placement rules."));
} else {
Assert.assertTrue(appReport.getDiagnostics()
.contains("submitted by user owner to unknown queue: InvalidQueue"));
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java
index c6cebd9..f61d427 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -38,6 +39,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -106,8 +108,14 @@ public abstract class TestSchedulerPlanFollowerBase {
ApplicationId appId = ApplicationId.newInstance(0, 1);
ApplicationAttemptId appAttemptId_0 =
ApplicationAttemptId.newInstance(appId, 0);
- AppAddedSchedulerEvent addAppEvent =
- new AppAddedSchedulerEvent(appId, q.getQueueName(), user_0);
+ AppAddedSchedulerEvent addAppEvent;
+ if (scheduler instanceof FairScheduler) {
+ addAppEvent = new AppAddedSchedulerEvent(appId, q.getQueueName(),
+ user_0, new ApplicationPlacementContext("dedicated"));
+ } else {
+ addAppEvent = new AppAddedSchedulerEvent(appId, q.getQueueName(),
+ user_0);
+ }
scheduler.handle(addAppEvent);
AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
new AppAttemptAddedSchedulerEvent(appAttemptId_0, false);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
index f25df07..2e645c8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
@@ -81,6 +81,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMW
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -978,14 +979,17 @@ public class TestSchedulerUtils {
}
public static SchedulerApplication<SchedulerApplicationAttempt>
- verifyAppAddedAndRemovedFromScheduler(
- Map<ApplicationId, SchedulerApplication<SchedulerApplicationAttempt>> applications,
+ verifyAppAddedAndRemovedFromScheduler(
+ Map<ApplicationId, SchedulerApplication<SchedulerApplicationAttempt>>
+ applications,
EventHandler<SchedulerEvent> handler, String queueName) {
+ ApplicationPlacementContext apc =
+ new ApplicationPlacementContext(queueName);
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
AppAddedSchedulerEvent appAddedEvent =
- new AppAddedSchedulerEvent(appId, queueName, "user");
+ new AppAddedSchedulerEvent(appId, queueName, "user", apc);
handler.handle(appAddedEvent);
SchedulerApplication<SchedulerApplicationAttempt> app =
applications.get(appId);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
index 4f1f20b..f149cb0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@@ -177,7 +178,11 @@ public class FairSchedulerTestBase {
Collection<ResourceRequest> requests, String queueId, String userId) {
ApplicationAttemptId id =
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
- scheduler.addApplication(id.getApplicationId(), queueId, userId, false);
+ // This fakes the placement which is not part of the scheduler anymore
+ ApplicationPlacementContext placementCtx =
+ new ApplicationPlacementContext(queueId);
+ scheduler.addApplication(id.getApplicationId(), queueId, userId, false,
+ placementCtx);
// This conditional is for testAclSubmitApplication where app is rejected
// and no app is added.
if (scheduler.getSchedulerApplications()
@@ -212,10 +217,15 @@ public class FairSchedulerTestBase {
String userId, List<ResourceRequest> ask) {
ApplicationAttemptId id = createAppAttemptId(this.APP_ID++,
this.ATTEMPT_ID++);
- scheduler.addApplication(id.getApplicationId(), queueId, userId, false);
+ // This fakes the placement which is not part of the scheduler anymore
+ ApplicationPlacementContext placementCtx =
+ new ApplicationPlacementContext(queueId);
+ scheduler.addApplication(id.getApplicationId(), queueId, userId, false,
+ placementCtx);
// This conditional is for testAclSubmitApplication where app is rejected
// and no app is added.
- if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) {
+ if (scheduler.getSchedulerApplications().containsKey(
+ id.getApplicationId())) {
scheduler.addApplicationAttempt(id, false, false);
}
@@ -298,8 +308,11 @@ public class FairSchedulerTestBase {
resourceManager.getRMContext().getRMApps().get(appId).handle(event);
event = new RMAppEvent(appId, RMAppEventType.APP_ACCEPTED);
resourceManager.getRMContext().getRMApps().get(appId).handle(event);
+ // This fakes the placement which is not part of the scheduler anymore
+ ApplicationPlacementContext placementCtx =
+ new ApplicationPlacementContext(queue);
AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent(
- appId, queue, user);
+ appId, queue, user, placementCtx);
scheduler.handle(appAddedEvent);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java
index 7afc3777..a02ef55 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java
@@ -25,14 +25,23 @@ import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.DefaultPlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.FSPlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PrimaryGroupPlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.SpecifiedPlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.UserPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.NestedUserQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile.AllocationFileWriter;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
import org.apache.hadoop.yarn.util.ControlledClock;
+import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.CustomResourceTypesConfigurationProvider;
import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.FileOutputStream;
@@ -52,7 +61,12 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+/**
+ * Test loading the allocation file for the FairScheduler.
+ */
public class TestAllocationFileLoaderService {
private static final String A_CUSTOM_RESOURCE = "a-custom-resource";
@@ -64,10 +78,28 @@ public class TestAllocationFileLoaderService {
"test-queues").getAbsolutePath();
private static final String TEST_FAIRSCHED_XML = "test-fair-scheduler.xml";
+ private FairScheduler scheduler;
+ private Configuration conf;
+
+ @Before
+ public void setup() {
+ SystemClock clock = SystemClock.getInstance();
+ PlacementManager placementManager = new PlacementManager();
+ FairSchedulerConfiguration fsConf = new FairSchedulerConfiguration();
+ RMContext rmContext = mock(RMContext.class);
+ when(rmContext.getQueuePlacementManager()).thenReturn(placementManager);
+
+ scheduler = mock(FairScheduler.class);
+ conf = new YarnConfiguration();
+ when(scheduler.getClock()).thenReturn(clock);
+ when(scheduler.getConf()).thenReturn(fsConf);
+ when(scheduler.getConfig()).thenReturn(conf);
+ when(scheduler.getRMContext()).thenReturn(rmContext);
+ }
+
@Test
public void testGetAllocationFileFromFileSystem()
throws IOException, URISyntaxException {
- Configuration conf = new YarnConfiguration();
File baseDir =
new File(TEST_DIR + Path.SEPARATOR + "getAllocHDFS").getAbsoluteFile();
FileUtil.fullyDelete(baseDir);
@@ -83,7 +115,8 @@ public class TestAllocationFileLoaderService {
fs.copyFromLocalFile(new Path(fschedURL.toURI()), new Path(fsAllocPath));
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, fsAllocPath);
- AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
+ AllocationFileLoaderService allocLoader =
+ new AllocationFileLoaderService(scheduler);
Path allocationFile = allocLoader.getAllocationFile(conf);
assertEquals(fsAllocPath, allocationFile.toString());
assertTrue(fs.exists(allocationFile));
@@ -94,9 +127,9 @@ public class TestAllocationFileLoaderService {
@Test (expected = UnsupportedFileSystemException.class)
public void testDenyGetAllocationFileFromUnsupportedFileSystem()
throws UnsupportedFileSystemException {
- Configuration conf = new YarnConfiguration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, "badfs:///badfile");
- AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
+ AllocationFileLoaderService allocLoader =
+ new AllocationFileLoaderService(scheduler);
allocLoader.getAllocationFile(conf);
}
@@ -104,12 +137,11 @@ public class TestAllocationFileLoaderService {
@Test
public void testGetAllocationFileFromClasspath() {
try {
- Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
TEST_FAIRSCHED_XML);
AllocationFileLoaderService allocLoader =
- new AllocationFileLoaderService();
+ new AllocationFileLoaderService(scheduler);
Path allocationFile = allocLoader.getAllocationFile(conf);
assertEquals(TEST_FAIRSCHED_XML, allocationFile.getName());
assertTrue(fs.exists(allocationFile));
@@ -135,11 +167,10 @@ public class TestAllocationFileLoaderService {
ControlledClock clock = new ControlledClock();
clock.setTime(0);
- Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(
- clock);
+ clock, scheduler);
allocLoader.reloadIntervalMs = 5;
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
@@ -148,10 +179,10 @@ public class TestAllocationFileLoaderService {
AllocationConfiguration allocConf = confHolder.allocConf;
// Verify conf
- QueuePlacementPolicy policy = allocConf.getPlacementPolicy();
- List<QueuePlacementRule> rules = policy.getRules();
+ List<PlacementRule> rules = scheduler.getRMContext()
+ .getQueuePlacementManager().getPlacementRules();
assertEquals(1, rules.size());
- assertEquals(QueuePlacementRule.Default.class, rules.get(0).getClass());
+ assertEquals(DefaultPlacementRule.class, rules.get(0).getClass());
assertEquals(1, allocConf.getQueueMaxApps("root.queueA"));
assertEquals(2, allocConf.getConfiguredQueues().get(FSQueueType.LEAF)
.size());
@@ -160,6 +191,7 @@ public class TestAllocationFileLoaderService {
assertTrue(allocConf.getConfiguredQueues().get(FSQueueType.LEAF)
.contains("root.queueB"));
+ // reset the conf so we can detect the reload
confHolder.allocConf = null;
// Modify file and advance the clock
@@ -174,7 +206,6 @@ public class TestAllocationFileLoaderService {
out.println(" <rule name='nestedUserQueue' >");
out.println(" <rule name='primaryGroup' />");
out.println(" </rule>");
- out.println(" <rule name='default' />");
out.println(" </queuePlacementPolicy>");
out.println("</allocations>");
out.close();
@@ -189,15 +220,13 @@ public class TestAllocationFileLoaderService {
// Verify conf
allocConf = confHolder.allocConf;
- policy = allocConf.getPlacementPolicy();
- rules = policy.getRules();
- assertEquals(3, rules.size());
- assertEquals(QueuePlacementRule.Specified.class, rules.get(0).getClass());
- assertEquals(QueuePlacementRule.NestedUserQueue.class, rules.get(1)
- .getClass());
- assertEquals(QueuePlacementRule.PrimaryGroup.class,
- ((NestedUserQueue) (rules.get(1))).nestedRule.getClass());
- assertEquals(QueuePlacementRule.Default.class, rules.get(2).getClass());
+ rules = scheduler.getRMContext().getQueuePlacementManager()
+ .getPlacementRules();
+ assertEquals(2, rules.size());
+ assertEquals(SpecifiedPlacementRule.class, rules.get(0).getClass());
+ assertEquals(UserPlacementRule.class, rules.get(1).getClass());
+ assertEquals(PrimaryGroupPlacementRule.class,
+ ((FSPlacementRule)(rules.get(1))).getParentRule().getClass());
assertEquals(3, allocConf.getQueueMaxApps("root.queueB"));
assertEquals(1, allocConf.getConfiguredQueues().get(FSQueueType.LEAF)
.size());
@@ -207,11 +236,11 @@ public class TestAllocationFileLoaderService {
@Test
public void testAllocationFileParsing() throws Exception {
- Configuration conf = new YarnConfiguration();
CustomResourceTypesConfigurationProvider.
initResourceTypes(A_CUSTOM_RESOURCE);
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
- AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
+ AllocationFileLoaderService allocLoader =
+ new AllocationFileLoaderService(scheduler);
AllocationFileWriter
.create()
@@ -455,9 +484,9 @@ public class TestAllocationFileLoaderService {
@Test
public void testBackwardsCompatibleAllocationFileParsing() throws Exception {
- Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
- AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
+ AllocationFileLoaderService allocLoader =
+ new AllocationFileLoaderService(scheduler);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
@@ -569,7 +598,6 @@ public class TestAllocationFileLoaderService {
@Test
public void testSimplePlacementPolicyFromConf() throws Exception {
- Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
conf.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, false);
conf.setBoolean(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, false);
@@ -580,19 +608,20 @@ public class TestAllocationFileLoaderService {
out.println("</allocations>");
out.close();
- AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
+ AllocationFileLoaderService allocLoader =
+ new AllocationFileLoaderService(scheduler);
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
allocLoader.reloadAllocations();
- AllocationConfiguration allocConf = confHolder.allocConf;
- QueuePlacementPolicy placementPolicy = allocConf.getPlacementPolicy();
- List<QueuePlacementRule> rules = placementPolicy.getRules();
+ List<PlacementRule> rules = scheduler.getRMContext()
+ .getQueuePlacementManager().getPlacementRules();
assertEquals(2, rules.size());
- assertEquals(QueuePlacementRule.Specified.class, rules.get(0).getClass());
- assertEquals(false, rules.get(0).create);
- assertEquals(QueuePlacementRule.Default.class, rules.get(1).getClass());
+ assertEquals(SpecifiedPlacementRule.class, rules.get(0).getClass());
+ assertFalse("Create flag was not set to false",
+ ((FSPlacementRule)rules.get(0)).getCreateFlag());
+ assertEquals(DefaultPlacementRule.class, rules.get(1).getClass());
}
/**
@@ -601,7 +630,6 @@ public class TestAllocationFileLoaderService {
*/
@Test (expected = AllocationConfigurationException.class)
public void testQueueAlongsideRoot() throws Exception {
- Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
@@ -614,7 +642,8 @@ public class TestAllocationFileLoaderService {
out.println("</allocations>");
out.close();
- AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
+ AllocationFileLoaderService allocLoader =
+ new AllocationFileLoaderService(scheduler);
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
@@ -627,7 +656,6 @@ public class TestAllocationFileLoaderService {
*/
@Test (expected = AllocationConfigurationException.class)
public void testQueueNameContainingPeriods() throws Exception {
- Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
@@ -638,7 +666,8 @@ public class TestAllocationFileLoaderService {
out.println("</allocations>");
out.close();
- AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
+ AllocationFileLoaderService allocLoader =
+ new AllocationFileLoaderService(scheduler);
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
@@ -651,7 +680,6 @@ public class TestAllocationFileLoaderService {
*/
@Test (expected = AllocationConfigurationException.class)
public void testQueueNameContainingOnlyWhitespace() throws Exception {
- Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
@@ -662,7 +690,8 @@ public class TestAllocationFileLoaderService {
out.println("</allocations>");
out.close();
- AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
+ AllocationFileLoaderService allocLoader =
+ new AllocationFileLoaderService(scheduler);
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
@@ -671,7 +700,6 @@ public class TestAllocationFileLoaderService {
@Test
public void testParentTagWithReservation() throws Exception {
- Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
@@ -684,7 +712,8 @@ public class TestAllocationFileLoaderService {
out.println("</allocations>");
out.close();
- AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
+ AllocationFileLoaderService allocLoader =
+ new AllocationFileLoaderService(scheduler);
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
@@ -700,7 +729,6 @@ public class TestAllocationFileLoaderService {
@Test
public void testParentWithReservation() throws Exception {
- Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
@@ -715,7 +743,8 @@ public class TestAllocationFileLoaderService {
out.println("</allocations>");
out.close();
- AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
+ AllocationFileLoaderService allocLoader =
+ new AllocationFileLoaderService(scheduler);
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
@@ -731,7 +760,6 @@ public class TestAllocationFileLoaderService {
@Test
public void testParentTagWithChild() throws Exception {
- Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
@@ -744,7 +772,8 @@ public class TestAllocationFileLoaderService {
out.println("</allocations>");
out.close();
- AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
+ AllocationFileLoaderService allocLoader =
+ new AllocationFileLoaderService(scheduler);
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
@@ -763,7 +792,6 @@ public class TestAllocationFileLoaderService {
*/
@Test (expected = AllocationConfigurationException.class)
public void testQueueNameContainingNBWhitespace() throws Exception {
- Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new OutputStreamWriter(
@@ -775,7 +803,8 @@ public class TestAllocationFileLoaderService {
out.println("</allocations>");
out.close();
- AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
+ AllocationFileLoaderService allocLoader =
+ new AllocationFileLoaderService(scheduler);
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
@@ -787,17 +816,18 @@ public class TestAllocationFileLoaderService {
*/
@Test (expected = AllocationConfigurationException.class)
public void testDefaultQueueSchedulingModeIsFIFO() throws Exception {
- Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
- out.println("<defaultQueueSchedulingPolicy>fifo</defaultQueueSchedulingPolicy>");
+ out.println("<defaultQueueSchedulingPolicy>fifo" +
+ "</defaultQueueSchedulingPolicy>");
out.println("</allocations>");
out.close();
- AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
+ AllocationFileLoaderService allocLoader =
+ new AllocationFileLoaderService(scheduler);
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
@@ -806,7 +836,6 @@ public class TestAllocationFileLoaderService {
@Test
public void testReservableQueue() throws Exception {
- Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
@@ -823,7 +852,8 @@ public class TestAllocationFileLoaderService {
out.println("</allocations>");
out.close();
- AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
+ AllocationFileLoaderService allocLoader =
+ new AllocationFileLoaderService(scheduler);
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
@@ -845,11 +875,9 @@ public class TestAllocationFileLoaderService {
assertTrue(allocConf.getMoveOnExpiry(reservableQueueName));
assertEquals(ReservationSchedulerConfiguration.DEFAULT_RESERVATION_WINDOW,
allocConf.getReservationWindow(reservableQueueName));
- assertEquals(100, allocConf.getInstantaneousMaxCapacity
- (reservableQueueName),
- 0.0001);
- assertEquals(
- "DummyAgentName",
+ assertEquals(100,
+ allocConf.getInstantaneousMaxCapacity(reservableQueueName), 0.0001);
+ assertEquals("DummyAgentName",
allocConf.getReservationAgent(reservableQueueName));
assertEquals(100, allocConf.getAverageCapacity(reservableQueueName), 0.001);
assertFalse(allocConf.getShowReservationAsQueues(reservableQueueName));
@@ -865,12 +893,11 @@ public class TestAllocationFileLoaderService {
/**
* Verify that you can't have dynamic user queue and reservable queue on
- * the same queue
+ * the same queue.
*/
@Test (expected = AllocationConfigurationException.class)
public void testReservableCannotBeCombinedWithDynamicUserQueue()
throws Exception {
- Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
@@ -883,7 +910,8 @@ public class TestAllocationFileLoaderService {
out.println("</allocations>");
out.close();
- AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
+ AllocationFileLoaderService allocLoader =
+ new AllocationFileLoaderService(scheduler);
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
@@ -891,7 +919,7 @@ public class TestAllocationFileLoaderService {
}
private class ReloadListener implements AllocationFileLoaderService.Listener {
- public AllocationConfiguration allocConf;
+ private AllocationConfiguration allocConf;
@Override
public void onReload(AllocationConfiguration info) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAppRunnability.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAppRunnability.java
index f581935..03338f4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAppRunnability.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAppRunnability.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@@ -47,7 +48,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-/*
+/**
* This class is to test the fair scheduler functionality of
* deciding the number of runnable application under various conditions.
*/
@@ -78,7 +79,7 @@ public class TestAppRunnability extends FairSchedulerTestBase {
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
scheduler.reinitialize(conf, resourceManager.getRMContext());
ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1);
- createApplicationWithAMResource(appAttemptId, "default", "user1", null);
+ createApplicationWithAMResource(appAttemptId, "root.user1", "user1", null);
assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1", true)
.getNumRunnableApps());
assertEquals(0, scheduler.getQueueManager().getLeafQueue("default", true)
@@ -110,9 +111,11 @@ public class TestAppRunnability extends FairSchedulerTestBase {
@Test
public void testAppAdditionAndRemoval() throws Exception {
ApplicationAttemptId attemptId = createAppAttemptId(1, 1);
+ ApplicationPlacementContext apc =
+ new ApplicationPlacementContext("user1");
AppAddedSchedulerEvent appAddedEvent =
- new AppAddedSchedulerEvent(attemptId.getApplicationId(), "default",
- "user1");
+ new AppAddedSchedulerEvent(attemptId.getApplicationId(), "user1",
+ "user1", apc);
scheduler.handle(appAddedEvent);
AppAttemptAddedSchedulerEvent attemptAddedEvent =
new AppAttemptAddedSchedulerEvent(createAppAttemptId(1, 1), false);
@@ -149,7 +152,7 @@ public class TestAppRunnability extends FairSchedulerTestBase {
// User1 submits one application
ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1);
- createApplicationWithAMResource(appAttemptId, "default", "user1", null);
+ createApplicationWithAMResource(appAttemptId, "user1", "user1", null);
// The user1 queue should inherit the configurations from the root queue
FSLeafQueue userQueue =
@@ -184,12 +187,14 @@ public class TestAppRunnability extends FairSchedulerTestBase {
FSLeafQueue jerryQueue = queueManager.getLeafQueue("jerry", false);
FSLeafQueue defaultQueue = queueManager.getLeafQueue("default", false);
+ // NOTE: placement is not inside the scheduler anymore need to fake it here.
+ // The scheduling request contains the fake placing
// Should get put into jerry
createSchedulingRequest(1024, "jerry", "someuser");
assertEquals(1, jerryQueue.getNumRunnableApps());
// Should get forced into default
- createSchedulingRequest(1024, "newqueue", "someuser");
+ createSchedulingRequest(1024, "default", "someuser");
assertEquals(1, jerryQueue.getNumRunnableApps());
assertEquals(1, defaultQueue.getNumRunnableApps());
@@ -200,7 +205,7 @@ public class TestAppRunnability extends FairSchedulerTestBase {
assertEquals(2, defaultQueue.getNumRunnableApps());
// Should get put into jerry because of user-as-default-queue
- createSchedulingRequest(1024, "default", "jerry");
+ createSchedulingRequest(1024, "jerry", "jerry");
assertEquals(2, jerryQueue.getNumRunnableApps());
assertEquals(2, defaultQueue.getNumRunnableApps());
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java
index b6ffaa5..81500db 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterNodeTracker;
@@ -113,14 +114,18 @@ public class TestContinuousScheduling extends FairSchedulerTestBase {
1, Resources.createResource(4096, 4), 1, host);
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
- NodeUpdateSchedulerEvent nodeUpdateEvent = new NodeUpdateSchedulerEvent(node1);
+ NodeUpdateSchedulerEvent nodeUpdateEvent =
+ new NodeUpdateSchedulerEvent(node1);
scheduler.handle(nodeUpdateEvent);
ApplicationAttemptId appAttemptId =
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
createMockRMApp(appAttemptId);
- scheduler.addApplication(appAttemptId.getApplicationId(), "queue11", "user11", false);
+ ApplicationPlacementContext placementCtx =
+ new ApplicationPlacementContext("queue11");
+ scheduler.addApplication(appAttemptId.getApplicationId(), "queue11",
+ "user11", false, placementCtx);
scheduler.addApplicationAttempt(appAttemptId, false, false);
List<ResourceRequest> ask = new ArrayList<>();
ask.add(createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true));
@@ -148,7 +153,8 @@ public class TestContinuousScheduling extends FairSchedulerTestBase {
scheduler.handle(nodeEvent2);
// available resource
- Assert.assertEquals(scheduler.getClusterResource().getMemorySize(), 16 * 1024);
+ Assert.assertEquals(scheduler.getClusterResource().getMemorySize(),
+ 16 * 1024);
Assert.assertEquals(scheduler.getClusterResource().getVirtualCores(), 16);
// send application request
@@ -156,14 +162,17 @@ public class TestContinuousScheduling extends FairSchedulerTestBase {
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
createMockRMApp(appAttemptId);
- scheduler.addApplication(appAttemptId.getApplicationId(),
- "queue11", "user11", false);
+ ApplicationPlacementContext placementCtx =
+ new ApplicationPlacementContext("queue11");
+ scheduler.addApplication(appAttemptId.getApplicationId(), "queue11",
+ "user11", false, placementCtx);
scheduler.addApplicationAttempt(appAttemptId, false, false);
List<ResourceRequest> ask = new ArrayList<>();
ResourceRequest request =
createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true);
ask.add(request);
- scheduler.allocate(appAttemptId, ask, null, new ArrayList<ContainerId>(), null, null, NULL_UPDATE_REQUESTS);
+ scheduler.allocate(appAttemptId, ask, null, new ArrayList<>(), null, null,
+ NULL_UPDATE_REQUESTS);
triggerSchedulingAttempt();
FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
@@ -174,10 +183,11 @@ public class TestContinuousScheduling extends FairSchedulerTestBase {
createResourceRequest(1024, 1, ResourceRequest.ANY, 2, 1, true);
ask.clear();
ask.add(request);
- scheduler.allocate(appAttemptId, ask, null, new ArrayList<ContainerId>(), null, null, NULL_UPDATE_REQUESTS);
+ scheduler.allocate(appAttemptId, ask, null, new ArrayList<>(), null, null,
+ NULL_UPDATE_REQUESTS);
triggerSchedulingAttempt();
- checkAppConsumption(app, Resources.createResource(2048,2));
+ checkAppConsumption(app, Resources.createResource(2048, 2));
// 2 containers should be assigned to 2 nodes
Set<NodeId> nodes = new HashSet<NodeId>();
@@ -353,8 +363,10 @@ public class TestContinuousScheduling extends FairSchedulerTestBase {
id11 = createAppAttemptId(1, 1);
createMockRMApp(id11);
priority = Priority.newInstance(priorityValue);
+ ApplicationPlacementContext placementCtx =
+ new ApplicationPlacementContext("root.queue1");
scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1",
- false);
+ false, placementCtx);
scheduler.addApplicationAttempt(id11, false, false);
fsAppAttempt = scheduler.getApplicationAttempt(id11);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java
index 51ffd23..3719f74 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java
@@ -35,6 +35,7 @@ import static org.mockito.Mockito.spy;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@@ -298,8 +299,10 @@ public class TestFSAppAttempt extends FairSchedulerTestBase {
assertEquals(0, clusterUsage.getVirtualCores());
ApplicationAttemptId id11 = createAppAttemptId(1, 1);
createMockRMApp(id11);
+ ApplicationPlacementContext placementCtx =
+ new ApplicationPlacementContext("default");
scheduler.addApplication(id11.getApplicationId(),
- "default", "user1", false);
+ "default", "user1", false, placementCtx);
scheduler.addApplicationAttempt(id11, false, false);
assertNotNull(scheduler.getSchedulerApplications().get(id11.
getApplicationId()));
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSParentQueue.java
index c6ff5aa..32da3a7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSParentQueue.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.junit.Before;
@@ -29,23 +31,27 @@ import static org.mockito.Mockito.when;
public class TestFSParentQueue {
- private FairSchedulerConfiguration conf;
private QueueManager queueManager;
@Before
- public void setUp() throws Exception {
- conf = new FairSchedulerConfiguration();
+ public void setUp() {
+ FairSchedulerConfiguration conf = new FairSchedulerConfiguration();
+ RMContext rmContext = mock(RMContext.class);
+ SystemClock clock = SystemClock.getInstance();
+ PlacementManager placementManager = new PlacementManager();
FairScheduler scheduler = mock(FairScheduler.class);
- AllocationConfiguration allocConf = new AllocationConfiguration(conf);
- when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
+ when(scheduler.getRMContext()).thenReturn(rmContext);
+ when(scheduler.getConfig()).thenReturn(conf);
when(scheduler.getConf()).thenReturn(conf);
when(scheduler.getResourceCalculator()).thenReturn(
new DefaultResourceCalculator());
- SystemClock clock = SystemClock.getInstance();
when(scheduler.getClock()).thenReturn(clock);
+ when(rmContext.getQueuePlacementManager()).thenReturn(placementManager);
+ AllocationConfiguration allocConf = new AllocationConfiguration(scheduler);
+ when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
queueManager = new QueueManager(scheduler);
FSQueueMetrics.forQueue("root", null, true, conf);
- queueManager.initialize(conf);
+ queueManager.initialize();
}
@Test
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 1a60693..e76e7e3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -58,6 +58,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.NodeManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.DefaultPlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -85,7 +88,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerEx
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.Default;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -108,10 +110,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@@ -426,8 +425,11 @@ public class TestFairScheduler extends FairSchedulerTestBase {
private void allocateAppAttempt(String queueName, int id, int memorySize) {
ApplicationAttemptId id11 = createAppAttemptId(id, id);
createMockRMApp(id11);
+ ApplicationPlacementContext placementCtx =
+ new ApplicationPlacementContext(queueName);
+
scheduler.addApplication(id11.getApplicationId(), queueName, "user1",
- false);
+ false, placementCtx);
scheduler.addApplicationAttempt(id11, false, false);
List<ResourceRequest> ask1 = new ArrayList<ResourceRequest>();
ResourceRequest request1 =
@@ -1404,8 +1406,10 @@ public class TestFairScheduler extends FairSchedulerTestBase {
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
createMockRMApp(attemptId);
+ ApplicationPlacementContext placementCtx =
+ new ApplicationPlacementContext("queue1");
scheduler.addApplication(attemptId.getApplicationId(), "queue1", "user1",
- false);
+ false, placementCtx);
scheduler.addApplicationAttempt(attemptId, false, false);
List<ResourceRequest> asks = new ArrayList<ResourceRequest>();
asks.add(createResourceRequest(2048, node2.getRackName(), 1, 1, false));
@@ -1814,10 +1818,12 @@ public class TestFairScheduler extends FairSchedulerTestBase {
// only default queue
assertEquals(1, scheduler.getQueueManager().getLeafQueues().size());
- // submit app with empty queue
+ // Submit app with empty queue
+ // Submit fails before we reach the placement check.
ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1);
AppAddedSchedulerEvent appAddedEvent =
- new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "", "user1");
+ new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "",
+ "user1");
scheduler.handle(appAddedEvent);
// submission rejected
@@ -1835,20 +1841,24 @@ public class TestFairScheduler extends FairSchedulerTestBase {
// only default queue
assertEquals(1, scheduler.getQueueManager().getLeafQueues().size());
- // submit app with queue name (.A)
+ // Submit app with queue name (.A)
+ // Submit fails before we reach the placement check.
ApplicationAttemptId appAttemptId1 = createAppAttemptId(1, 1);
AppAddedSchedulerEvent appAddedEvent1 =
- new AppAddedSchedulerEvent(appAttemptId1.getApplicationId(), ".A", "user1");
+ new AppAddedSchedulerEvent(appAttemptId1.getApplicationId(), ".A",
+ "user1");
scheduler.handle(appAddedEvent1);
// submission rejected
assertEquals(1, scheduler.getQueueManager().getLeafQueues().size());
assertNull(scheduler.getSchedulerApp(appAttemptId1));
assertEquals(0, resourceManager.getRMContext().getRMApps().size());
- // submit app with queue name (A.)
+ // Submit app with queue name (A.)
+ // Submit fails before we reach the placement check.
ApplicationAttemptId appAttemptId2 = createAppAttemptId(2, 1);
AppAddedSchedulerEvent appAddedEvent2 =
- new AppAddedSchedulerEvent(appAttemptId2.getApplicationId(), "A.", "user1");
+ new AppAddedSchedulerEvent(appAttemptId2.getApplicationId(), "A.",
+ "user1");
scheduler.handle(appAddedEvent2);
// submission rejected
assertEquals(1, scheduler.getQueueManager().getLeafQueues().size());
@@ -1856,9 +1866,11 @@ public class TestFairScheduler extends FairSchedulerTestBase {
assertEquals(0, resourceManager.getRMContext().getRMApps().size());
// submit app with queue name (A.B)
+ // Submit does not fail we must have a placement context.
ApplicationAttemptId appAttemptId3 = createAppAttemptId(3, 1);
AppAddedSchedulerEvent appAddedEvent3 =
- new AppAddedSchedulerEvent(appAttemptId3.getApplicationId(), "A.B", "user1");
+ new AppAddedSchedulerEvent(appAttemptId3.getApplicationId(), "A.B",
+ "user1", new ApplicationPlacementContext("A.B"));
scheduler.handle(appAddedEvent3);
// submission accepted
assertEquals(2, scheduler.getQueueManager().getLeafQueues().size());
@@ -1867,123 +1879,6 @@ public class TestFairScheduler extends FairSchedulerTestBase {
}
@Test
- public void testAssignToQueue() throws Exception {
- conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
- scheduler.init(conf);
- scheduler.start();
- scheduler.reinitialize(conf, resourceManager.getRMContext());
-
- RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW);
- RMApp rmApp2 = new MockRMApp(1, 1, RMAppState.NEW);
-
- FSLeafQueue queue1 = scheduler.assignToQueue(rmApp1, "default", "asterix");
- FSLeafQueue queue2 = scheduler.assignToQueue(rmApp2, "notdefault", "obelix");
-
- // assert FSLeafQueue's name is the correct name is the one set in the RMApp
- assertEquals(rmApp1.getQueue(), queue1.getName());
- assertEquals("root.asterix", rmApp1.getQueue());
- assertEquals(rmApp2.getQueue(), queue2.getName());
- assertEquals("root.notdefault", rmApp2.getQueue());
- }
-
- @Test
- public void testAssignToBadDefaultQueue() throws Exception {
- conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-
- PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
- out.println("<?xml version=\"1.0\"?>");
- out.println("<allocations>");
- out.println("<queuePlacementPolicy>");
- out.println("<rule name=\"specified\" create=\"false\" />");
- out.println("<rule name=\"default\" create=\"false\" />");
- out.println("</queuePlacementPolicy>");
- out.println("</allocations>");
- out.close();
- scheduler.init(conf);
- scheduler.start();
- scheduler.reinitialize(conf, resourceManager.getRMContext());
-
- RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW);
-
- try {
- FSLeafQueue queue1 = scheduler.assignToQueue(rmApp1, "default",
- "asterix");
- } catch (IllegalStateException ise) {
- fail("Bad queue placement policy terminal rule should not throw " +
- "exception ");
- }
- }
-
- @Test
- public void testAssignToNonLeafQueueReturnsNull() throws Exception {
- conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
- scheduler.init(conf);
- scheduler.start();
- scheduler.reinitialize(conf, resourceManager.getRMContext());
-
- scheduler.getQueueManager().getLeafQueue("root.child1.granchild", true);
- scheduler.getQueueManager().getLeafQueue("root.child2", true);
-
- RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW);
- RMApp rmApp2 = new MockRMApp(1, 1, RMAppState.NEW);
-
- // Trying to assign to non leaf queue would return null
- assertNull(scheduler.assignToQueue(rmApp1, "root.child1", "tintin"));
- assertNotNull(scheduler.assignToQueue(rmApp2, "root.child2", "snowy"));
- }
-
- @Test
- public void testQueuePlacementWithPolicy() throws Exception {
- conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
- SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
- scheduler.init(conf);
- scheduler.start();
- scheduler.reinitialize(conf, resourceManager.getRMContext());
-
- ApplicationAttemptId appId;
-
- List<QueuePlacementRule> rules = new ArrayList<QueuePlacementRule>();
- rules.add(new QueuePlacementRule.Specified().initialize(true, null));
- rules.add(new QueuePlacementRule.User().initialize(false, null));
- rules.add(new QueuePlacementRule.PrimaryGroup().initialize(false, null));
- rules.add(new QueuePlacementRule.SecondaryGroupExistingQueue().initialize(false, null));
- rules.add(new QueuePlacementRule.Default().initialize(true, null));
- Set<String> queues = Sets.newHashSet("root.user1", "root.user3group",
- "root.user4subgroup1", "root.user4subgroup2" , "root.user5subgroup2");
- Map<FSQueueType, Set<String>> configuredQueues = new HashMap<FSQueueType, Set<String>>();
- configuredQueues.put(FSQueueType.LEAF, queues);
- configuredQueues.put(FSQueueType.PARENT, new HashSet<String>());
- scheduler.getAllocationConfiguration().placementPolicy =
- new QueuePlacementPolicy(rules, configuredQueues, conf);
- appId = createSchedulingRequest(1024, "somequeue", "user1");
- assertEquals("root.somequeue", scheduler.getSchedulerApp(appId).getQueueName());
- appId = createSchedulingRequest(1024, "default", "user1");
- assertEquals("root.user1", scheduler.getSchedulerApp(appId).getQueueName());
- appId = createSchedulingRequest(1024, "default", "user3");
- assertEquals("root.user3group", scheduler.getSchedulerApp(appId).getQueueName());
- appId = createSchedulingRequest(1024, "default", "user4");
- assertEquals("root.user4subgroup1", scheduler.getSchedulerApp(appId).getQueueName());
- appId = createSchedulingRequest(1024, "default", "user5");
- assertEquals("root.user5subgroup2", scheduler.getSchedulerApp(appId).getQueueName());
- appId = createSchedulingRequest(1024, "default", "otheruser");
- assertEquals("root.default", scheduler.getSchedulerApp(appId).getQueueName());
-
- // test without specified as first rule
- rules = new ArrayList<QueuePlacementRule>();
- rules.add(new QueuePlacementRule.User().initialize(false, null));
- rules.add(new QueuePlacementRule.Specified().initialize(true, null));
- rules.add(new QueuePlacementRule.Default().initialize(true, null));
- scheduler.getAllocationConfiguration().placementPolicy =
- new QueuePlacementPolicy(rules, configuredQueues, conf);
- appId = createSchedulingRequest(1024, "somequeue", "user1");
- assertEquals("root.user1", scheduler.getSchedulerApp(appId).getQueueName());
- appId = createSchedulingRequest(1024, "somequeue", "otheruser");
- assertEquals("root.somequeue", scheduler.getSchedulerApp(appId).getQueueName());
- appId = createSchedulingRequest(1024, "default", "otheruser");
- assertEquals("root.default", scheduler.getSchedulerApp(appId).getQueueName());
- }
-
- @Test
public void testFairShareWithMinAlloc() throws Exception {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
@@ -2027,38 +1922,6 @@ public class TestFairScheduler extends FairSchedulerTestBase {
}
}
}
-
- @Test
- public void testNestedUserQueue() throws IOException {
- conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
- conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
- SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
- PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
- out.println("<?xml version=\"1.0\"?>");
- out.println("<allocations>");
- out.println("<queue name=\"user1group\" type=\"parent\">");
- out.println("<minResources>1024mb,0vcores</minResources>");
- out.println("</queue>");
- out.println("<queuePlacementPolicy>");
- out.println("<rule name=\"specified\" create=\"false\" />");
- out.println("<rule name=\"nestedUserQueue\">");
- out.println(" <rule name=\"primaryGroup\" create=\"false\" />");
- out.println("</rule>");
- out.println("<rule name=\"default\" />");
- out.println("</queuePlacementPolicy>");
- out.println("</allocations>");
- out.close();
-
- scheduler.init(conf);
- scheduler.start();
- scheduler.reinitialize(conf, resourceManager.getRMContext());
- RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW);
-
- FSLeafQueue user1Leaf = scheduler.assignToQueue(rmApp1, "root.default",
- "user1");
-
- assertEquals("root.user1group.user1", user1Leaf.getName());
- }
@Test
public void testFairShareAndWeightsInNestedUserQueueRule() throws Exception {
@@ -2228,7 +2091,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
// Submit one application
ApplicationAttemptId appAttemptId1 = createAppAttemptId(1, 1);
- createApplicationWithAMResource(appAttemptId1, "default", "user1", null);
+ createApplicationWithAMResource(appAttemptId1, "user1", "user1", null);
assertEquals(3072, scheduler.getQueueManager()
.getLeafQueue("default", false).getSteadyFairShare().getMemorySize());
assertEquals(3072, scheduler.getQueueManager()
@@ -2249,8 +2112,10 @@ public class TestFairScheduler extends FairSchedulerTestBase {
// First ask, queue1 requests 1 large (minReqSize * 2).
ApplicationAttemptId id11 = createAppAttemptId(1, 1);
createMockRMApp(id11);
+ ApplicationPlacementContext placementCtx =
+ new ApplicationPlacementContext("root.queue1");
scheduler.addApplication(id11.getApplicationId(),
- "root.queue1", "user1", false);
+ "root.queue1", "user1", false, placementCtx);
scheduler.addApplicationAttempt(id11, false, false);
List<ResourceRequest> ask1 = new ArrayList<ResourceRequest>();
ResourceRequest request1 = createResourceRequest(minReqSize * 2,
@@ -2262,8 +2127,9 @@ public class TestFairScheduler extends FairSchedulerTestBase {
// Second ask, queue2 requests 1 large.
ApplicationAttemptId id21 = createAppAttemptId(2, 1);
createMockRMApp(id21);
+ placementCtx = new ApplicationPlacementContext("root.queue2");
scheduler.addApplication(id21.getApplicationId(),
- "root.queue2", "user1", false);
+ "root.queue2", "user1", false, placementCtx);
scheduler.addApplicationAttempt(id21, false, false);
List<ResourceRequest> ask2 = new ArrayList<ResourceRequest>();
ResourceRequest request2 = createResourceRequest(2 * minReqSize,
@@ -2279,7 +2145,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
ApplicationAttemptId id22 = createAppAttemptId(2, 2);
createMockRMApp(id22);
scheduler.addApplication(id22.getApplicationId(),
- "root.queue2", "user1", false);
+ "root.queue2", "user1", false, placementCtx);
scheduler.addApplicationAttempt(id22, false, false);
List<ResourceRequest> ask3 = new ArrayList<ResourceRequest>();
ResourceRequest request4 = createResourceRequest(minReqSize,
@@ -2886,8 +2752,10 @@ public class TestFairScheduler extends FairSchedulerTestBase {
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
createMockRMApp(attemptId);
+ ApplicationPlacementContext placementCtx =
+ new ApplicationPlacementContext("queue1");
scheduler.addApplication(attemptId.getApplicationId(), "queue1", "user1",
- false);
+ false, placementCtx);
scheduler.addApplicationAttempt(attemptId, false, false);
// 1 request with 2 nodes on the same rack. another request with 1 node on
@@ -2900,7 +2768,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
asks.add(createResourceRequest(1024, node3.getRackName(), 1, 1, true));
asks.add(createResourceRequest(1024, ResourceRequest.ANY, 1, 2, true));
- scheduler.allocate(attemptId, asks, null, new ArrayList<ContainerId>(), null,
+ scheduler.allocate(attemptId, asks, null, new ArrayList<>(), null,
null, NULL_UPDATE_REQUESTS);
// node 1 checks in
@@ -3202,10 +3070,11 @@ public class TestFairScheduler extends FairSchedulerTestBase {
submissionContext.setApplicationId(applicationId);
submissionContext.setAMContainerSpec(clc);
RMApp application =
- new RMAppImpl(applicationId, resourceManager.getRMContext(), conf, name, user,
- queue, submissionContext, scheduler, masterService,
+ new RMAppImpl(applicationId, resourceManager.getRMContext(), conf,
+ name, user, queue, submissionContext, scheduler, masterService,
System.currentTimeMillis(), "YARN", null, null);
- resourceManager.getRMContext().getRMApps().putIfAbsent(applicationId, application);
+ resourceManager.getRMContext().getRMApps().
+ putIfAbsent(applicationId, application);
application.handle(new RMAppEvent(applicationId, RMAppEventType.START));
final int MAX_TRIES=20;
@@ -3222,16 +3091,22 @@ public class TestFairScheduler extends FairSchedulerTestBase {
ApplicationAttemptId attId =
ApplicationAttemptId.newInstance(applicationId, this.ATTEMPT_ID++);
- scheduler.addApplication(attId.getApplicationId(), queue, user, false);
+ ApplicationPlacementContext placementCtx =
+ new ApplicationPlacementContext(queue);
+ scheduler.addApplication(attId.getApplicationId(), queue, user, false,
+ placementCtx);
numTries = 0;
while (application.getFinishTime() == 0 && numTries < MAX_TRIES) {
try {
Thread.sleep(100);
- } catch (InterruptedException ex) {ex.printStackTrace();}
+ } catch (InterruptedException ex) {
+ ex.printStackTrace();
+ }
numTries++;
}
- assertEquals(FinalApplicationStatus.FAILED, application.getFinalApplicationStatus());
+ assertEquals(FinalApplicationStatus.FAILED,
+ application.getFinalApplicationStatus());
}
@Test
@@ -3845,7 +3720,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
assertEquals("Queue queue1's fair share should be 0", 0, queue1
.getFairShare().getMemorySize());
- createSchedulingRequest(1 * 1024, "root.default", "user1");
+ createSchedulingRequest(1 * 1024, "default", "user1");
scheduler.update();
scheduler.handle(updateEvent);
@@ -4559,8 +4434,10 @@ public class TestFairScheduler extends FairSchedulerTestBase {
ApplicationAttemptId id11 = createAppAttemptId(1, 1);
createMockRMApp(id11);
+ ApplicationPlacementContext placementCtx =
+ new ApplicationPlacementContext("root.queue1");
scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1",
- false);
+ false, placementCtx);
scheduler.addApplicationAttempt(id11, false, false);
List<ResourceRequest> ask1 = new ArrayList<>();
@@ -4573,7 +4450,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
String hostName = "127.0.0.1";
RMNode node1 = MockNodes.newNodeInfo(1,
- Resources.createResource(8 * 1024, 8), 1, hostName);
+ Resources.createResource(8 * 1024, 8), 1, hostName);
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
@@ -4611,12 +4488,12 @@ public class TestFairScheduler extends FairSchedulerTestBase {
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
- List<QueuePlacementRule> rules =
- scheduler.allocConf.placementPolicy.getRules();
+ List<PlacementRule> rules = scheduler.getRMContext()
+ .getQueuePlacementManager().getPlacementRules();
- for (QueuePlacementRule rule : rules) {
- if (rule instanceof Default) {
- Default defaultRule = (Default) rule;
+ for (PlacementRule rule : rules) {
+ if (rule instanceof DefaultPlacementRule) {
+ DefaultPlacementRule defaultRule = (DefaultPlacementRule) rule;
assertNotNull(defaultRule.defaultQueueName);
}
}
@@ -4686,7 +4563,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
ApplicationAttemptId appAttId2 =
createSchedulingRequest(1024, 1, "queue1.subqueue2", "user1");
ApplicationAttemptId appAttId3 =
- createSchedulingRequest(1024, 1, "default", "user1");
+ createSchedulingRequest(1024, 1, "user1", "user1");
List<ApplicationAttemptId> apps =
scheduler.getAppsInQueue("queue1.subqueue1");
@@ -4888,11 +4765,13 @@ public class TestFairScheduler extends FairSchedulerTestBase {
scheduler.reinitialize(conf, resourceManager.getRMContext());
ApplicationAttemptId attemptId = createAppAttemptId(1, 1);
- // The placement rule will add the app to the user based queue but the
+ // The placement rule should add it to the user based queue but the
// passed in queue must exist.
+ ApplicationPlacementContext apc =
+ new ApplicationPlacementContext(testUser);
AppAddedSchedulerEvent appAddedEvent =
new AppAddedSchedulerEvent(attemptId.getApplicationId(), testUser,
- testUser);
+ testUser, apc);
scheduler.handle(appAddedEvent);
AppAttemptAddedSchedulerEvent attemptAddedEvent =
new AppAttemptAddedSchedulerEvent(createAppAttemptId(1, 1), false);
@@ -4936,9 +4815,11 @@ public class TestFairScheduler extends FairSchedulerTestBase {
scheduler.reinitialize(conf, resourceManager.getRMContext());
ApplicationAttemptId attemptId = createAppAttemptId(1, 1);
+ ApplicationPlacementContext apc =
+ new ApplicationPlacementContext(testUser);
AppAddedSchedulerEvent appAddedEvent =
new AppAddedSchedulerEvent(attemptId.getApplicationId(), testUser,
- testUser);
+ testUser, apc);
scheduler.handle(appAddedEvent);
AppAttemptAddedSchedulerEvent attemptAddedEvent =
new AppAttemptAddedSchedulerEvent(createAppAttemptId(1, 1), false);
@@ -4990,8 +4871,10 @@ public class TestFairScheduler extends FairSchedulerTestBase {
// submit app with queue name "A"
ApplicationAttemptId appAttemptId1 = createAppAttemptId(1, 1);
+ ApplicationPlacementContext apc =
+ new ApplicationPlacementContext("A");
AppAddedSchedulerEvent appAddedEvent1 = new AppAddedSchedulerEvent(
- appAttemptId1.getApplicationId(), "A", "user1");
+ appAttemptId1.getApplicationId(), "A", "user1", apc);
scheduler.handle(appAddedEvent1);
// submission accepted
assertEquals(2, scheduler.getQueueManager().getLeafQueues().size());
@@ -5008,9 +4891,15 @@ public class TestFairScheduler extends FairSchedulerTestBase {
// submit app with queue name "A "
ApplicationAttemptId appAttemptId2 = createAppAttemptId(2, 1);
+ apc = new ApplicationPlacementContext("A ");
AppAddedSchedulerEvent appAddedEvent2 = new AppAddedSchedulerEvent(
- appAttemptId2.getApplicationId(), "A ", "user1");
- scheduler.handle(appAddedEvent2);
+ appAttemptId2.getApplicationId(), "A ", "user1", apc);
+ try {
+ scheduler.handle(appAddedEvent2);
+ Assert.fail("Submit should have failed with InvalidQueueNameException");
+ } catch (InvalidQueueNameException iqne) {
+ // expected ignore: rules should have filtered this out
+ }
// submission rejected
assertEquals(2, scheduler.getQueueManager().getLeafQueues().size());
assertNull(scheduler.getSchedulerApplications().get(appAttemptId2.
@@ -5019,8 +4908,9 @@ public class TestFairScheduler extends FairSchedulerTestBase {
// submit app with queue name "B.C"
ApplicationAttemptId appAttemptId3 = createAppAttemptId(3, 1);
+ apc = new ApplicationPlacementContext("B.C");
AppAddedSchedulerEvent appAddedEvent3 = new AppAddedSchedulerEvent(
- appAttemptId3.getApplicationId(), "B.C", "user1");
+ appAttemptId3.getApplicationId(), "B.C", "user1", apc);
scheduler.handle(appAddedEvent3);
// submission accepted
assertEquals(3, scheduler.getQueueManager().getLeafQueues().size());
@@ -5037,9 +4927,14 @@ public class TestFairScheduler extends FairSchedulerTestBase {
// submit app with queue name "A\u00a0" (non-breaking space)
ApplicationAttemptId appAttemptId4 = createAppAttemptId(4, 1);
+ apc = new ApplicationPlacementContext("A\u00a0");
AppAddedSchedulerEvent appAddedEvent4 = new AppAddedSchedulerEvent(
- appAttemptId4.getApplicationId(), "A\u00a0", "user1");
- scheduler.handle(appAddedEvent4);
+ appAttemptId4.getApplicationId(), "A\u00a0", "user1", apc);
+ try {
+ scheduler.handle(appAddedEvent4);
+ } catch (InvalidQueueNameException iqne) {
+ // expected ignore: rules should have filtered this out
+ }
// submission rejected
assertEquals(3, scheduler.getQueueManager().getLeafQueues().size());
assertNull(scheduler.getSchedulerApplications().get(appAttemptId4.
@@ -5075,17 +4970,17 @@ public class TestFairScheduler extends FairSchedulerTestBase {
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
- ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1);
- createApplicationWithAMResource(appAttemptId, "default", " user1", null);
+ ApplicationAttemptId attId1 = createAppAttemptId(1, 1);
+ createApplicationWithAMResource(attId1, "root.user1", " user1", null);
assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1", true)
.getNumRunnableApps());
assertEquals(0, scheduler.getQueueManager().getLeafQueue("default", true)
.getNumRunnableApps());
assertEquals("root.user1", resourceManager.getRMContext().getRMApps()
- .get(appAttemptId.getApplicationId()).getQueue());
+ .get(attId1.getApplicationId()).getQueue());
ApplicationAttemptId attId2 = createAppAttemptId(2, 1);
- createApplicationWithAMResource(attId2, "default", "user1 ", null);
+ createApplicationWithAMResource(attId2, "root.user1", "user1 ", null);
assertEquals(2, scheduler.getQueueManager().getLeafQueue("user1", true)
.getNumRunnableApps());
assertEquals(0, scheduler.getQueueManager().getLeafQueue("default", true)
@@ -5094,7 +4989,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
.get(attId2.getApplicationId()).getQueue());
ApplicationAttemptId attId3 = createAppAttemptId(3, 1);
- createApplicationWithAMResource(attId3, "default", "user1", null);
+ createApplicationWithAMResource(attId3, "root.user1", "user1", null);
assertEquals(3, scheduler.getQueueManager().getLeafQueue("user1", true)
.getNumRunnableApps());
assertEquals(0, scheduler.getQueueManager().getLeafQueue("default", true)
@@ -5526,8 +5421,10 @@ public class TestFairScheduler extends FairSchedulerTestBase {
// Create application attempt
ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1);
createMockRMApp(appAttemptId);
+ ApplicationPlacementContext placementCtx =
+ new ApplicationPlacementContext("root.queue1");
scheduler.addApplication(appAttemptId.getApplicationId(), "root.queue1",
- "user1", false);
+ "user1", false, placementCtx);
scheduler.addApplicationAttempt(appAttemptId, false, false);
// Create container request that goes to a specific node.
@@ -5612,7 +5509,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
ResourceRequest amReqs = ResourceRequest.newBuilder()
.capability(Resource.newInstance(5 * GB, 3)).build();
- createApplicationWithAMResource(appAttemptId1, "queueA", "user1",
+ createApplicationWithAMResource(appAttemptId1, "root.queueA", "user1",
Resource.newInstance(GB, 1), Lists.newArrayList(amReqs));
scheduler.update();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerWithMultiResourceTypes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerWithMultiResourceTypes.java
index f9fcf53..2785baa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerWithMultiResourceTypes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerWithMultiResourceTypes.java
@@ -21,6 +21,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.junit.After;
import org.junit.Before;
@@ -33,6 +35,8 @@ import java.util.Map;
import static org.apache.hadoop.yarn.util.resource.ResourceUtils.MAXIMUM_ALLOCATION;
import static org.apache.hadoop.yarn.util.resource.ResourceUtils.UNITS;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class TestFairSchedulerWithMultiResourceTypes
extends FairSchedulerTestBase {
@@ -44,6 +48,11 @@ public class TestFairSchedulerWithMultiResourceTypes
scheduler = new FairScheduler();
conf = createConfiguration();
initResourceTypes(conf);
+ // since this runs outside of the normal context we need to set one
+ RMContext rmContext = mock(RMContext.class);
+ PlacementManager placementManager = new PlacementManager();
+ when(rmContext.getQueuePlacementManager()).thenReturn(placementManager);
+ scheduler.setRMContext(rmContext);
}
@After
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java
index 964ecf5..c692349 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java
@@ -27,10 +27,10 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.junit.Before;
@@ -46,26 +46,27 @@ public class TestMaxRunningAppsEnforcer {
private FairScheduler scheduler;
@Before
- public void setup() throws Exception {
- Configuration conf = new Configuration();
+ public void setup() {
+ FairSchedulerConfiguration conf = new FairSchedulerConfiguration();
+ PlacementManager placementManager = new PlacementManager();
+ rmContext = mock(RMContext.class);
+ when(rmContext.getQueuePlacementManager()).thenReturn(placementManager);
+ when(rmContext.getEpoch()).thenReturn(0L);
clock = new ControlledClock();
scheduler = mock(FairScheduler.class);
- when(scheduler.getConf()).thenReturn(
- new FairSchedulerConfiguration(conf));
+ when(scheduler.getConf()).thenReturn(conf);
+ when(scheduler.getConfig()).thenReturn(conf);
when(scheduler.getClock()).thenReturn(clock);
- AllocationConfiguration allocConf = new AllocationConfiguration(
- conf);
- when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
when(scheduler.getResourceCalculator()).thenReturn(
new DefaultResourceCalculator());
-
+ when(scheduler.getRMContext()).thenReturn(rmContext);
+ AllocationConfiguration allocConf = new AllocationConfiguration(scheduler);
+ when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
queueManager = new QueueManager(scheduler);
- queueManager.initialize(conf);
+ queueManager.initialize();
userMaxApps = allocConf.userMaxApps;
maxAppsEnforcer = new MaxRunningAppsEnforcer(scheduler);
appNum = 0;
- rmContext = mock(RMContext.class);
- when(rmContext.getEpoch()).thenReturn(0L);
}
private FSAppAttempt addApp(FSLeafQueue queue, String user) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java
index 9d8c960..d888d23 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java
@@ -26,6 +26,7 @@ import java.util.Set;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -37,35 +38,43 @@ import org.mockito.Mockito;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
+/**
+ * Test the {@link FairScheduler} queue manager correct queue hierarchies
+ * management (create, delete and type changes).
+ */
public class TestQueueManager {
- private FairSchedulerConfiguration conf;
private QueueManager queueManager;
private FairScheduler scheduler;
-
+
@Before
- public void setUp() throws Exception {
- conf = new FairSchedulerConfiguration();
+ public void setUp() {
+ PlacementManager placementManager = new PlacementManager();
+ FairSchedulerConfiguration conf = new FairSchedulerConfiguration();
+ RMContext rmContext = mock(RMContext.class);
+ when(rmContext.getQueuePlacementManager()).thenReturn(placementManager);
+ SystemClock clock = SystemClock.getInstance();
+
scheduler = mock(FairScheduler.class);
+ when(scheduler.getConf()).thenReturn(conf);
+ when(scheduler.getConfig()).thenReturn(conf);
+ when(scheduler.getRMContext()).thenReturn(rmContext);
+ when(scheduler.getResourceCalculator()).thenReturn(
+ new DefaultResourceCalculator());
+ when(scheduler.getClock()).thenReturn(clock);
- AllocationConfiguration allocConf = new AllocationConfiguration(conf);
+ AllocationConfiguration allocConf =
+ new AllocationConfiguration(scheduler);
+ when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
// Set up some queues to test default child max resource inheritance
allocConf.configuredQueues.get(FSQueueType.PARENT).add("root.test");
allocConf.configuredQueues.get(FSQueueType.LEAF).add("root.test.childA");
allocConf.configuredQueues.get(FSQueueType.PARENT).add("root.test.childB");
- when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
- when(scheduler.getConf()).thenReturn(conf);
- when(scheduler.getResourceCalculator()).thenReturn(
- new DefaultResourceCalculator());
-
- SystemClock clock = SystemClock.getInstance();
-
- when(scheduler.getClock()).thenReturn(clock);
queueManager = new QueueManager(scheduler);
FSQueueMetrics.forQueue("root", null, true, conf);
- queueManager.initialize(conf);
+ queueManager.initialize();
queueManager.updateAllocationConfiguration(allocConf);
}
@@ -118,7 +127,8 @@ public class TestQueueManager {
*/
@Test
public void testReloadTurnsLeafToParentWithNoLeaf() {
- AllocationConfiguration allocConf = new AllocationConfiguration(conf);
+ AllocationConfiguration allocConf =
+ new AllocationConfiguration(scheduler);
// Create a leaf queue1
allocConf.configuredQueues.get(FSQueueType.LEAF).add("root.queue1");
queueManager.updateAllocationConfiguration(allocConf);
@@ -130,7 +140,7 @@ public class TestQueueManager {
FSLeafQueue q1 = queueManager.getLeafQueue("queue1", false);
ApplicationId appId = ApplicationId.newInstance(0, 0);
q1.addAssignedApp(appId);
- allocConf = new AllocationConfiguration(conf);
+ allocConf = new AllocationConfiguration(scheduler);
allocConf.configuredQueues.get(FSQueueType.PARENT)
.add("root.queue1");
@@ -169,7 +179,8 @@ public class TestQueueManager {
private void updateConfiguredLeafQueues(QueueManager queueMgr,
String... confLeafQueues) {
- AllocationConfiguration allocConf = new AllocationConfiguration(conf);
+ AllocationConfiguration allocConf =
+ new AllocationConfiguration(scheduler);
allocConf.configuredQueues.get(FSQueueType.LEAF)
.addAll(Sets.newHashSet(confLeafQueues));
queueMgr.updateAllocationConfiguration(allocConf);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueuePlacementPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueuePlacementPolicy.java
index 3fe9ce3..29c9d70 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueuePlacementPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueuePlacementPolicy.java
@@ -18,44 +18,82 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.security.GroupMappingServiceProvider;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.DefaultPlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.FSPlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.UserPlacementRule;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
+/**
+ * Tests for the queue placement policy for the {@link FairScheduler}.
+ */
public class TestQueuePlacementPolicy {
- private final static Configuration conf = new Configuration();
- private Map<FSQueueType, Set<String>> configuredQueues;
-
+ private final static FairSchedulerConfiguration CONF =
+ new FairSchedulerConfiguration();
+ // Base setup needed, policy is an intermediate object
+ private PlacementManager placementManager;
+ private FairScheduler scheduler;
+ private QueueManager queueManager;
+
+ // Locals used in each assignment
+ private ApplicationSubmissionContext asc;
+ private ApplicationPlacementContext context;
+
@BeforeClass
public static void setup() {
- conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
+ CONF.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
}
-
+
@Before
public void initTest() {
- configuredQueues = new HashMap<FSQueueType, Set<String>>();
- for (FSQueueType type : FSQueueType.values()) {
- configuredQueues.put(type, new HashSet<String>());
- }
+ SystemClock clock = SystemClock.getInstance();
+ RMContext rmContext = mock(RMContext.class);
+ placementManager = new PlacementManager();
+ scheduler = mock(FairScheduler.class);
+ when(scheduler.getClock()).thenReturn(clock);
+ when(scheduler.getRMContext()).thenReturn(rmContext);
+ when(scheduler.getConfig()).thenReturn(CONF);
+ when(scheduler.getConf()).thenReturn(CONF);
+ when(rmContext.getQueuePlacementManager()).thenReturn(placementManager);
+ AllocationConfiguration allocConf = new AllocationConfiguration(scheduler);
+ when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
+ queueManager = new QueueManager(scheduler);
+ queueManager.initialize();
+ when(scheduler.getQueueManager()).thenReturn(queueManager);
+ }
+
+ @After
+ public void cleanTest() {
+ placementManager = null;
+ queueManager = null;
+ scheduler = null;
}
@Test
@@ -65,15 +103,18 @@ public class TestQueuePlacementPolicy {
sb.append(" <rule name='specified' />");
sb.append(" <rule name='user' />");
sb.append("</queuePlacementPolicy>");
- QueuePlacementPolicy policy = parse(sb.toString());
- assertEquals("root.specifiedq",
- policy.assignAppToQueue("specifiedq", "someuser"));
- assertEquals("root.someuser",
- policy.assignAppToQueue("default", "someuser"));
- assertEquals("root.otheruser",
- policy.assignAppToQueue("default", "otheruser"));
+ createPolicy(sb.toString());
+
+ asc = newAppSubmissionContext("specifiedq");
+ context = placementManager.placeApplication(asc, "someuser");
+ assertEquals("root.specifiedq", context.getQueue());
+ asc = newAppSubmissionContext("default");
+ context = placementManager.placeApplication(asc, "someuser");
+ assertEquals("root.someuser", context.getQueue());
+ context = placementManager.placeApplication(asc, "otheruser");
+ assertEquals("root.otheruser", context.getQueue());
}
-
+
@Test
public void testNoCreate() throws Exception {
StringBuffer sb = new StringBuffer();
@@ -82,15 +123,24 @@ public class TestQueuePlacementPolicy {
sb.append(" <rule name='user' create=\"false\" />");
sb.append(" <rule name='default' />");
sb.append("</queuePlacementPolicy>");
-
- configuredQueues.get(FSQueueType.LEAF).add("root.someuser");
- QueuePlacementPolicy policy = parse(sb.toString());
- assertEquals("root.specifiedq", policy.assignAppToQueue("specifiedq", "someuser"));
- assertEquals("root.someuser", policy.assignAppToQueue("default", "someuser"));
- assertEquals("root.specifiedq", policy.assignAppToQueue("specifiedq", "otheruser"));
- assertEquals("root.default", policy.assignAppToQueue("default", "otheruser"));
+ createPolicy(sb.toString());
+
+ createQueue(FSQueueType.LEAF, "root.someuser");
+
+ asc = newAppSubmissionContext("specifiedq");
+ context = placementManager.placeApplication(asc, "someuser");
+ assertEquals("root.specifiedq", context.getQueue());
+ asc = newAppSubmissionContext("default");
+ context = placementManager.placeApplication(asc, "someuser");
+ assertEquals("root.someuser", context.getQueue());
+ asc = newAppSubmissionContext("specifiedq");
+ context = placementManager.placeApplication(asc, "otheruser");
+ assertEquals("root.specifiedq", context.getQueue());
+ asc = newAppSubmissionContext("default");
+ context = placementManager.placeApplication(asc, "otheruser");
+ assertEquals("root.default", context.getQueue());
}
-
+
@Test
public void testSpecifiedThenReject() throws Exception {
StringBuffer sb = new StringBuffer();
@@ -98,94 +148,173 @@ public class TestQueuePlacementPolicy {
sb.append(" <rule name='specified' />");
sb.append(" <rule name='reject' />");
sb.append("</queuePlacementPolicy>");
- QueuePlacementPolicy policy = parse(sb.toString());
- assertEquals("root.specifiedq",
- policy.assignAppToQueue("specifiedq", "someuser"));
- assertEquals(null, policy.assignAppToQueue("default", "someuser"));
+ createPolicy(sb.toString());
+ asc = newAppSubmissionContext("specifiedq");
+ context = placementManager.placeApplication(asc, "someuser");
+ assertEquals("root.specifiedq", context.getQueue());
+ asc = newAppSubmissionContext("default");
+ context = placementManager.placeApplication(asc, "someuser");
+ assertNull("Assignment should have been rejected and was not", context);
}
-
- @Test (expected = AllocationConfigurationException.class)
- public void testOmittedTerminalRule() throws Exception {
+
+ @Test
+ public void testOmittedTerminalRule() {
StringBuffer sb = new StringBuffer();
sb.append("<queuePlacementPolicy>");
sb.append(" <rule name='specified' />");
sb.append(" <rule name='user' create=\"false\" />");
sb.append("</queuePlacementPolicy>");
- parse(sb.toString());
+ assertIfExceptionThrown(sb);
}
-
- @Test (expected = AllocationConfigurationException.class)
- public void testTerminalRuleInMiddle() throws Exception {
+
+ @Test
+ public void testTerminalRuleInMiddle() {
StringBuffer sb = new StringBuffer();
sb.append("<queuePlacementPolicy>");
sb.append(" <rule name='specified' />");
sb.append(" <rule name='default' />");
sb.append(" <rule name='user' />");
sb.append("</queuePlacementPolicy>");
- parse(sb.toString());
+ assertIfExceptionThrown(sb);
}
-
+
@Test
- public void testTerminals() throws Exception {
- // Should make it through without an exception
+ public void testTerminals() {
+ // The default rule is no longer considered terminal when the create flag
+ // is false. The throw now happens when configuring not when assigning the
+ // application
StringBuffer sb = new StringBuffer();
sb.append("<queuePlacementPolicy>");
sb.append(" <rule name='secondaryGroupExistingQueue' create='true'/>");
sb.append(" <rule name='default' queue='otherdefault' create='false'/>");
sb.append("</queuePlacementPolicy>");
- QueuePlacementPolicy policy = parse(sb.toString());
- try {
- policy.assignAppToQueue("root.otherdefault", "user1");
- fail("Expect exception from having default rule with create=\'false\'");
- } catch (IllegalStateException se) {
- }
+ assertIfExceptionThrown(sb);
}
-
+
@Test
public void testDefaultRuleWithQueueAttribute() throws Exception {
// This test covers the use case where we would like default rule
// to point to a different queue by default rather than root.default
- configuredQueues.get(FSQueueType.LEAF).add("root.someDefaultQueue");
+ createQueue(FSQueueType.LEAF, "root.someDefaultQueue");
StringBuffer sb = new StringBuffer();
sb.append("<queuePlacementPolicy>");
sb.append(" <rule name='specified' create='false' />");
sb.append(" <rule name='default' queue='root.someDefaultQueue'/>");
sb.append("</queuePlacementPolicy>");
- QueuePlacementPolicy policy = parse(sb.toString());
- assertEquals("root.someDefaultQueue",
- policy.assignAppToQueue("root.default", "user1"));
+ createPolicy(sb.toString());
+ asc = newAppSubmissionContext("default");
+ context = placementManager.placeApplication(asc, "user1");
+ assertEquals("root.someDefaultQueue", context.getQueue());
}
-
+
@Test
public void testNestedUserQueueParsingErrors() {
// No nested rule specified in hierarchical user queue
StringBuffer sb = new StringBuffer();
sb.append("<queuePlacementPolicy>");
- sb.append(" <rule name='specified' />");
sb.append(" <rule name='nestedUserQueue'/>");
- sb.append(" <rule name='default' />");
sb.append("</queuePlacementPolicy>");
assertIfExceptionThrown(sb);
- // Specified nested rule is not a QueuePlacementRule
+ // Specified nested rule is not a FSPlacementRule
sb = new StringBuffer();
sb.append("<queuePlacementPolicy>");
- sb.append(" <rule name='specified' />");
sb.append(" <rule name='nestedUserQueue'>");
- sb.append(" <rule name='unknownRule'/>");
+ sb.append(" <rule name='unknownRule'/>");
sb.append(" </rule>");
- sb.append(" <rule name='default' />");
sb.append("</queuePlacementPolicy>");
assertIfExceptionThrown(sb);
+
+ // Parent rule is rule that cannot be one: reject or nestedUserQueue
+ sb = new StringBuffer();
+ sb.append("<queuePlacementPolicy>");
+ sb.append(" <rule name='nestedUserQueue'>");
+ sb.append(" <rule name='reject'/>");
+ sb.append(" </rule>");
+ sb.append("</queuePlacementPolicy>");
+
+ assertIfExceptionThrown(sb);
+
+ // If the parent rule does not have the create flag the nested rule is not
+ // terminal
+ sb = new StringBuffer();
+ sb.append("<queuePlacementPolicy>");
+ sb.append(" <rule name='nestedUserQueue'>");
+ sb.append(" <rule name='primaryGroup' create='false'/>");
+ sb.append(" </rule>");
+ sb.append("</queuePlacementPolicy>");
+
+ assertIfExceptionThrown(sb);
+ }
+
+ @Test
+ public void testMultipleParentRules() throws Exception {
+ StringBuffer sb = new StringBuffer();
+ sb.append("<queuePlacementPolicy>");
+ sb.append(" <rule name='nestedUserQueue'>");
+ sb.append(" <rule name='primaryGroup'/>");
+ sb.append(" <rule name='default'/>");
+ sb.append(" </rule>");
+ sb.append("</queuePlacementPolicy>");
+
+ createPolicy(sb.toString());
+ PlacementRule nested = placementManager.getPlacementRules().get(0);
+ if (nested instanceof UserPlacementRule) {
+ PlacementRule parent = ((FSPlacementRule)nested).getParentRule();
+ assertTrue("Nested rule should have been Default rule",
+ parent instanceof DefaultPlacementRule);
+ } else {
+ fail("Policy parsing failed: rule with multiple parents not set");
+ }
+ }
+
+ @Test
+ public void testBrokenRules() throws Exception {
+ // broken rule should fail configuring
+ StringBuffer sb = new StringBuffer();
+ sb.append("<queuePlacementPolicy>");
+ sb.append(" <rule />");
+ sb.append("</queuePlacementPolicy>");
+
+ assertIfExceptionThrown(sb);
+
+ // policy without rules ignoring policy
+ sb = new StringBuffer();
+ sb.append("<queuePlacementPolicy>");
+ sb.append(" <notarule />");
+ sb.append("</queuePlacementPolicy>");
+
+ createPolicy(sb.toString());
+
+ // broken rule should fail configuring
+ sb = new StringBuffer();
+ sb.append("<queuePlacementPolicy>");
+ sb.append(" <rule name='user'>");
+ sb.append(" <rule />");
+ sb.append(" </rule>");
+ sb.append("</queuePlacementPolicy>");
+
+ assertIfExceptionThrown(sb);
+
+ // parent rule not set to something known: no parent rule is required
+ // required case is only for nestedUserQueue tested earlier
+ sb = new StringBuffer();
+ sb.append("<queuePlacementPolicy>");
+ sb.append(" <rule name='user'>");
+ sb.append(" <notarule />");
+ sb.append(" </rule>");
+ sb.append("</queuePlacementPolicy>");
+
+ createPolicy(sb.toString());
}
private void assertIfExceptionThrown(StringBuffer sb) {
Throwable th = null;
try {
- parse(sb.toString());
+ createPolicy(sb.toString());
} catch (Exception e) {
th = e;
}
@@ -193,6 +322,17 @@ public class TestQueuePlacementPolicy {
assertTrue(th instanceof AllocationConfigurationException);
}
+ private void assertIfExceptionThrown(String user) {
+ Throwable th = null;
+ try {
+ placementManager.placeApplication(asc, user);
+ } catch (Exception e) {
+ th = e;
+ }
+
+ assertTrue(th instanceof YarnException);
+ }
+
@Test
public void testNestedUserQueueParsing() throws Exception {
StringBuffer sb = new StringBuffer();
@@ -201,17 +341,9 @@ public class TestQueuePlacementPolicy {
sb.append(" <rule name='nestedUserQueue'>");
sb.append(" <rule name='primaryGroup'/>");
sb.append(" </rule>");
- sb.append(" <rule name='default' />");
sb.append("</queuePlacementPolicy>");
- Throwable th = null;
- try {
- parse(sb.toString());
- } catch (Exception e) {
- th = e;
- }
-
- assertNull(th);
+ createPolicy(sb.toString());
}
@Test
@@ -222,24 +354,26 @@ public class TestQueuePlacementPolicy {
sb.append(" <rule name='nestedUserQueue'>");
sb.append(" <rule name='primaryGroup'/>");
sb.append(" </rule>");
- sb.append(" <rule name='default' />");
sb.append("</queuePlacementPolicy>");
// User queue would be created under primary group queue
- QueuePlacementPolicy policy = parse(sb.toString());
- assertEquals("root.user1group.user1",
- policy.assignAppToQueue("root.default", "user1"));
+ createPolicy(sb.toString());
+ asc = newAppSubmissionContext("default");
+ context = placementManager.placeApplication(asc, "user1");
+ assertEquals("root.user1group.user1", context.getQueue());
// Other rules above and below hierarchical user queue rule should work as
// usual
- configuredQueues.get(FSQueueType.LEAF).add("root.specifiedq");
+ createQueue(FSQueueType.LEAF, "root.specifiedq");
// test if specified rule(above nestedUserQueue rule) works ok
- assertEquals("root.specifiedq",
- policy.assignAppToQueue("root.specifiedq", "user2"));
-
- // test if default rule(below nestedUserQueue rule) works
- configuredQueues.get(FSQueueType.LEAF).add("root.user3group");
- assertEquals("root.default",
- policy.assignAppToQueue("root.default", "user3"));
+ asc = newAppSubmissionContext("root.specifiedq");
+ context = placementManager.placeApplication(asc, "user2");
+ assertEquals("root.specifiedq", context.getQueue());
+
+ // Submit should fail if we cannot create the queue
+ createQueue(FSQueueType.LEAF, "root.user3group");
+ asc = newAppSubmissionContext("default");
+ context = placementManager.placeApplication(asc, "user3");
+ assertNull("Submission should have failed and did not", context);
}
@Test
@@ -253,18 +387,18 @@ public class TestQueuePlacementPolicy {
sb.append(" <rule name='default' />");
sb.append("</queuePlacementPolicy>");
- QueuePlacementPolicy policy = parse(sb.toString());
+ createPolicy(sb.toString());
// Should return root.default since primary group 'root.user1group' is not
// configured
- assertEquals("root.default",
- policy.assignAppToQueue("root.default", "user1"));
+ asc = newAppSubmissionContext("default");
+ context = placementManager.placeApplication(asc, "user1");
+ assertEquals("root.default", context.getQueue());
// Let's configure primary group and check if user queue is created
- configuredQueues.get(FSQueueType.PARENT).add("root.user1group");
- policy = parse(sb.toString());
- assertEquals("root.user1group.user1",
- policy.assignAppToQueue("root.default", "user1"));
+ createQueue(FSQueueType.PARENT, "root.user1group");
+ context = placementManager.placeApplication(asc, "user1");
+ assertEquals("root.user1group.user1", context.getQueue());
// Both Primary group and nestedUserQueue rule has create='false'
sb = new StringBuffer();
@@ -277,16 +411,16 @@ public class TestQueuePlacementPolicy {
// Should return root.default since primary group and user queue for user 2
// are not configured.
- assertEquals("root.default",
- policy.assignAppToQueue("root.default", "user2"));
+ asc = newAppSubmissionContext("default");
+ context = placementManager.placeApplication(asc, "user2");
+ assertEquals("root.default", context.getQueue());
// Now configure both primary group and the user queue for user2
- configuredQueues.get(FSQueueType.PARENT).add("root.user2group");
- configuredQueues.get(FSQueueType.LEAF).add("root.user2group.user2");
- policy = parse(sb.toString());
+ createQueue(FSQueueType.LEAF, "root.user2group.user2");
- assertEquals("root.user2group.user2",
- policy.assignAppToQueue("root.default", "user2"));
+ // Try placing the same app again
+ context = placementManager.placeApplication(asc, "user2");
+ assertEquals("root.user2group.user2", context.getQueue());
}
@Test
@@ -299,17 +433,18 @@ public class TestQueuePlacementPolicy {
sb.append(" <rule name='default' />");
sb.append("</queuePlacementPolicy>");
- QueuePlacementPolicy policy = parse(sb.toString());
+ createPolicy(sb.toString());
// Should return root.default since secondary groups are not configured
- assertEquals("root.default",
- policy.assignAppToQueue("root.default", "user1"));
+ asc = newAppSubmissionContext("default");
+ context = placementManager.placeApplication(asc, "user1");
+ assertEquals("root.default", context.getQueue());
// configure secondary group for user1
- configuredQueues.get(FSQueueType.PARENT).add("root.user1subgroup1");
- policy = parse(sb.toString());
+ createQueue(FSQueueType.PARENT, "root.user1subgroup1");
+ createPolicy(sb.toString());
// user queue created should be created under secondary group
- assertEquals("root.user1subgroup1.user1",
- policy.assignAppToQueue("root.default", "user1"));
+ context = placementManager.placeApplication(asc, "user1");
+ assertEquals("root.user1subgroup1.user1", context.getQueue());
}
@Test
@@ -325,33 +460,66 @@ public class TestQueuePlacementPolicy {
sb.append("</queuePlacementPolicy>");
// Let's create couple of parent queues
- configuredQueues.get(FSQueueType.PARENT).add("root.parent1");
- configuredQueues.get(FSQueueType.PARENT).add("root.parent2");
-
- QueuePlacementPolicy policy = parse(sb.toString());
- assertEquals("root.parent1.user1",
- policy.assignAppToQueue("root.parent1", "user1"));
- assertEquals("root.parent2.user2",
- policy.assignAppToQueue("root.parent2", "user2"));
+ createQueue(FSQueueType.PARENT, "root.parent1");
+ createQueue(FSQueueType.PARENT, "root.parent2");
+
+ createPolicy(sb.toString());
+ asc = newAppSubmissionContext("root.parent1");
+ context = placementManager.placeApplication(asc, "user1");
+ assertEquals("root.parent1.user1", context.getQueue());
+ asc = newAppSubmissionContext("root.parent2");
+ context = placementManager.placeApplication(asc, "user2");
+ assertEquals("root.parent2.user2", context.getQueue());
}
-
+
@Test
public void testNestedUserQueueDefaultRule() throws Exception {
// This test covers the use case where we would like user queues to be
// created under a default parent queue
- configuredQueues.get(FSQueueType.PARENT).add("root.parentq");
StringBuffer sb = new StringBuffer();
sb.append("<queuePlacementPolicy>");
sb.append(" <rule name='specified' create='false' />");
sb.append(" <rule name='nestedUserQueue'>");
- sb.append(" <rule name='default' queue='root.parentq'/>");
+ sb.append(" <rule name='default' queue='root.parent'/>");
+ sb.append(" </rule>");
+ sb.append("</queuePlacementPolicy>");
+
+ createPolicy(sb.toString());
+ asc = newAppSubmissionContext("default");
+ context = placementManager.placeApplication(asc, "user1");
+ assertEquals("root.parent.user1", context.getQueue());
+
+ // Same as above but now with the create flag false for the parent
+ createQueue(FSQueueType.PARENT, "root.parent");
+ sb = new StringBuffer();
+ sb.append("<queuePlacementPolicy>");
+ sb.append(" <rule name='specified' create='false' />");
+ sb.append(" <rule name='nestedUserQueue'>");
+ sb.append(" <rule name='default' queue='root.parent' create='false'/>");
sb.append(" </rule>");
sb.append(" <rule name='default' />");
sb.append("</queuePlacementPolicy>");
- QueuePlacementPolicy policy = parse(sb.toString());
- assertEquals("root.parentq.user1",
- policy.assignAppToQueue("root.default", "user1"));
+ createPolicy(sb.toString());
+ asc = newAppSubmissionContext("default");
+ context = placementManager.placeApplication(asc, "user1");
+ assertEquals("root.parent.user1", context.getQueue());
+
+ // Parent queue returned is already a configured LEAF, should fail and the
+ // context is null.
+ createQueue(FSQueueType.LEAF, "root.parent");
+ sb = new StringBuffer();
+ sb.append("<queuePlacementPolicy>");
+ sb.append(" <rule name='specified' create='false' />");
+ sb.append(" <rule name='nestedUserQueue'>");
+ sb.append(" <rule name='default' queue='root.parent' />");
+ sb.append(" </rule>");
+ sb.append("</queuePlacementPolicy>");
+
+ createPolicy(sb.toString());
+ asc = newAppSubmissionContext("default");
+ context = placementManager.placeApplication(asc, "user1");
+ assertNull("Submission should have failed and did not", context);
}
@Test
@@ -361,9 +529,10 @@ public class TestQueuePlacementPolicy {
sb.append("<queuePlacementPolicy>");
sb.append(" <rule name='user' />");
sb.append("</queuePlacementPolicy>");
- QueuePlacementPolicy policy = parse(sb.toString());
- assertEquals("root.first_dot_last",
- policy.assignAppToQueue("default", "first.last"));
+ createPolicy(sb.toString());
+ asc = newAppSubmissionContext("default");
+ context = placementManager.placeApplication(asc, "first.last");
+ assertEquals("root.first_dot_last", context.getQueue());
sb = new StringBuffer();
sb.append("<queuePlacementPolicy>");
@@ -371,11 +540,14 @@ public class TestQueuePlacementPolicy {
sb.append(" <rule name='nestedUserQueue'>");
sb.append(" <rule name='default'/>");
sb.append(" </rule>");
- sb.append(" <rule name='default' />");
sb.append("</queuePlacementPolicy>");
- policy = parse(sb.toString());
- assertEquals("root.default.first_dot_last",
- policy.assignAppToQueue("root.default", "first.last"));
+ // specified create is false, bypass the rule
+ // default rule has create which requires a PARENT queue: remove the LEAF
+ queueManager.removeLeafQueue("root.default");
+ createPolicy(sb.toString());
+ asc = newAppSubmissionContext("default");
+ context = placementManager.placeApplication(asc, "first_dot_last");
+ assertEquals("root.default.first_dot_last", context.getQueue());
}
@Test
@@ -386,22 +558,22 @@ public class TestQueuePlacementPolicy {
sb.append(" <rule name='nestedUserQueue'>");
sb.append(" <rule name='primaryGroup'/>");
sb.append(" </rule>");
- sb.append(" <rule name='default' />");
sb.append("</queuePlacementPolicy>");
- conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
+ CONF.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
PeriodGroupsMapping.class, GroupMappingServiceProvider.class);
// User queue would be created under primary group queue, and the period
// in the group name should be converted into _dot_
- QueuePlacementPolicy policy = parse(sb.toString());
- assertEquals("root.user1_dot_group.user1",
- policy.assignAppToQueue("root.default", "user1"));
+ createPolicy(sb.toString());
+ asc = newAppSubmissionContext("default");
+ context = placementManager.placeApplication(asc, "user1");
+ assertEquals("root.user1_dot_group.user1", context.getQueue());
- conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
+ CONF.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
}
- @Test(expected=IOException.class)
+ @Test
public void testEmptyGroupsPrimaryGroupRule() throws Exception {
StringBuffer sb = new StringBuffer();
sb.append("<queuePlacementPolicy>");
@@ -410,20 +582,67 @@ public class TestQueuePlacementPolicy {
sb.append("</queuePlacementPolicy>");
// Add a static mapping that returns empty groups for users
- conf.setStrings(CommonConfigurationKeys
+ CONF.setStrings(CommonConfigurationKeys
.HADOOP_USER_GROUP_STATIC_OVERRIDES, "emptygroupuser=");
- QueuePlacementPolicy policy = parse(sb.toString());
- policy.assignAppToQueue(null, "emptygroupuser");
+ createPolicy(sb.toString());
+ asc = newAppSubmissionContext("root.fake");
+ assertIfExceptionThrown("emptygroupuser");
+ }
+
+ @Test
+ public void testSpecifiedQueueWithSpaces() throws Exception {
+ StringBuffer sb = new StringBuffer();
+ sb.append("<queuePlacementPolicy>");
+ sb.append(" <rule name='specified'/>");
+ sb.append(" <rule name='default'/>");
+ sb.append("</queuePlacementPolicy>");
+
+ createPolicy(sb.toString());
+ asc = newAppSubmissionContext("A ");
+ assertIfExceptionThrown("user1");
+
+ asc = newAppSubmissionContext("A\u00a0");
+ assertIfExceptionThrown("user1");
}
- private QueuePlacementPolicy parse(String str) throws Exception {
+ private void createPolicy(String str)
+ throws AllocationConfigurationException {
// Read and parse the allocations file.
- DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory
- .newInstance();
- docBuilderFactory.setIgnoringComments(true);
- DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
- Document doc = builder.parse(IOUtils.toInputStream(str));
- Element root = doc.getDocumentElement();
- return QueuePlacementPolicy.fromXml(root, configuredQueues, conf);
+ Element root = null;
+ try {
+ DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory
+ .newInstance();
+ docBuilderFactory.setIgnoringComments(true);
+ DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
+ Document doc = builder.parse(IOUtils.toInputStream(str,
+ StandardCharsets.UTF_8));
+ root = doc.getDocumentElement();
+ } catch (Exception ex) {
+ // Don't really want to test the xml parsing side,
+ // let it fail with a null config below.
+ }
+ QueuePlacementPolicy.fromXml(root, scheduler);
+ }
+
+ private ApplicationSubmissionContext newAppSubmissionContext(String queue) {
+ ApplicationId appId = ApplicationId.newInstance(1L, 1);
+ Priority prio = Priority.UNDEFINED;
+ Resource resource = Resource.newInstance(1, 1);
+ ContainerLaunchContext amContainer =
+ ContainerLaunchContext.newInstance(null, null, null, null, null, null);
+ return ApplicationSubmissionContext.newInstance(appId, "test", queue,
+ prio, amContainer, false, false, 1, resource, "testing");
+ }
+
+ private void createQueue(FSQueueType type, String name) {
+ // Create a queue as if it is in the config.
+ FSQueue queue = queueManager.createQueue(name, type);
+ assertNotNull("Queue not created", queue);
+ // walk up the list till we have a non dynamic queue
+ // root is always non dynamic
+ do {
+ queue.setDynamic(false);
+ queue = queue.parent;
+ } while (queue.isDynamic());
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java
index 9c6b0f4..e2be0d9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java
@@ -30,6 +30,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
@@ -40,6 +42,8 @@ import org.junit.Test;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class TestSchedulingPolicy {
private static final Logger LOG =
@@ -53,6 +57,11 @@ public class TestSchedulingPolicy {
public void setUp() throws Exception {
scheduler = new FairScheduler();
conf = new FairSchedulerConfiguration();
+ // since this runs outside of the normal context we need to set one
+ RMContext rmContext = mock(RMContext.class);
+ PlacementManager placementManager = new PlacementManager();
+ when(rmContext.getQueuePlacementManager()).thenReturn(placementManager);
+ scheduler.setRMContext(rmContext);
}
public void testParseSchedulingPolicy()
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/TestFairSchedulerQueueInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/TestFairSchedulerQueueInfo.java
index 83b7e41..454609b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/TestFairSchedulerQueueInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/TestFairSchedulerQueueInfo.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
@@ -37,19 +39,25 @@ import static org.mockito.Mockito.when;
public class TestFairSchedulerQueueInfo {
@Test
- public void testEmptyChildQueues() throws Exception {
- FairSchedulerConfiguration conf = new FairSchedulerConfiguration();
+ public void testEmptyChildQueues() {
+ FairSchedulerConfiguration fsConf = new FairSchedulerConfiguration();
+ RMContext rmContext = mock(RMContext.class);
+ PlacementManager placementManager = new PlacementManager();
+ SystemClock clock = SystemClock.getInstance();
FairScheduler scheduler = mock(FairScheduler.class);
- AllocationConfiguration allocConf = new AllocationConfiguration(conf);
- when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
- when(scheduler.getConf()).thenReturn(conf);
- when(scheduler.getClusterResource()).thenReturn(Resource.newInstance(1, 1));
+ when(scheduler.getConf()).thenReturn(fsConf);
+ when(scheduler.getConfig()).thenReturn(fsConf);
+ when(scheduler.getRMContext()).thenReturn(rmContext);
+ when(rmContext.getQueuePlacementManager()).thenReturn(placementManager);
+ when(scheduler.getClusterResource()).thenReturn(
+ Resource.newInstance(1, 1));
when(scheduler.getResourceCalculator()).thenReturn(
new DefaultResourceCalculator());
- SystemClock clock = SystemClock.getInstance();
when(scheduler.getClock()).thenReturn(clock);
+ AllocationConfiguration allocConf = new AllocationConfiguration(scheduler);
+ when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
QueueManager queueManager = new QueueManager(scheduler);
- queueManager.initialize(conf);
+ queueManager.initialize();
FSQueue testQueue = queueManager.getLeafQueue("test", true);
FairSchedulerQueueInfo queueInfo =
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org