You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by pb...@apache.org on 2020/11/11 16:10:34 UTC
[hadoop] 01/02: Revert "YARN-10425. Replace the legacy placement engine in CS with the new one. Contributed by Gergely Pollak."
This is an automated email from the ASF dual-hosted git repository.
pbacsko pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit cd0490e8c6dd502c639de2d26980556b78eeeb06
Author: Peter Bacsko <pb...@cloudera.com>
AuthorDate: Wed Nov 11 17:06:12 2020 +0100
Revert "YARN-10425. Replace the legacy placement engine in CS with the new one. Contributed by Gergely Pollak."
This reverts commit b0ab222a6c0073c31c285e89e43508b3e0ca9576.
---
.../protocol/RefreshMountTableEntriesRequest.java | 34 ++
.../yarn/server/resourcemanager/RMAppManager.java | 4 +-
.../placement/AppNameMappingPlacementRule.java | 204 ++++++++
.../placement/CSMappingPlacementRule.java | 92 +---
.../MappingRuleValidationContextImpl.java | 16 +-
.../placement/PlacementManager.java | 10 +-
.../resourcemanager/placement/PlacementRule.java | 25 -
.../placement/UserGroupMappingPlacementRule.java | 558 +++++++++++++++++++++
.../scheduler/capacity/CapacityScheduler.java | 44 +-
.../capacity/CapacitySchedulerConfiguration.java | 18 -
.../server/resourcemanager/TestAppManager.java | 4 +-
.../TestAppManagerWithFairScheduler.java | 14 +-
.../placement/TestPlacementManager.java | 31 +-
.../TestUserGroupMappingPlacementRule.java | 4 +-
.../TestAbsoluteResourceWithAutoQueue.java | 1 -
.../TestCapacitySchedulerAutoCreatedQueueBase.java | 28 +-
.../TestCapacitySchedulerAutoQueueCreation.java | 15 +-
.../TestCapacitySchedulerQueueMappingFactory.java | 43 +-
.../scheduler/capacity/TestQueueMappings.java | 29 +-
19 files changed, 947 insertions(+), 227 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/RefreshMountTableEntriesRequest.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/RefreshMountTableEntriesRequest.java
index e69de29..899afe7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/RefreshMountTableEntriesRequest.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/RefreshMountTableEntriesRequest.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.protocol;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
+
+/**
+ * API request for refreshing mount table cached entries from state store.
+ */
+public abstract class RefreshMountTableEntriesRequest {
+
+ public static RefreshMountTableEntriesRequest newInstance()
+ throws IOException {
+ return StateStoreSerializer
+ .newRecord(RefreshMountTableEntriesRequest.class);
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index 13c2ec7..fe18d82 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -864,9 +864,9 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
if (placementManager != null) {
try {
String usernameUsedForPlacement =
- getUserNameForPlacement(user, context, placementManager);
+ getUserNameForPlacement(user, context, placementManager);
placementContext = placementManager
- .placeApplication(context, usernameUsedForPlacement, isRecovery);
+ .placeApplication(context, usernameUsedForPlacement);
} catch (YarnException e) {
// Placement could also fail if the user doesn't exist in system
// skip if the user is not found during recovery.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/AppNameMappingPlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/AppNameMappingPlacementRule.java
new file mode 100644
index 0000000..63d98ba
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/AppNameMappingPlacementRule.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.placement;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueManager;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.placement.QueuePlacementRuleUtils.getPlacementContext;
+import static org.apache.hadoop.yarn.server.resourcemanager.placement.QueuePlacementRuleUtils.isStaticQueueMapping;
+import static org.apache.hadoop.yarn.server.resourcemanager.placement.QueuePlacementRuleUtils.validateAndGetAutoCreatedQueueMapping;
+import static org.apache.hadoop.yarn.server.resourcemanager.placement.QueuePlacementRuleUtils.validateAndGetQueueMapping;
+
+public class AppNameMappingPlacementRule extends PlacementRule {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(AppNameMappingPlacementRule.class);
+
+ public static final String CURRENT_APP_MAPPING = "%application";
+
+ private static final String QUEUE_MAPPING_NAME = "app-name";
+
+ private boolean overrideWithQueueMappings = false;
+ private List<QueueMapping> mappings = null;
+ protected CapacitySchedulerQueueManager queueManager;
+
+ public AppNameMappingPlacementRule() {
+ this(false, null);
+ }
+
+ public AppNameMappingPlacementRule(boolean overrideWithQueueMappings,
+ List<QueueMapping> newMappings) {
+ this.overrideWithQueueMappings = overrideWithQueueMappings;
+ this.mappings = newMappings;
+ }
+
+ @Override
+ public boolean initialize(ResourceScheduler scheduler)
+ throws IOException {
+ if (!(scheduler instanceof CapacityScheduler)) {
+ throw new IOException(
+ "AppNameMappingPlacementRule can be configured only for "
+ + "CapacityScheduler");
+ }
+ CapacitySchedulerContext schedulerContext =
+ (CapacitySchedulerContext) scheduler;
+ CapacitySchedulerConfiguration conf = schedulerContext.getConfiguration();
+ boolean overrideWithQueueMappings = conf.getOverrideWithQueueMappings();
+ LOG.info(
+ "Initialized App Name queue mappings, override: " + overrideWithQueueMappings);
+
+ List<QueueMapping> queueMappings =
+ conf.getQueueMappingEntity(QUEUE_MAPPING_NAME);
+
+ // Get new user mappings
+ List<QueueMapping> newMappings = new ArrayList<>();
+
+ queueManager = schedulerContext.getCapacitySchedulerQueueManager();
+
+ // check if mappings refer to valid queues
+ for (QueueMapping mapping : queueMappings) {
+ if (isStaticQueueMapping(mapping)) {
+ //at this point mapping.getQueueName() return only the queue name, since
+ //the config parsing have been changed making QueueMapping more
+ //consistent
+
+ CSQueue queue = queueManager.getQueue(mapping.getFullPath());
+ if (ifQueueDoesNotExist(queue)) {
+ //Try getting queue by its full path name, if it exists it is a static
+ //leaf queue indeed, without any auto creation magic
+
+ if (queueManager.isAmbiguous(mapping.getFullPath())) {
+ throw new IOException(
+ "mapping contains ambiguous leaf queue reference " + mapping
+ .getFullPath());
+ }
+
+ //if leaf queue does not exist,
+ // this could be a potential auto created leaf queue
+ //validate if parent queue is specified,
+ // then it should exist and
+ // be an instance of AutoCreateEnabledParentQueue
+ QueueMapping newMapping =
+ validateAndGetAutoCreatedQueueMapping(queueManager, mapping);
+ if (newMapping == null) {
+ throw new IOException(
+ "mapping contains invalid or non-leaf queue " + mapping
+ .getQueue());
+ }
+ newMappings.add(newMapping);
+ } else {
+ // if queue exists, validate
+ // if its an instance of leaf queue
+ // if its an instance of auto created leaf queue,
+ // then extract parent queue name and update queue mapping
+ QueueMapping newMapping = validateAndGetQueueMapping(
+ queueManager, queue, mapping);
+ newMappings.add(newMapping);
+ }
+ } else {
+ //If it is a dynamic queue mapping,
+ // we can safely assume leaf queue name does not have '.' in it
+ // validate
+ // if parent queue is specified, then
+ // parent queue exists and an instance of AutoCreateEnabledParentQueue
+ //
+ QueueMapping newMapping = validateAndGetAutoCreatedQueueMapping(
+ queueManager, mapping);
+ if (newMapping != null) {
+ newMappings.add(newMapping);
+ } else{
+ newMappings.add(mapping);
+ }
+ }
+ }
+
+ if (newMappings.size() > 0) {
+ this.mappings = newMappings;
+ this.overrideWithQueueMappings = overrideWithQueueMappings;
+ LOG.info("get valid queue mapping from app name config: " +
+ newMappings.toString() + ", override: " + overrideWithQueueMappings);
+ return true;
+ }
+ return false;
+ }
+
+ private static boolean ifQueueDoesNotExist(CSQueue queue) {
+ return queue == null;
+ }
+
+ private ApplicationPlacementContext getAppPlacementContext(String user,
+ String applicationName) throws IOException {
+ for (QueueMapping mapping : mappings) {
+ if (mapping.getSource().equals(CURRENT_APP_MAPPING)) {
+ if (mapping.getQueue().equals(CURRENT_APP_MAPPING)) {
+ return getPlacementContext(mapping, applicationName, queueManager);
+ } else {
+ return getPlacementContext(mapping, queueManager);
+ }
+ }
+ if (mapping.getSource().equals(applicationName)) {
+ return getPlacementContext(mapping, queueManager);
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public ApplicationPlacementContext getPlacementForApp(
+ ApplicationSubmissionContext asc, String user) throws YarnException {
+ String queueName = asc.getQueue();
+ String applicationName = asc.getApplicationName();
+ if (mappings != null && mappings.size() > 0) {
+ try {
+ ApplicationPlacementContext mappedQueue = getAppPlacementContext(user,
+ applicationName);
+ if (mappedQueue != null) {
+ // We have a mapping, should we use it?
+ if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)
+ //queueName will be same as mapped queue name in case of recovery
+ || queueName.equals(mappedQueue.getQueue())
+ || overrideWithQueueMappings) {
+ LOG.info("Application {} mapping [{}] to [{}] override {}",
+ applicationName, queueName, mappedQueue.getQueue(),
+ overrideWithQueueMappings);
+ return mappedQueue;
+ }
+ }
+ } catch (IOException ioex) {
+ String message = "Failed to submit application " + applicationName +
+ " reason: " + ioex.getMessage();
+ throw new YarnException(message);
+ }
+ }
+ return null;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/CSMappingPlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/CSMappingPlacementRule.java
index aff75ba..cad7f85 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/CSMappingPlacementRule.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/CSMappingPlacementRule.java
@@ -125,11 +125,7 @@ public class CSMappingPlacementRule extends PlacementRule {
overrideWithQueueMappings = conf.getOverrideWithQueueMappings();
if (groups == null) {
- //We cannot use Groups#getUserToGroupsMappingService here, because when
- //tests change the HADOOP_SECURITY_GROUP_MAPPING, Groups won't refresh its
- //cached instance of groups, so we might get a Group instance which
- //ignores the HADOOP_SECURITY_GROUP_MAPPING settings.
- groups = new Groups(conf);
+ groups = Groups.getUserToGroupsMappingService(conf);
}
MappingRuleValidationContext validationContext = buildValidationContext();
@@ -149,8 +145,8 @@ public class CSMappingPlacementRule extends PlacementRule {
}
LOG.info("Initialized queue mappings, can override user specified " +
- "queues: {} number of rules: {} mapping rules: {}",
- overrideWithQueueMappings, mappingRules.size(), mappingRules);
+ "queues: {} number of rules: {}", overrideWithQueueMappings,
+ mappingRules.size());
if (LOG.isDebugEnabled()) {
LOG.debug("Initialized with the following mapping rules:");
@@ -174,12 +170,6 @@ public class CSMappingPlacementRule extends PlacementRule {
*/
private void setupGroupsForVariableContext(VariableContext vctx, String user)
throws IOException {
- if (groups == null) {
- LOG.warn(
- "Group provider hasn't been set, cannot query groups for user {}",
- user);
- return;
- }
Set<String> groupsSet = groups.getGroupsSet(user);
String secondaryGroup = null;
Iterator<String> it = groupsSet.iterator();
@@ -203,18 +193,14 @@ public class CSMappingPlacementRule extends PlacementRule {
}
private VariableContext createVariableContext(
- ApplicationSubmissionContext asc, String user) {
+ ApplicationSubmissionContext asc, String user) throws IOException {
VariableContext vctx = new VariableContext();
vctx.put("%user", user);
vctx.put("%specified", asc.getQueue());
vctx.put("%application", asc.getApplicationName());
vctx.put("%default", "root.default");
- try {
- setupGroupsForVariableContext(vctx, user);
- } catch (IOException e) {
- LOG.warn("Unable to setup groups: {}", e.getMessage());
- }
+ setupGroupsForVariableContext(vctx, user);
vctx.setImmutables(immutableVariables);
return vctx;
@@ -352,43 +338,34 @@ public class CSMappingPlacementRule extends PlacementRule {
@Override
public ApplicationPlacementContext getPlacementForApp(
ApplicationSubmissionContext asc, String user) throws YarnException {
- return getPlacementForApp(asc, user, false);
- }
-
- @Override
- public ApplicationPlacementContext getPlacementForApp(
- ApplicationSubmissionContext asc, String user, boolean recovery)
- throws YarnException {
//We only use the mapping rules if overrideWithQueueMappings enabled
//or the application is submitted to the default queue, which effectively
//means the application doesn't have any specific queue.
String appQueue = asc.getQueue();
- LOG.debug("Looking placement for app '{}' originally submitted to queue " +
- "'{}', with override enabled '{}'",
- asc.getApplicationName(), appQueue, overrideWithQueueMappings);
if (appQueue != null &&
!appQueue.equals(YarnConfiguration.DEFAULT_QUEUE_NAME) &&
!appQueue.equals(YarnConfiguration.DEFAULT_QUEUE_FULL_NAME) &&
- !overrideWithQueueMappings &&
- !recovery) {
+ !overrideWithQueueMappings) {
LOG.info("Have no jurisdiction over application submission '{}', " +
"moving to next PlacementRule engine", asc.getApplicationName());
return null;
}
VariableContext variables;
- variables = createVariableContext(asc, user);
+ try {
+ variables = createVariableContext(asc, user);
+ } catch (IOException e) {
+ LOG.error("Unable to setup variable context", e);
+ throw new YarnException(e);
+ }
- ApplicationPlacementContext ret = null;
for (MappingRule rule : mappingRules) {
MappingRuleResult result = evaluateRule(rule, variables);
switch (result.getResult()) {
case PLACE_TO_DEFAULT:
- ret = placeToDefault(asc, variables, rule);
- break;
+ return placeToDefault(asc, variables, rule);
case PLACE:
- ret = placeToQueue(asc, rule, result);
- break;
+ return placeToQueue(asc, rule, result);
case REJECT:
LOG.info("Rejecting application '{}', reason: Mapping rule '{}' " +
" fallback action is set to REJECT.",
@@ -400,42 +377,17 @@ public class CSMappingPlacementRule extends PlacementRule {
case SKIP:
//SKIP means skip to the next rule, which is the default behaviour of
//the for loop, so we don't need to take any extra actions
- break;
+ break;
default:
LOG.error("Invalid result '{}'", result);
}
-
- //If we already have a return value, we can return it!
- if (ret != null) {
- break;
- }
- }
-
- if (ret == null) {
- //If no rule was applied we return null, to let the engine move onto the
- //next placementRule class
- LOG.info("No matching rule found for application '{}', moving to next " +
- "PlacementRule engine", asc.getApplicationName());
- }
-
- if (recovery) {
- //we need this part for backwards compatibility with recovery
- //the legacy code checked if the placement matches the queue of the
- //application to be recovered, and if it did, it created an
- //ApplicationPlacementContext.
- //However at a later point this is going to be changed, there are two
- //major issues with this approach:
- // 1) The recovery only uses LEAF queue names, which must be updated
- // 2) The ORIGINAL queue which the application was submitted is NOT
- // stored this might result in different placement evaluation since
- // now we can have rules which give different result based on what
- // the user submitted.
- if (ret == null || !ret.getQueue().equals(asc.getQueue())) {
- return null;
- }
}
- return ret;
+ //If no rule was applied we return null, to let the engine move onto the
+ //next placementRule class
+ LOG.info("No matching rule found for application '{}', moving to next " +
+ "PlacementRule engine", asc.getApplicationName());
+ return null;
}
private ApplicationPlacementContext placeToQueue(
@@ -458,13 +410,13 @@ public class CSMappingPlacementRule extends PlacementRule {
String queueName = validateAndNormalizeQueue(
variables.replacePathVariables("%default"), false);
LOG.debug("Application '{}' have been placed to queue '{}' by " +
- "the fallback option of rule {}",
+ "the fallback option of rule {}",
asc.getApplicationName(), queueName, rule);
return createPlacementContext(queueName);
} catch (YarnException e) {
LOG.error("Rejecting application due to a failed fallback" +
" action '{}'" + ", reason: {}", asc.getApplicationName(),
- e);
+ e.getMessage());
//We intentionally omit the details, we don't want any server side
//config information to leak to the client side
throw new YarnException("Application submission have been rejected by a" +
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/MappingRuleValidationContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/MappingRuleValidationContextImpl.java
index 80bf929..cbde33f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/MappingRuleValidationContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/MappingRuleValidationContextImpl.java
@@ -154,22 +154,20 @@ public class MappingRuleValidationContextImpl
}
if (!(parentQueue instanceof ManagedParentQueue)) {
- if (parentQueue.getChildQueues() != null) {
- for (CSQueue queue : parentQueue.getChildQueues()) {
- if (queue instanceof LeafQueue) {
- //if a non managed parent queue has at least one leaf queue, this
- //mapping can be valid, we cannot do any more checks
- return true;
- }
+ for (CSQueue queue : parentQueue.getChildQueues()) {
+ if (queue instanceof LeafQueue) {
+ //if a non managed parent queue has at least one leaf queue, this
+ //mapping can be valid, we cannot do any more checks
+ return true;
}
}
//There is no way we can place anything into the queue referenced by the
// rule, because we cannot auto create, and we don't have any leaf queues
- //Actually this branch is not accessible with the current queue hierarchy,
+ //Actually this branch is not accessibe with the current queue hierarchy,
//there should be no parents without any leaf queues. This condition says
//for sanity checks
- throw new YarnException("Target queue path '" + path + "' has " +
+ throw new YarnException("Target queue path '" + path + "' has" +
"a non-managed parent queue which has no LeafQueues either.");
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java
index efde8f9..4e9195d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java
@@ -54,8 +54,7 @@ public class PlacementManager {
}
public ApplicationPlacementContext placeApplication(
- ApplicationSubmissionContext asc, String user, boolean recovery)
- throws YarnException {
+ ApplicationSubmissionContext asc, String user) throws YarnException {
readLock.lock();
try {
if (null == rules || rules.isEmpty()) {
@@ -64,7 +63,7 @@ public class PlacementManager {
ApplicationPlacementContext placement = null;
for (PlacementRule rule : rules) {
- placement = rule.getPlacementForApp(asc, user, recovery);
+ placement = rule.getPlacementForApp(asc, user);
if (placement != null) {
break;
}
@@ -75,11 +74,6 @@ public class PlacementManager {
readLock.unlock();
}
}
-
- public ApplicationPlacementContext placeApplication(
- ApplicationSubmissionContext asc, String user) throws YarnException {
- return placeApplication(asc, user, false);
- }
@VisibleForTesting
public List<PlacementRule> getPlacementRules() {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementRule.java
index 50d686a..dde632e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementRule.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementRule.java
@@ -79,29 +79,4 @@ public abstract class PlacementRule {
*/
public abstract ApplicationPlacementContext getPlacementForApp(
ApplicationSubmissionContext asc, String user) throws YarnException;
-
-
- /**
- * Return the scheduler queue name the application should be placed in
- * wrapped in an {@link ApplicationPlacementContext} object.
- *
- * A non <code>null</code> return value places the application in a queue,
- * a <code>null</code> value means the queue is not yet determined. The
- * next {@link PlacementRule} in the list maintained in the
- * {@link PlacementManager} will be executed.
- *
- * @param asc The context of the application created on submission
- * @param user The name of the user submitting the application
- * @param recovery Indicates if the submission is a recovery
- *
- * @throws YarnException for any error while executing the rule
- *
- * @return The queue name wrapped in {@link ApplicationPlacementContext} or
- * <code>null</code> if no queue was resolved
- */
- public ApplicationPlacementContext getPlacementForApp(
- ApplicationSubmissionContext asc, String user, boolean recovery)
- throws YarnException {
- return getPlacementForApp(asc, user);
- }
}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java
new file mode 100644
index 0000000..46de0f8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java
@@ -0,0 +1,558 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.placement;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.security.Groups;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping.MappingType;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping.QueueMappingBuilder;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AutoCreatedLeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ManagedParentQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+public class UserGroupMappingPlacementRule extends PlacementRule {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(UserGroupMappingPlacementRule.class);
+
+ public static final String CURRENT_USER_MAPPING = "%user";
+
+ public static final String PRIMARY_GROUP_MAPPING = "%primary_group";
+
+ public static final String SECONDARY_GROUP_MAPPING = "%secondary_group";
+
+ private boolean overrideWithQueueMappings = false;
+ private List<QueueMapping> mappings = null;
+ private Groups groups;
+ private CapacitySchedulerQueueManager queueManager;
+
+ public UserGroupMappingPlacementRule(){
+ this(false, null, null);
+ }
+
+ @VisibleForTesting
+ UserGroupMappingPlacementRule(boolean overrideWithQueueMappings,
+ List<QueueMapping> newMappings, Groups groups) {
+ this.mappings = newMappings;
+ this.overrideWithQueueMappings = overrideWithQueueMappings;
+ this.groups = groups;
+ }
+
+ private String getPrimaryGroup(String user) throws IOException {
+ return groups.getGroupsSet(user).iterator().next();
+ }
+
+ private String getSecondaryGroup(String user) throws IOException {
+ Set<String> groupsSet = groups.getGroupsSet(user);
+ String secondaryGroup = null;
+ // Skip the first group (the one getPrimaryGroup returns), then pick the
+ // first remaining group that resolves to an existing queue; there can be
+ // several secondary groups and their position is not guaranteed
+ Iterator<String> it = groupsSet.iterator();
+ it.next();
+ while (it.hasNext()) {
+ String group = it.next();
+ if (this.queueManager.getQueue(group) != null) {
+ secondaryGroup = group;
+ break;
+ }
+ }
+
+ if (secondaryGroup == null && LOG.isDebugEnabled()) {
+ LOG.debug("User {} is not associated with any Secondary "
+ + "Group. Hence it may use the 'default' queue", user);
+ }
+ return secondaryGroup;
+ }
+
+ private ApplicationPlacementContext getPlacementForUser(String user)
+ throws IOException {
+ for (QueueMapping mapping : mappings) {
+ if (mapping.getType().equals(MappingType.USER)) {
+ if (mapping.getSource().equals(CURRENT_USER_MAPPING)) {
+ if (mapping.getParentQueue() != null
+ && mapping.getParentQueue().equals(PRIMARY_GROUP_MAPPING)
+ && mapping.getQueue().equals(CURRENT_USER_MAPPING)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Creating placement context for user {} using " +
+ "primary group current user mapping", user);
+ }
+ return getContextForGroupParent(user, mapping,
+ getPrimaryGroup(user));
+ } else if (mapping.getParentQueue() != null
+ && mapping.getParentQueue().equals(SECONDARY_GROUP_MAPPING)
+ && mapping.getQueue().equals(CURRENT_USER_MAPPING)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Creating placement context for user {} using " +
+ "secondary group current user mapping", user);
+ }
+ return getContextForGroupParent(user, mapping,
+ getSecondaryGroup(user));
+ } else if (mapping.getQueue().equals(CURRENT_USER_MAPPING)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Creating placement context for user {} using " +
+ "current user mapping", user);
+ }
+ return getPlacementContext(mapping, user);
+ } else if (mapping.getQueue().equals(PRIMARY_GROUP_MAPPING)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Creating placement context for user {} using " +
+ "primary group mapping", user);
+ }
+ return getPlacementContext(mapping, getPrimaryGroup(user));
+ } else if (mapping.getQueue().equals(SECONDARY_GROUP_MAPPING)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Creating placement context for user {} using " +
+ "secondary group mapping", user);
+ }
+ return getPlacementContext(mapping, getSecondaryGroup(user));
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Creating placement context for user {} using " +
+ "current user static mapping", user);
+ }
+ return getPlacementContext(mapping);
+ }
+ }
+
+ if (user.equals(mapping.getSource())) {
+ if (mapping.getQueue().equals(PRIMARY_GROUP_MAPPING)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Creating placement context for user {} using " +
+ "static user primary group mapping", user);
+ }
+ return getPlacementContext(mapping, getPrimaryGroup(user));
+ } else if (mapping.getQueue().equals(SECONDARY_GROUP_MAPPING)) {
+ String secondaryGroup = getSecondaryGroup(user);
+ if (secondaryGroup != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Creating placement context for user {} using " +
+ "static user secondary group mapping", user);
+ }
+ return getPlacementContext(mapping, secondaryGroup);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Wanted to create placement context for user {}" +
+ " using static user secondary group mapping," +
+ " but user has no secondary group!", user);
+ }
+ return null;
+ }
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Creating placement context for user {} using " +
+ "current user static mapping", user);
+ }
+ return getPlacementContext(mapping);
+ }
+ }
+ }
+ if (mapping.getType().equals(MappingType.GROUP)) {
+ for (String userGroups : groups.getGroupsSet(user)) {
+ if (userGroups.equals(mapping.getSource())) {
+ if (mapping.getQueue().equals(CURRENT_USER_MAPPING)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Creating placement context for user {} using " +
+ "static group current user mapping", user);
+ }
+ return getPlacementContext(mapping, user);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Creating placement context for user {} using " +
+ "static group static mapping", user);
+ }
+ return getPlacementContext(mapping);
+ }
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Creates a new mapping that replaces the parent path and the leaf queue
+ * name of the given one. Only the type and the source are copied from the
+ * given mapping, which is only read through its getters here.
+ * @param mapping The mapping to be changed
+ * @param parentPath The new parentPath of the mapping
+ * @param leafName The new leafQueueName of the mapping
+ * @return The updated NEW mapping
+ */
+ private QueueMapping alterMapping(
+ QueueMapping mapping, String parentPath, String leafName) {
+ return QueueMappingBuilder.create()
+ .type(mapping.getType())
+ .source(mapping.getSource())
+ .queue(leafName)
+ .parentQueue(parentPath)
+ .build();
+ }
+
+ // invoked for mappings:
+ // u:%user:%primary_group.%user
+ // u:%user:%secondary_group.%user
+ private ApplicationPlacementContext getContextForGroupParent(
+ String user,
+ QueueMapping mapping,
+ String group) throws IOException {
+
+ CSQueue groupQueue = this.queueManager.getQueue(group);
+ if (groupQueue != null) {
+ // replace the group string
+ QueueMapping resolvedGroupMapping = alterMapping(
+ mapping,
+ groupQueue.getQueuePath(),
+ user);
+ validateQueueMapping(resolvedGroupMapping);
+ return getPlacementContext(resolvedGroupMapping, user);
+ } else {
+ if (queueManager.isAmbiguous(group)) {
+ LOG.info("Queue mapping rule expect group queue to exist with name {}" +
+ " but the reference is ambiguous!", group);
+ } else {
+ LOG.info("Queue mapping rule expect group queue to exist with name {}" +
+ " but it does not exist!", group);
+ }
+ return null;
+ }
+ }
+
+ @Override
+ public ApplicationPlacementContext getPlacementForApp(
+ ApplicationSubmissionContext asc, String user)
+ throws YarnException {
+ String queueName = asc.getQueue();
+ ApplicationId applicationId = asc.getApplicationId();
+ if (mappings != null && mappings.size() > 0) {
+ try {
+ ApplicationPlacementContext mappedQueue = getPlacementForUser(user);
+ if (mappedQueue != null) {
+ // We have a mapping, should we use it?
+ if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)
+ //queueName will be same as mapped queue name in case of recovery
+ || queueName.equals(mappedQueue.getQueue())
+ || overrideWithQueueMappings) {
+ LOG.info("Application {} user {} mapping [{}] to [{}] override {}",
+ applicationId, user, queueName, mappedQueue.getQueue(),
+ overrideWithQueueMappings);
+ return mappedQueue;
+ }
+ }
+ } catch (IOException ioex) {
+ String message = "Failed to submit application " + applicationId +
+ " submitted by user " + user + " reason: " + ioex.getMessage();
+ throw new YarnException(message, ioex);
+ }
+ }
+ return null;
+ }
+
+ private ApplicationPlacementContext getPlacementContext(
+ QueueMapping mapping) throws IOException {
+ return getPlacementContext(mapping, mapping.getQueue());
+ }
+
+ private ApplicationPlacementContext getPlacementContext(QueueMapping mapping,
+ String leafQueueName) throws IOException {
+ //leafQueue name no longer identifies a queue uniquely checking ambiguity
+ if (!mapping.hasParentQueue() && queueManager.isAmbiguous(leafQueueName)) {
+ throw new IOException("mapping contains ambiguous leaf queue reference " +
+ leafQueueName);
+ }
+
+ if (!StringUtils.isEmpty(mapping.getParentQueue())) {
+ return getPlacementContextWithParent(mapping, leafQueueName);
+ } else {
+ return getPlacementContextNoParent(leafQueueName);
+ }
+ }
+
+ // Builds the placement context for a mapping that names a parent queue.
+ // Returns null after logging a warning when no usable queue is found.
+ private ApplicationPlacementContext getPlacementContextWithParent(
+ QueueMapping mapping, String leafQueueName) {
+ CSQueue parent = queueManager.getQueue(mapping.getParentQueue());
+ //we don't find the specified parent, so the placement rule is invalid
+ //for this case
+ if (parent == null) {
+ if (queueManager.isAmbiguous(mapping.getParentQueue())) {
+ LOG.warn("Placement rule specified a parent queue {}, but it is" +
+ " ambiguous.", mapping.getParentQueue());
+ } else {
+ LOG.warn("Placement rule specified a parent queue {}, but it does" +
+ " not exist.", mapping.getParentQueue());
+ }
+ return null;
+ }
+
+ String parentPath = parent.getQueuePath();
+ //if we have a parent which is not a managed parent, we check if the leaf
+ //queue exists under this parent
+ if (!(parent instanceof ManagedParentQueue)) {
+ CSQueue queue = queueManager.getQueue(parentPath + "." + leafQueueName);
+ //if the queue doesn't exist we return null
+ if (queue == null) {
+ LOG.warn("Placement rule specified a parent queue {}, but it is" +
+ " not a managed parent queue, and no queue exists with name {} " +
+ "under it.", mapping.getParentQueue(), leafQueueName);
+ return null;
+ }
+ }
+ //at this point we either have a managed parent or the queue actually
+ //exists so we have a placement context, returning it
+ return new ApplicationPlacementContext(leafQueueName, parentPath);
+ }
+
+ // Builds the placement context for a mapping without a parent queue: the
+ // queue must already exist, otherwise a warning is logged and null returned.
+ private ApplicationPlacementContext getPlacementContextNoParent(
+ String leafQueueName) {
+ CSQueue queue = queueManager.getQueue(leafQueueName);
+ if (queue == null) {
+ LOG.warn("Queue {} specified in placement rule {}", leafQueueName,
+ queueManager.isAmbiguous(leafQueueName)
+ ? "is ambiguous" : "does not exist");
+ return null;
+ }
+ //qualify the result with the parent path so the placement context stays
+ //valid if the leaf name becomes ambiguous; reuse the queue resolved above
+ //and guard against a queue without a parent instead of failing with an NPE
+ CSQueue parent = queue.getParent();
+ if (parent == null) {
+ LOG.warn("Queue {} in placement rule has no parent", leafQueueName);
+ return null;
+ }
+ return new ApplicationPlacementContext(
+ leafQueueName, parent.getQueuePath());
+ }
+
+ @VisibleForTesting
+ @Override
+ public boolean initialize(ResourceScheduler scheduler)
+ throws IOException {
+ if (!(scheduler instanceof CapacityScheduler)) {
+ throw new IOException(
+ "UserGroupMappingPlacementRule can be configured only for "
+ + "CapacityScheduler");
+ }
+ CapacitySchedulerContext schedulerContext =
+ (CapacitySchedulerContext) scheduler;
+ CapacitySchedulerConfiguration conf = schedulerContext.getConfiguration();
+ boolean overrideWithQueueMappings = conf.getOverrideWithQueueMappings();
+ LOG.info(
+ "Initialized queue mappings, override: " + overrideWithQueueMappings);
+
+ List<QueueMapping> queueMappings = conf.getQueueMappings();
+
+ // Get new user/group mappings
+ List<QueueMapping> newMappings = new ArrayList<>();
+
+ queueManager = schedulerContext.getCapacitySchedulerQueueManager();
+
+ // check if mappings refer to valid queues
+ for (QueueMapping mapping : queueMappings) {
+ //at this point mapping.getQueue() returns only the queue name, since
+ //the config parsing has been changed, making QueueMapping more consistent
+
+ if (isStaticQueueMapping(mapping)) {
+ //Try getting the queue by its full path name; if it exists it is
+ //validated below as an existing leaf queue
+ CSQueue queue = queueManager.getQueue(mapping.getFullPath());
+ if (ifQueueDoesNotExist(queue)) {
+ //We might not be able to find the queue because the reference was
+ // ambiguous; this should only happen if the queue was referenced by
+ // leaf name only
+ if (queueManager.isAmbiguous(mapping.getFullPath())) {
+ throw new IOException(
+ "mapping contains ambiguous leaf queue reference " + mapping
+ .getFullPath());
+ }
+
+ //if leaf queue does not exist,
+ // this could be a potential auto created leaf queue
+ //validate if parent queue is specified,
+ // then it should exist and
+ // be an instance of AutoCreateEnabledParentQueue
+ QueueMapping newMapping = validateAndGetAutoCreatedQueueMapping(
+ queueManager, mapping);
+ if (newMapping == null) {
+ throw new IOException(
+ "mapping contains invalid or non-leaf queue " + mapping
+ .getQueue());
+ }
+ newMappings.add(newMapping);
+ } else {
+ // if queue exists, validate
+ // if it is an instance of leaf queue
+ // if it is an instance of auto created leaf queue,
+ // then extract parent queue name and update queue mapping
+ QueueMapping newMapping = validateAndGetQueueMapping(queueManager,
+ queue, mapping);
+ newMappings.add(newMapping);
+ }
+ } else{
+ //If it is a dynamic queue mapping,
+ // we can safely assume leaf queue name does not have '.' in it
+ // validate
+ // if parent queue is specified, then
+ // parent queue exists and an instance of AutoCreateEnabledParentQueue
+ //
+ QueueMapping newMapping = validateAndGetAutoCreatedQueueMapping(
+ queueManager, mapping);
+ if (newMapping != null) {
+ newMappings.add(newMapping);
+ } else{
+ newMappings.add(mapping);
+ }
+ }
+ }
+
+ // initialize groups if mappings are present
+ if (newMappings.size() > 0) {
+ this.mappings = newMappings;
+ this.groups = Groups.getUserToGroupsMappingService(
+ ((CapacityScheduler)scheduler).getConf());
+ this.overrideWithQueueMappings = overrideWithQueueMappings;
+ return true;
+ }
+ return false;
+ }
+
+ private static QueueMapping validateAndGetQueueMapping(
+ CapacitySchedulerQueueManager queueManager, CSQueue queue,
+ QueueMapping mapping) throws IOException {
+ if (!(queue instanceof LeafQueue)) {
+ throw new IOException(
+ "mapping contains invalid or non-leaf queue : " +
+ mapping.getFullPath());
+ }
+
+ if (queue instanceof AutoCreatedLeafQueue && queue
+ .getParent() instanceof ManagedParentQueue) {
+
+ QueueMapping newMapping = validateAndGetAutoCreatedQueueMapping(
+ queueManager, mapping);
+ if (newMapping == null) {
+ throw new IOException(
+ "mapping contains invalid or non-leaf queue "
+ + mapping.getFullPath());
+ }
+ return newMapping;
+ }
+ return mapping;
+ }
+
+ private static boolean ifQueueDoesNotExist(CSQueue queue) {
+ return queue == null;
+ }
+
+ private static QueueMapping validateAndGetAutoCreatedQueueMapping(
+ CapacitySchedulerQueueManager queueManager, QueueMapping mapping)
+ throws IOException {
+ if (mapping.hasParentQueue()
+ && (mapping.getParentQueue().equals(PRIMARY_GROUP_MAPPING)
+ || mapping.getParentQueue().equals(SECONDARY_GROUP_MAPPING))) {
+ // dynamic parent queue
+ return mapping;
+ } else if (mapping.hasParentQueue()) {
+ //if parent queue is specified,
+ // then it should exist and be an instance of ManagedParentQueue
+ QueuePlacementRuleUtils.validateQueueMappingUnderParentQueue(
+ queueManager.getQueue(mapping.getParentQueue()),
+ mapping.getParentQueue(), mapping.getQueue());
+ return mapping;
+ }
+
+ return null;
+ }
+
+ private static boolean isStaticQueueMapping(QueueMapping mapping) {
+ return !mapping.getQueue()
+ .contains(UserGroupMappingPlacementRule.CURRENT_USER_MAPPING)
+ && !mapping.getQueue()
+ .contains(UserGroupMappingPlacementRule.PRIMARY_GROUP_MAPPING)
+ && !mapping.getQueue()
+ .contains(UserGroupMappingPlacementRule.SECONDARY_GROUP_MAPPING);
+ }
+
+ private void validateQueueMapping(QueueMapping queueMapping)
+ throws IOException {
+ String parentQueueName = queueMapping.getParentQueue();
+ String leafQueueFullName = queueMapping.getFullPath();
+ CSQueue parentQueue = queueManager.getQueueByFullName(parentQueueName);
+ CSQueue leafQueue = queueManager.getQueue(leafQueueFullName);
+
+ if (leafQueue == null || (!(leafQueue instanceof LeafQueue))) {
+ //a mapping is not guaranteed to provide the parent queue's name; when it
+ //is missing, mapping.getFullPath() is the same as mapping.getQueue(), so
+ //the bare leaf name can be an ambiguous queue reference, which is
+ //reported first
+ if (leafQueue == null && queueManager.isAmbiguous(leafQueueFullName)) {
+ throw new IOException("mapping contains ambiguous leaf queue name: "
+ + leafQueueFullName);
+ } else if (parentQueue == null ||
+ (!(parentQueue instanceof ManagedParentQueue))) {
+ throw new IOException("mapping contains invalid or non-leaf queue " +
+ " and no managed parent is found: "
+ + leafQueueFullName);
+ }
+ } else if (parentQueue == null || (!(parentQueue instanceof ParentQueue))) {
+ throw new IOException(
+ "mapping contains invalid parent queue [" + parentQueueName + "]");
+ } else if (!parentQueue.getQueuePath()
+ .equals(leafQueue.getParent().getQueuePath())) {
+ throw new IOException("mapping contains invalid parent queue "
+ + "which does not match existing leaf queue's parent : ["
+ + parentQueue.getQueuePath() + "] does not match [ "
+ + leafQueue.getParent().getQueuePath() + "]");
+ }
+ }
+
+ @VisibleForTesting
+ public List<QueueMapping> getQueueMappings() {
+ return mappings;
+ }
+
+ @VisibleForTesting
+ @Private
+ public void setQueueManager(CapacitySchedulerQueueManager queueManager) {
+ this.queueManager = queueManager;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index e25301b..259cd5c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -35,10 +35,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
-import org.apache.hadoop.yarn.server.resourcemanager.placement.CSMappingPlacementRule;
-import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementFactory;
-import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
@@ -75,6 +71,11 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.AppNameMappingPlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementFactory;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
@@ -678,14 +679,24 @@ public class CapacityScheduler extends
}
}
-
@VisibleForTesting
- public PlacementRule getCSMappingPlacementRule() throws IOException {
+ public PlacementRule getUserGroupMappingPlacementRule() throws IOException {
readLock.lock();
try {
- CSMappingPlacementRule mappingRule = new CSMappingPlacementRule();
- mappingRule.initialize(this);
- return mappingRule;
+ UserGroupMappingPlacementRule ugRule = new UserGroupMappingPlacementRule();
+ ugRule.initialize(this);
+ return ugRule;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ public PlacementRule getAppNameMappingPlacementRule() throws IOException {
+ readLock.lock();
+ try {
+ AppNameMappingPlacementRule anRule = new AppNameMappingPlacementRule();
+ anRule.initialize(this);
+ return anRule;
} finally {
readLock.unlock();
}
@@ -707,18 +718,19 @@ public class CapacityScheduler extends
}
placementRuleStrs = new ArrayList<>(distinguishRuleSet);
- boolean csMappingAdded = false;
for (String placementRuleStr : placementRuleStrs) {
switch (placementRuleStr) {
case YarnConfiguration.USER_GROUP_PLACEMENT_RULE:
+ PlacementRule ugRule = getUserGroupMappingPlacementRule();
+ if (null != ugRule) {
+ placementRules.add(ugRule);
+ }
+ break;
case YarnConfiguration.APP_NAME_PLACEMENT_RULE:
- if (!csMappingAdded) {
- PlacementRule csMappingRule = getCSMappingPlacementRule();
- if (null != csMappingRule) {
- placementRules.add(csMappingRule);
- csMappingAdded = true;
- }
+ PlacementRule anRule = getAppNameMappingPlacementRule();
+ if (null != anRule) {
+ placementRules.add(anRule);
}
break;
default:
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index aa78c21..0b74e50 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -1229,24 +1229,6 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
setStrings(QUEUE_MAPPING, StringUtils.join(",", queueMappingStrs));
}
-
- @Private
- @VisibleForTesting
- public void setAppNameMappings(List<QueueMapping> queueMappings) {
- if (queueMappings == null) {
- return;
- }
-
- List<String> queueMappingStrs = new ArrayList<>();
- for (QueueMapping mapping : queueMappings) {
- String rule = mapping.toString();
- String[] parts = rule.split(":");
- queueMappingStrs.add(parts[1] + ":" + parts[2]);
- }
-
- setStrings(QUEUE_MAPPING_NAME, StringUtils.join(",", queueMappingStrs));
- }
-
@Private
@VisibleForTesting
void setWorkflowPriorityMappings(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
index 8e53b1a..e8b4105 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
@@ -1053,9 +1053,7 @@ public class TestAppManager extends AppManagerTestBase{
}
}).when(placementMgr).placeApplication(
- any(ApplicationSubmissionContext.class),
- any(String.class),
- any(Boolean.class));
+ any(ApplicationSubmissionContext.class), any(String.class));
rmContext.setQueuePlacementManager(placementMgr);
asContext.setQueue("oldQueue");
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManagerWithFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManagerWithFairScheduler.java
index b3ed5ef..37ff02d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManagerWithFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManagerWithFairScheduler.java
@@ -127,7 +127,7 @@ public class TestAppManagerWithFairScheduler extends AppManagerTestBase {
ApplicationSubmissionContext asContext = createAppSubmitCtx(appId, res);
// Submit to limited queue
- when(placementMgr.placeApplication(any(), any(), any(Boolean.class)))
+ when(placementMgr.placeApplication(any(), any()))
.thenReturn(new ApplicationPlacementContext("limited"));
try {
rmAppManager.submitApplication(asContext, "test");
@@ -138,7 +138,7 @@ public class TestAppManagerWithFairScheduler extends AppManagerTestBase {
}
// submit same app but now place it in the unlimited queue
- when(placementMgr.placeApplication(any(), any(), any(Boolean.class)))
+ when(placementMgr.placeApplication(any(), any()))
.thenReturn(new ApplicationPlacementContext("root.unlimited"));
rmAppManager.submitApplication(asContext, "test");
}
@@ -172,7 +172,7 @@ public class TestAppManagerWithFairScheduler extends AppManagerTestBase {
ApplicationSubmissionContext asContext = createAppSubmitCtx(appId, res);
// Submit to no access queue
- when(placementMgr.placeApplication(any(), any(), any(Boolean.class)))
+ when(placementMgr.placeApplication(any(), any()))
.thenReturn(new ApplicationPlacementContext("noaccess"));
try {
rmAppManager.submitApplication(asContext, "test");
@@ -182,13 +182,13 @@ public class TestAppManagerWithFairScheduler extends AppManagerTestBase {
e.getCause() instanceof AccessControlException);
}
// Submit to submit access queue
- when(placementMgr.placeApplication(any(), any(), any(Boolean.class)))
+ when(placementMgr.placeApplication(any(), any()))
.thenReturn(new ApplicationPlacementContext("submitonly"));
rmAppManager.submitApplication(asContext, "test");
// Submit second app to admin access queue
appId = MockApps.newAppID(2);
asContext = createAppSubmitCtx(appId, res);
- when(placementMgr.placeApplication(any(), any(), any(Boolean.class)))
+ when(placementMgr.placeApplication(any(), any()))
.thenReturn(new ApplicationPlacementContext("adminonly"));
rmAppManager.submitApplication(asContext, "test");
}
@@ -245,7 +245,7 @@ public class TestAppManagerWithFairScheduler extends AppManagerTestBase {
ApplicationSubmissionContext asContext = createAppSubmitCtx(appId, res);
// Submit to noaccess parent with non existent child queue
- when(placementMgr.placeApplication(any(), any(), any(Boolean.class)))
+ when(placementMgr.placeApplication(any(), any()))
.thenReturn(new ApplicationPlacementContext("root.noaccess.child"));
try {
rmAppManager.submitApplication(asContext, "test");
@@ -255,7 +255,7 @@ public class TestAppManagerWithFairScheduler extends AppManagerTestBase {
e.getCause() instanceof AccessControlException);
}
// Submit to submitonly parent with non existent child queue
- when(placementMgr.placeApplication(any(), any(), any(Boolean.class)))
+ when(placementMgr.placeApplication(any(), any()))
.thenReturn(new ApplicationPlacementContext("root.submitonly.child"));
rmAppManager.submitApplication(asContext, "test");
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestPlacementManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestPlacementManager.java
index 7200247..22a9125 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestPlacementManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestPlacementManager.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.placement;
-import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
@@ -34,6 +33,7 @@ import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT;
@@ -83,12 +83,9 @@ public class TestPlacementManager {
USER1))
.build();
- cs.getConfiguration().setQueueMappings(
- Lists.newArrayList(userQueueMapping));
- CSMappingPlacementRule ugRule = new CSMappingPlacementRule();
- ugRule.initialize(cs);
+ UserGroupMappingPlacementRule ugRule = new UserGroupMappingPlacementRule(
+ false, Arrays.asList(userQueueMapping), null);
queuePlacementRules.add(ugRule);
-
pm.updateRules(queuePlacementRules);
ApplicationSubmissionContext asc = Records.newRecord(
@@ -105,14 +102,17 @@ public class TestPlacementManager {
.parentQueue(PARENT_QUEUE)
.build();
- cs.getConfiguration().setAppNameMappings(
- Lists.newArrayList(queueMappingEntity));
- CSMappingPlacementRule anRule = new CSMappingPlacementRule();
- anRule.initialize(cs);
+ AppNameMappingPlacementRule anRule = new AppNameMappingPlacementRule(false,
+ Arrays.asList(queueMappingEntity));
queuePlacementRules.add(anRule);
pm.updateRules(queuePlacementRules);
- ApplicationPlacementContext pc = pm.placeApplication(asc, USER2);
- Assert.assertNotNull(pc);
+ try {
+ ApplicationPlacementContext pc = pm.placeApplication(asc, USER2);
+ Assert.assertNotNull(pc);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("Exception not expected");
+ }
}
@Test
@@ -121,9 +121,10 @@ public class TestPlacementManager {
QueueMapping userQueueMapping = QueueMappingBuilder.create()
.type(MappingType.USER).source(USER1)
.queue(getQueueMapping(PARENT_QUEUE, USER1)).build();
+ UserGroupMappingPlacementRule ugRule = new UserGroupMappingPlacementRule(
+ false, Arrays.asList(userQueueMapping), null);
- CSMappingPlacementRule ugRule = new CSMappingPlacementRule();
-
+ // Configure placement rule
conf.set(YarnConfiguration.QUEUE_PLACEMENT_RULES, ugRule.getName());
queueMappings.add(userQueueMapping);
conf.setQueueMappings(queueMappings);
@@ -134,7 +135,7 @@ public class TestPlacementManager {
PlacementManager pm = cs.getRMContext().getQueuePlacementManager();
// As we are setting placement rule, It shouldn't update default
- // placement rule ie user-group. Number of placement rules should be 1.
+ // placement rule ie user-group. Number of placement rules should be 1.
Assert.assertEquals(1, pm.getPlacementRules().size());
// Verifying if placement rule set is same as the one we configured
Assert.assertEquals(ugRule.getName(),
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestUserGroupMappingPlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestUserGroupMappingPlacementRule.java
index d93496b..79f1d40 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestUserGroupMappingPlacementRule.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestUserGroupMappingPlacementRule.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.placement;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.isNull;
@@ -156,7 +157,7 @@ public class TestUserGroupMappingPlacementRule {
.build());
}
- @Test
+ @Test(expected = YarnException.class)
public void testNullGroupMapping() throws IOException, YarnException {
conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
NullGroupsMapping.class, GroupMappingServiceProvider.class);
@@ -170,6 +171,7 @@ public class TestUserGroupMappingPlacementRule {
.inputUser("a")
.expectedQueue("default")
.build());
+ fail("No Groups for user 'a'");
}
@Test
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAbsoluteResourceWithAutoQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAbsoluteResourceWithAutoQueue.java
index 683e9fc..84d3756 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAbsoluteResourceWithAutoQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAbsoluteResourceWithAutoQueue.java
@@ -159,7 +159,6 @@ public class TestAbsoluteResourceWithAutoQueue
csConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
- csConf.setOverrideWithQueueMappings(true);
mockRM = new MockRM(csConf);
cs = (CapacityScheduler) mockRM.getResourceScheduler();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java
index 4757cd7..a414ab4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java
@@ -114,9 +114,9 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
public static final String C = CapacitySchedulerConfiguration.ROOT + ".c";
public static final String D = CapacitySchedulerConfiguration.ROOT + ".d";
public static final String E = CapacitySchedulerConfiguration.ROOT + ".e";
- public static final String ESUBGROUP1 =
+ public static final String ASUBGROUP1 =
CapacitySchedulerConfiguration.ROOT + ".esubgroup1";
- public static final String FGROUP =
+ public static final String AGROUP =
CapacitySchedulerConfiguration.ROOT + ".fgroup";
public static final String A1 = A + ".a1";
public static final String A2 = A + ".a2";
@@ -124,14 +124,14 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
public static final String B2 = B + ".b2";
public static final String B3 = B + ".b3";
public static final String B4 = B + ".b4subgroup1";
- public static final String ESUBGROUP1_A = ESUBGROUP1 + ".e";
- public static final String FGROUP_F = FGROUP + ".f";
+ public static final String ASUBGROUP1_A = ASUBGROUP1 + ".e";
+ public static final String AGROUP_A = AGROUP + ".f";
public static final float A_CAPACITY = 20f;
public static final float B_CAPACITY = 20f;
public static final float C_CAPACITY = 20f;
public static final float D_CAPACITY = 20f;
- public static final float ESUBGROUP1_CAPACITY = 10f;
- public static final float FGROUP_CAPACITY = 10f;
+ public static final float ASUBGROUP1_CAPACITY = 10f;
+ public static final float AGROUP_CAPACITY = 10f;
public static final float A1_CAPACITY = 30;
public static final float A2_CAPACITY = 70;
@@ -371,8 +371,8 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
conf.setCapacity(B, B_CAPACITY);
conf.setCapacity(C, C_CAPACITY);
conf.setCapacity(D, D_CAPACITY);
- conf.setCapacity(ESUBGROUP1, ESUBGROUP1_CAPACITY);
- conf.setCapacity(FGROUP, FGROUP_CAPACITY);
+ conf.setCapacity(ASUBGROUP1, ASUBGROUP1_CAPACITY);
+ conf.setCapacity(AGROUP, AGROUP_CAPACITY);
// Define 2nd-level queues
conf.setQueues(A, new String[] { "a1", "a2" });
@@ -391,12 +391,12 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
conf.setCapacity(B4, B4_CAPACITY);
conf.setUserLimitFactor(B4, 100.0f);
- conf.setQueues(ESUBGROUP1, new String[] {"e"});
- conf.setCapacity(ESUBGROUP1_A, 100f);
- conf.setUserLimitFactor(ESUBGROUP1_A, 100.0f);
- conf.setQueues(FGROUP, new String[] {"f"});
- conf.setCapacity(FGROUP_F, 100f);
- conf.setUserLimitFactor(FGROUP_F, 100.0f);
+ conf.setQueues(ASUBGROUP1, new String[] {"e"});
+ conf.setCapacity(ASUBGROUP1_A, 100f);
+ conf.setUserLimitFactor(ASUBGROUP1_A, 100.0f);
+ conf.setQueues(AGROUP, new String[] {"f"});
+ conf.setCapacity(AGROUP_A, 100f);
+ conf.setUserLimitFactor(AGROUP_A, 100.0f);
conf.setUserLimitFactor(C, 1.0f);
conf.setAutoCreateChildQueueEnabled(C, true);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java
index 084a177..596cca1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java
@@ -86,6 +86,7 @@ import java.util.Set;
import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager
.NO_LABEL;
+import static org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.CURRENT_USER_MAPPING;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils.EPSILON;
import static org.junit.Assert.assertEquals;
@@ -105,8 +106,6 @@ public class TestCapacitySchedulerAutoQueueCreation
private static final Logger LOG = LoggerFactory.getLogger(
TestCapacitySchedulerAutoQueueCreation.class);
- private static final String CURRENT_USER_MAPPING = "%user";
-
private static final Resource TEMPLATE_MAX_RES = Resource.newInstance(16 *
GB,
48);
@@ -425,16 +424,16 @@ public class TestCapacitySchedulerAutoQueueCreation
//dynamic queue mapping
try {
- setupQueueMapping(newCS, CURRENT_USER_MAPPING, "a1",
+ setupQueueMapping(newCS, CURRENT_USER_MAPPING, "a",
CURRENT_USER_MAPPING);
newCS.updatePlacementRules();
fail("Expected invalid parent queue mapping failure");
} catch (IOException e) {
//expected exception
-
assertTrue(e.getMessage().contains(
- "Target queue path 'a1.%user' has a non-managed parent queue"));
+ "invalid parent queue which does not have auto creation of leaf "
+ + "queues enabled [" + "a" + "]"));
}
//"a" is not auto create enabled and app_user does not exist as a leaf
@@ -447,8 +446,8 @@ public class TestCapacitySchedulerAutoQueueCreation
fail("Expected invalid parent queue mapping failure");
} catch (IOException e) {
//expected exception
- assertTrue(e.getMessage().contains(
- "contains an invalid parent queue 'INVALID_PARENT_QUEUE'"));
+ assertTrue(e.getMessage()
+ .contains("invalid parent queue [" + "INVALID_PARENT_QUEUE" + "]"));
}
} finally {
if (newMockRM != null) {
@@ -478,7 +477,7 @@ public class TestCapacitySchedulerAutoQueueCreation
fail("Expected invalid parent queue mapping failure");
} catch (IOException e) {
//expected exception
- assertTrue(e.getMessage().contains("invalid parent queue"));
+ assertTrue(e.getMessage().contains("invalid parent queue []"));
}
} finally {
if (newMockRM != null) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueueMappingFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueueMappingFactory.java
index 6a478a0..5beda25 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueueMappingFactory.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueueMappingFactory.java
@@ -25,11 +25,11 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
-import org.apache.hadoop.yarn.server.resourcemanager.placement.CSMappingPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping;
import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping.MappingType;
import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping.QueueMappingBuilder;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SimpleGroupsMapping;
import org.apache.hadoop.yarn.util.Records;
@@ -46,8 +46,10 @@ import static org.junit.Assert.*;
public class TestCapacitySchedulerQueueMappingFactory {
private static final String QUEUE_MAPPING_NAME = "app-name";
- private static final String QUEUE_MAPPING_RULE =
- CSMappingPlacementRule.class.getCanonicalName();
+ private static final String QUEUE_MAPPING_RULE_APP_NAME =
+ "org.apache.hadoop.yarn.server.resourcemanager.placement.AppNameMappingPlacementRule";
+ private static final String QUEUE_MAPPING_RULE_USER_GROUP =
+ "org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule";
public static final String USER = "user_";
public static final String PARENT_QUEUE = "c";
@@ -57,7 +59,8 @@ public class TestCapacitySchedulerQueueMappingFactory {
List<String> queuePlacementRules = new ArrayList<>();
- queuePlacementRules.add(QUEUE_MAPPING_RULE);
+ queuePlacementRules.add(QUEUE_MAPPING_RULE_USER_GROUP);
+ queuePlacementRules.add(QUEUE_MAPPING_RULE_APP_NAME);
conf.setQueuePlacementRules(queuePlacementRules);
@@ -131,7 +134,8 @@ public class TestCapacitySchedulerQueueMappingFactory {
}
// verify both placement rules were added successfully
- assertThat(placementRuleNames, hasItems(QUEUE_MAPPING_RULE));
+ assertThat(placementRuleNames, hasItems(QUEUE_MAPPING_RULE_USER_GROUP));
+ assertThat(placementRuleNames, hasItems(QUEUE_MAPPING_RULE_APP_NAME));
} finally {
if(mockRM != null) {
mockRM.close();
@@ -150,7 +154,7 @@ public class TestCapacitySchedulerQueueMappingFactory {
SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
List<String> queuePlacementRules = new ArrayList<>();
- queuePlacementRules.add(QUEUE_MAPPING_RULE);
+ queuePlacementRules.add(QUEUE_MAPPING_RULE_USER_GROUP);
conf.setQueuePlacementRules(queuePlacementRules);
List<QueueMapping> existingMappingsForUG = conf.getQueueMappings();
@@ -196,8 +200,8 @@ public class TestCapacitySchedulerQueueMappingFactory {
List<PlacementRule> rules =
cs.getRMContext().getQueuePlacementManager().getPlacementRules();
- CSMappingPlacementRule r =
- (CSMappingPlacementRule) rules.get(0);
+ UserGroupMappingPlacementRule r =
+ (UserGroupMappingPlacementRule) rules.get(0);
ApplicationPlacementContext ctx = r.getPlacementForApp(asc, "user1");
assertEquals("Queue", "b1", ctx.getQueue());
@@ -323,7 +327,7 @@ public class TestCapacitySchedulerQueueMappingFactory {
SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
List<String> queuePlacementRules = new ArrayList<>();
- queuePlacementRules.add(QUEUE_MAPPING_RULE);
+ queuePlacementRules.add(QUEUE_MAPPING_RULE_USER_GROUP);
conf.setQueuePlacementRules(queuePlacementRules);
List<QueueMapping> existingMappingsForUG = conf.getQueueMappings();
@@ -349,8 +353,8 @@ public class TestCapacitySchedulerQueueMappingFactory {
List<PlacementRule> rules =
cs.getRMContext().getQueuePlacementManager().getPlacementRules();
- CSMappingPlacementRule r =
- (CSMappingPlacementRule) rules.get(0);
+ UserGroupMappingPlacementRule r =
+ (UserGroupMappingPlacementRule) rules.get(0);
ApplicationPlacementContext ctx = r.getPlacementForApp(asc, user);
assertEquals("Queue", user, ctx.getQueue());
@@ -378,7 +382,7 @@ public class TestCapacitySchedulerQueueMappingFactory {
SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
List<String> queuePlacementRules = new ArrayList<>();
- queuePlacementRules.add(QUEUE_MAPPING_RULE);
+ queuePlacementRules.add(QUEUE_MAPPING_RULE_USER_GROUP);
conf.setQueuePlacementRules(queuePlacementRules);
List<QueueMapping> existingMappingsForUG = conf.getQueueMappings();
@@ -422,8 +426,8 @@ public class TestCapacitySchedulerQueueMappingFactory {
List<PlacementRule> rules =
cs.getRMContext().getQueuePlacementManager().getPlacementRules();
- CSMappingPlacementRule r =
- (CSMappingPlacementRule) rules.get(0);
+ UserGroupMappingPlacementRule r =
+ (UserGroupMappingPlacementRule) rules.get(0);
ApplicationPlacementContext ctx = r.getPlacementForApp(asc, "user1");
assertEquals("Queue", "b1", ctx.getQueue());
@@ -447,7 +451,7 @@ public class TestCapacitySchedulerQueueMappingFactory {
SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
List<String> queuePlacementRules = new ArrayList<>();
- queuePlacementRules.add(QUEUE_MAPPING_RULE);
+ queuePlacementRules.add(QUEUE_MAPPING_RULE_USER_GROUP);
conf.setQueuePlacementRules(queuePlacementRules);
List<QueueMapping> existingMappingsForUG = conf.getQueueMappings();
@@ -469,11 +473,11 @@ public class TestCapacitySchedulerQueueMappingFactory {
.queue("%primary_group")
.build();
- // u:b4:c.%secondary_group
+ // u:e:%secondary_group
QueueMapping userQueueMapping3 = QueueMappingBuilder.create()
.type(QueueMapping.MappingType.USER)
.source("e")
- .queue("c.%secondary_group")
+ .queue("%secondary_group")
.build();
queueMappingsForUG.add(userQueueMapping1);
@@ -499,8 +503,8 @@ public class TestCapacitySchedulerQueueMappingFactory {
List<PlacementRule> rules =
cs.getRMContext().getQueuePlacementManager().getPlacementRules();
- CSMappingPlacementRule r =
- (CSMappingPlacementRule) rules.get(0);
+ UserGroupMappingPlacementRule r =
+ (UserGroupMappingPlacementRule) rules.get(0);
ApplicationPlacementContext ctx = r.getPlacementForApp(asc, "user1");
assertEquals("Queue", "b1", ctx.getQueue());
@@ -510,7 +514,6 @@ public class TestCapacitySchedulerQueueMappingFactory {
ApplicationPlacementContext ctx2 = r.getPlacementForApp(asc, "e");
assertEquals("Queue", "esubgroup1", ctx2.getQueue());
- assertEquals("Queue", "root.c", ctx2.getParentQueue());
} finally {
if (mockRM != null) {
mockRM.close();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java
index dcd0fe0..039b9da 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java
@@ -18,14 +18,15 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
-import java.util.List;
-import org.apache.hadoop.yarn.server.resourcemanager.placement.MappingRule;
-import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping.MappingType;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping.QueueMappingBuilder;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -91,13 +92,12 @@ public class TestQueueMappings {
// space trimming
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, " u : a : " + Q1);
cs.reinitialize(conf, null);
-
- List<MappingRule> rules = cs.getConfiguration().getMappingRules();
-
- String ruleStr = rules.get(0).toString();
- assert(ruleStr.contains("variable='%user'"));
- assert(ruleStr.contains("value='a'"));
- assert(ruleStr.contains("queueName='q1'"));
+ checkQMapping(
+ QueueMappingBuilder.create()
+ .type(MappingType.USER)
+ .source("a")
+ .queue(Q1)
+ .build());
}
@Test
@@ -155,4 +155,13 @@ public class TestQueueMappings {
Assert.assertTrue("invalid mapping did not throw exception for " + reason,
fail);
}
+
+ // Checks that the first mapping of the first placement rule equals
+ // 'expected'; note JUnit's assertEquals takes (expected, actual).
+ private void checkQMapping(QueueMapping expected) throws IOException {
+ UserGroupMappingPlacementRule rule =
+ (UserGroupMappingPlacementRule) cs.getRMContext()
+ .getQueuePlacementManager().getPlacementRules().get(0);
+ Assert.assertEquals(expected, rule.getQueueMappings().get(0));
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org