You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by pr...@apache.org on 2020/01/28 17:46:32 UTC
[hadoop] branch trunk updated: YARN-10022. RM Rest API to validate
the CapacityScheduler Configuration change
This is an automated email from the ASF dual-hosted git repository.
prabhujoseph pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1ab9c69 YARN-10022. RM Rest API to validate the CapacityScheduler Configuration change
1ab9c69 is described below
commit 1ab9c692fa9107f10c0d8fbfbb9e74f54bddc960
Author: Prabhu Joseph <pr...@apache.org>
AuthorDate: Tue Jan 28 23:08:35 2020 +0530
YARN-10022. RM Rest API to validate the CapacityScheduler Configuration change
Contributed by Kinga Marton.
---
.../scheduler/MutableConfigurationProvider.java | 10 +
.../scheduler/capacity/CapacityScheduler.java | 79 ++---
.../capacity/CapacitySchedulerConfigValidator.java | 190 +++++++++++
.../capacity/CapacitySchedulerQueueManager.java | 88 +----
.../conf/MutableCSConfigurationProvider.java | 30 +-
.../server/resourcemanager/webapp/RMWSConsts.java | 6 +
.../resourcemanager/webapp/RMWebServices.java | 48 +++
.../CapacitySchedulerConfigGeneratorForTest.java | 53 +++
.../TestCapacitySchedulerConfigValidator.java | 364 +++++++++++++++++++++
.../resourcemanager/webapp/TestRMWebServices.java | 116 +++++++
10 files changed, 840 insertions(+), 144 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java
index 751c9a3..b44e2bf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java
@@ -54,6 +54,16 @@ public interface MutableConfigurationProvider {
SchedConfUpdateInfo confUpdate) throws Exception;
/**
+ * Apply the changes on top of the actual configuration.
+ * @param oldConfiguration actual configuration
+ * @param confUpdate changelist
+ * @return new configuration with the applied changed
+ * @throws IOException if the merge failed
+ */
+ Configuration applyChanges(Configuration oldConfiguration,
+ SchedConfUpdateInfo confUpdate) throws IOException;
+
+ /**
* Confirm last logged mutation.
* @param pendingMutation the log mutation to apply
* @param isValid if the last logged mutation is applied to scheduler
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index fcc0560..174a699 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -207,40 +207,9 @@ public class CapacityScheduler extends
private void validateConf(Configuration conf) {
// validate scheduler memory allocation setting
- int minMem = conf.getInt(
- YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
- int maxMem = conf.getInt(
- YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
-
- if (minMem <= 0 || minMem > maxMem) {
- throw new YarnRuntimeException("Invalid resource scheduler memory"
- + " allocation configuration"
- + ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB
- + "=" + minMem
- + ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB
- + "=" + maxMem + ", min and max should be greater than 0"
- + ", max should be no smaller than min.");
- }
-
+ CapacitySchedulerConfigValidator.validateMemoryAllocation(conf);
// validate scheduler vcores allocation setting
- int minVcores = conf.getInt(
- YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
- int maxVcores = conf.getInt(
- YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
-
- if (minVcores <= 0 || minVcores > maxVcores) {
- throw new YarnRuntimeException("Invalid resource scheduler vcores"
- + " allocation configuration"
- + ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES
- + "=" + minVcores
- + ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES
- + "=" + maxVcores + ", min and max should be greater than 0"
- + ", max should be no smaller than min.");
- }
+ CapacitySchedulerConfigValidator.validateVCores(conf);
}
@Override
@@ -480,14 +449,17 @@ public class CapacityScheduler extends
super.serviceStop();
}
- @Override
- public void reinitialize(Configuration newConf, RMContext rmContext)
- throws IOException {
+ public void reinitialize(Configuration newConf, RMContext rmContext,
+ boolean validation) throws IOException {
writeLock.lock();
try {
Configuration configuration = new Configuration(newConf);
CapacitySchedulerConfiguration oldConf = this.conf;
- this.conf = csConfProvider.loadConfiguration(configuration);
+ if (validation) {
+ this.conf = new CapacitySchedulerConfiguration(newConf, false);
+ } else {
+ this.conf = csConfProvider.loadConfiguration(configuration);
+ }
validateConf(this.conf);
try {
LOG.info("Re-initializing queues...");
@@ -501,17 +473,26 @@ public class CapacityScheduler extends
throw new IOException("Failed to re-init queues : " + t.getMessage(),
t);
}
+ if (!validation) {
- // update lazy preemption
- this.isLazyPreemptionEnabled = this.conf.getLazyPreemptionEnabled();
+ // update lazy preemption
+ this.isLazyPreemptionEnabled = this.conf.getLazyPreemptionEnabled();
- // Setup how many containers we can allocate for each round
- offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit();
+ // Setup how many containers we can allocate for each round
+ offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit();
- super.reinitialize(newConf, rmContext);
+ super.reinitialize(newConf, rmContext);
+ }
} finally {
writeLock.unlock();
}
+
+ }
+
+ @Override
+ public void reinitialize(Configuration newConf, RMContext rmContext)
+ throws IOException {
+ reinitialize(newConf, rmContext, false);
}
long getAsyncScheduleInterval() {
@@ -714,19 +695,13 @@ public class CapacityScheduler extends
Collection<String> placementRuleStrs = conf.getStringCollection(
YarnConfiguration.QUEUE_PLACEMENT_RULES);
List<PlacementRule> placementRules = new ArrayList<>();
- Set<String> distingushRuleSet = new HashSet<>();
- // fail the case if we get duplicate placementRule add in
- for (String pls : placementRuleStrs) {
- if (!distingushRuleSet.add(pls)) {
- throw new IOException("Invalid PlacementRule inputs which "
- + "contains duplicate rule strings");
- }
- }
+ Set<String> distinguishRuleSet = CapacitySchedulerConfigValidator
+ .validatePlacementRules(placementRuleStrs);
// add UserGroupMappingPlacementRule if absent
- distingushRuleSet.add(YarnConfiguration.USER_GROUP_PLACEMENT_RULE);
+ distinguishRuleSet.add(YarnConfiguration.USER_GROUP_PLACEMENT_RULE);
- placementRuleStrs = new ArrayList<>(distingushRuleSet);
+ placementRuleStrs = new ArrayList<>(distinguishRuleSet);
for (String placementRuleStr : placementRuleStrs) {
switch (placementRuleStr) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigValidator.java
new file mode 100644
index 0000000..525ea43
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigValidator.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.QueueState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public final class CapacitySchedulerConfigValidator {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ CapacitySchedulerConfigValidator.class);
+
+ private CapacitySchedulerConfigValidator() {
+ throw new IllegalStateException("Utility class");
+ }
+
+ public static boolean validateCSConfiguration(
+ final Configuration oldConf, final Configuration newConf,
+ final RMContext rmContext) throws IOException {
+ //TODO: extract all the validation steps and replace reinitialize with
+ //the specific validation steps
+ CapacityScheduler newCs = new CapacityScheduler();
+ newCs.setConf(oldConf);
+ newCs.setRMContext(rmContext);
+ newCs.init(oldConf);
+ newCs.reinitialize(newConf, rmContext, true);
+ return true;
+ }
+
+ public static Set<String> validatePlacementRules(
+ Collection<String> placementRuleStrs) throws IOException {
+ Set<String> distinguishRuleSet = new HashSet<>();
+ // fail the case if we get duplicate placementRule add in
+ for (String pls : placementRuleStrs) {
+ if (!distinguishRuleSet.add(pls)) {
+ throw new IOException("Invalid PlacementRule inputs which "
+ + "contains duplicate rule strings");
+ }
+ }
+ return distinguishRuleSet;
+ }
+
+ public static void validateMemoryAllocation(Configuration conf) {
+ int minMem = conf.getInt(
+ YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+ int maxMem = conf.getInt(
+ YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
+
+ if (minMem <= 0 || minMem > maxMem) {
+ throw new YarnRuntimeException("Invalid resource scheduler memory"
+ + " allocation configuration"
+ + ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB
+ + "=" + minMem
+ + ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB
+ + "=" + maxMem + ", min and max should be greater than 0"
+ + ", max should be no smaller than min.");
+ }
+ }
+ public static void validateVCores(Configuration conf) {
+ int minVcores = conf.getInt(
+ YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
+ int maxVcores = conf.getInt(
+ YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
+
+ if (minVcores <= 0 || minVcores > maxVcores) {
+ throw new YarnRuntimeException("Invalid resource scheduler vcores"
+ + " allocation configuration"
+ + ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES
+ + "=" + minVcores
+ + ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES
+ + "=" + maxVcores + ", min and max should be greater than 0"
+ + ", max should be no smaller than min.");
+ }
+ }
+
+ /**
+ * Ensure all existing queues are present. Queues cannot be deleted if its not
+ * in Stopped state, Queue's cannot be moved from one hierarchy to other also.
+ * Previous child queue could be converted into parent queue if it is in
+ * STOPPED state.
+ *
+ * @param queues existing queues
+ * @param newQueues new queues
+ */
+ public static void validateQueueHierarchy(Map<String, CSQueue> queues,
+ Map<String, CSQueue> newQueues, CapacitySchedulerConfiguration newConf)
+ throws IOException {
+ // check that all static queues are included in the newQueues list
+ for (Map.Entry<String, CSQueue> e : queues.entrySet()) {
+ if (!(AbstractAutoCreatedLeafQueue.class.isAssignableFrom(e.getValue()
+ .getClass()))) {
+ String queueName = e.getKey();
+ CSQueue oldQueue = e.getValue();
+ CSQueue newQueue = newQueues.get(queueName);
+ if (null == newQueue) {
+ // old queue doesn't exist in the new XML
+ String configPrefix = newConf.getQueuePrefix(
+ oldQueue.getQueuePath());
+ QueueState newQueueState = null;
+ try {
+ newQueueState = QueueState.valueOf(
+ newConf.get(configPrefix + "state"));
+ } catch (Exception ex) {
+ LOG.warn("Not a valid queue state for queue "
+ + oldQueue.getQueuePath());
+ }
+ if (oldQueue.getState() == QueueState.STOPPED ||
+ newQueueState == QueueState.STOPPED) {
+ LOG.info("Deleting Queue " + queueName + ", as it is not"
+ + " present in the modified capacity configuration xml");
+ } else{
+ throw new IOException(oldQueue.getQueuePath() + " cannot be"
+ + " deleted from the capacity scheduler configuration, "
+ + "as the queue is not yet in stopped state. "
+ + "Current State : " + oldQueue.getState());
+ }
+ } else if (!oldQueue.getQueuePath().equals(newQueue.getQueuePath())) {
+ //Queue's cannot be moved from one hierarchy to other
+ throw new IOException(
+ queueName + " is moved from:" + oldQueue.getQueuePath()
+ + " to:" + newQueue.getQueuePath()
+ + " after refresh, which is not allowed.");
+ } else if (oldQueue instanceof ParentQueue
+ && !(oldQueue instanceof ManagedParentQueue)
+ && newQueue instanceof ManagedParentQueue) {
+ throw new IOException(
+ "Can not convert parent queue: " + oldQueue.getQueuePath()
+ + " to auto create enabled parent queue since "
+ + "it could have other pre-configured queues which is"
+ + " not supported");
+ } else if (oldQueue instanceof ManagedParentQueue
+ && !(newQueue instanceof ManagedParentQueue)) {
+ throw new IOException(
+ "Cannot convert auto create enabled parent queue: " + oldQueue
+ .getQueuePath() + " to leaf queue. Please check "
+ + " parent queue's configuration "
+ + CapacitySchedulerConfiguration
+ .AUTO_CREATE_CHILD_QUEUE_ENABLED
+ + " is set to true");
+ } else if (oldQueue instanceof LeafQueue
+ && newQueue instanceof ParentQueue) {
+ if (oldQueue.getState() == QueueState.STOPPED) {
+ LOG.info("Converting the leaf queue: " + oldQueue.getQueuePath()
+ + " to parent queue.");
+ } else{
+ throw new IOException(
+ "Can not convert the leaf queue: " + oldQueue.getQueuePath()
+ + " to parent queue since "
+ + "it is not yet in stopped state. Current State : "
+ + oldQueue.getState());
+ }
+ } else if (oldQueue instanceof ParentQueue
+ && newQueue instanceof LeafQueue) {
+ LOG.info("Converting the parent queue: " + oldQueue.getQueuePath()
+ + " to leaf queue.");
+ }
+ }
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
index d9b3ebd..1bbc7ca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.Permission;
@@ -177,7 +176,8 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
csContext.getRMContext().getHAServiceState()
!= HAServiceProtocol.HAServiceState.STANDBY) {
// Ensure queue hierarchy in the new XML file is proper.
- validateQueueHierarchy(queues, newQueues, newConf);
+ CapacitySchedulerConfigValidator
+ .validateQueueHierarchy(queues, newQueues, newConf);
}
// Add new queues and delete OldQeueus only after validation.
@@ -299,90 +299,6 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
return queue;
}
- /**
- * Ensure all existing queues are present. Queues cannot be deleted if its not
- * in Stopped state, Queue's cannot be moved from one hierarchy to other also.
- * Previous child queue could be converted into parent queue if it is in
- * STOPPED state.
- *
- * @param queues existing queues
- * @param newQueues new queues
- */
- private void validateQueueHierarchy(Map<String, CSQueue> queues,
- Map<String, CSQueue> newQueues, CapacitySchedulerConfiguration newConf)
- throws IOException {
- // check that all static queues are included in the newQueues list
- for (Map.Entry<String, CSQueue> e : queues.entrySet()) {
- if (!(AbstractAutoCreatedLeafQueue.class.isAssignableFrom(e.getValue()
- .getClass()))) {
- String queueName = e.getKey();
- CSQueue oldQueue = e.getValue();
- CSQueue newQueue = newQueues.get(queueName);
- if (null == newQueue) {
- // old queue doesn't exist in the new XML
- String configPrefix = newConf.getQueuePrefix(
- oldQueue.getQueuePath());
- QueueState newQueueState = null;
- try {
- newQueueState = QueueState.valueOf(
- newConf.get(configPrefix + "state"));
- } catch (Exception ex) {
- LOG.warn("Not a valid queue state for queue "
- + oldQueue.getQueuePath());
- }
- if (oldQueue.getState() == QueueState.STOPPED ||
- newQueueState == QueueState.STOPPED) {
- LOG.info("Deleting Queue " + queueName + ", as it is not"
- + " present in the modified capacity configuration xml");
- } else{
- throw new IOException(oldQueue.getQueuePath() + " cannot be"
- + " deleted from the capacity scheduler configuration, as the"
- + " queue is not yet in stopped state. Current State : "
- + oldQueue.getState());
- }
- } else if (!oldQueue.getQueuePath().equals(newQueue.getQueuePath())) {
- //Queue's cannot be moved from one hierarchy to other
- throw new IOException(
- queueName + " is moved from:" + oldQueue.getQueuePath() + " to:"
- + newQueue.getQueuePath()
- + " after refresh, which is not allowed.");
- } else if (oldQueue instanceof ParentQueue
- && !(oldQueue instanceof ManagedParentQueue)
- && newQueue instanceof ManagedParentQueue) {
- throw new IOException(
- "Can not convert parent queue: " + oldQueue.getQueuePath()
- + " to auto create enabled parent queue since "
- + "it could have other pre-configured queues which is not "
- + "supported");
- } else if (oldQueue instanceof ManagedParentQueue
- && !(newQueue instanceof ManagedParentQueue)) {
- throw new IOException(
- "Cannot convert auto create enabled parent queue: " + oldQueue
- .getQueuePath() + " to leaf queue. Please check "
- + " parent queue's configuration "
- + CapacitySchedulerConfiguration
- .AUTO_CREATE_CHILD_QUEUE_ENABLED
- + " is set to true");
- } else if (oldQueue instanceof LeafQueue
- && newQueue instanceof ParentQueue) {
- if (oldQueue.getState() == QueueState.STOPPED) {
- LOG.info("Converting the leaf queue: " + oldQueue.getQueuePath()
- + " to parent queue.");
- } else{
- throw new IOException(
- "Can not convert the leaf queue: " + oldQueue.getQueuePath()
- + " to parent queue since "
- + "it is not yet in stopped state. Current State : "
- + oldQueue.getState());
- }
- } else if (oldQueue instanceof ParentQueue
- && newQueue instanceof LeafQueue) {
- LOG.info("Converting the parent queue: " + oldQueue.getQueuePath()
- + " to leaf queue.");
- }
- }
- }
- }
/**
* Updates to our list of queues: Adds the new queues and deletes the removed
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
index 0914640..91bc47a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
@@ -131,17 +131,35 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
public LogMutation logAndApplyMutation(UserGroupInformation user,
SchedConfUpdateInfo confUpdate) throws Exception {
oldConf = new Configuration(schedConf);
- Map<String, String> kvUpdate = constructKeyValueConfUpdate(confUpdate);
+ CapacitySchedulerConfiguration proposedConf =
+ new CapacitySchedulerConfiguration(schedConf, false);
+ Map<String, String> kvUpdate
+ = constructKeyValueConfUpdate(proposedConf, confUpdate);
LogMutation log = new LogMutation(kvUpdate, user.getShortUserName());
confStore.logMutation(log);
+ applyMutation(proposedConf, kvUpdate);
+ schedConf = proposedConf;
+ return log;
+ }
+
+ public Configuration applyChanges(Configuration oldConfiguration,
+ SchedConfUpdateInfo confUpdate) throws IOException {
+ CapacitySchedulerConfiguration proposedConf =
+ new CapacitySchedulerConfiguration(oldConfiguration, false);
+ Map<String, String> kvUpdate
+ = constructKeyValueConfUpdate(proposedConf, confUpdate);
+ applyMutation(proposedConf, kvUpdate);
+ return proposedConf;
+ }
+
+ private void applyMutation(Configuration conf, Map<String, String> kvUpdate) {
for (Map.Entry<String, String> kv : kvUpdate.entrySet()) {
if (kv.getValue() == null) {
- schedConf.unset(kv.getKey());
+ conf.unset(kv.getKey());
} else {
- schedConf.set(kv.getKey(), kv.getValue());
+ conf.set(kv.getKey(), kv.getValue());
}
}
- return log;
}
@Override
@@ -217,9 +235,9 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
}
private Map<String, String> constructKeyValueConfUpdate(
+ CapacitySchedulerConfiguration proposedConf,
SchedConfUpdateInfo mutationInfo) throws IOException {
- CapacitySchedulerConfiguration proposedConf =
- new CapacitySchedulerConfiguration(schedConf, false);
+
Map<String, String> confUpdate = new HashMap<>();
for (String queueToRemove : mutationInfo.getRemoveQueueInfo()) {
removeQueue(queueToRemove, proposedConf, confUpdate);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWSConsts.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWSConsts.java
index b738103..30406e5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWSConsts.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWSConsts.java
@@ -57,6 +57,12 @@ public final class RMWSConsts {
/** Path for {@code RMWebServiceProtocol#dumpSchedulerLogs}. */
public static final String SCHEDULER_LOGS = "/scheduler/logs";
+ /**
+ * Path for {@code RMWebServiceProtocol#validateAndGetSchedulerConfiguration}.
+ */
+ public static final String SCHEDULER_CONF_VALIDATE
+ = "/scheduler-conf/validate";
+
/** Path for {@code RMWebServiceProtocol#getNodes}. */
public static final String NODES = "/nodes";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
index bdd8e64..c6858f9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
@@ -148,6 +148,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigValidator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
@@ -2618,6 +2619,53 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
}
}
+ @POST
+ @Path(RMWSConsts.SCHEDULER_CONF_VALIDATE)
+ @Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
+ MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
+ @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+ public synchronized Response validateAndGetSchedulerConfiguration(
+ SchedConfUpdateInfo mutationInfo,
+ @Context HttpServletRequest hsr) throws AuthorizationException {
+ // Only admin user is allowed to read scheduler conf,
+ // in order to avoid leaking sensitive info, such as ACLs
+ UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
+ initForWritableEndpoints(callerUGI, true);
+ ResourceScheduler scheduler = rm.getResourceScheduler();
+ if (scheduler instanceof MutableConfScheduler && ((MutableConfScheduler)
+ scheduler).isConfigurationMutable()) {
+ try {
+ MutableConfigurationProvider mutableConfigurationProvider =
+ ((MutableConfScheduler) scheduler).getMutableConfProvider();
+ Configuration schedulerConf = mutableConfigurationProvider
+ .getConfiguration();
+ Configuration newConfig = mutableConfigurationProvider
+ .applyChanges(schedulerConf, mutationInfo);
+ Configuration yarnConf = ((CapacityScheduler) scheduler).getConf();
+ CapacitySchedulerConfigValidator.validateCSConfiguration(yarnConf,
+ newConfig, rm.getRMContext());
+
+ return Response.status(Status.OK)
+ .entity(new ConfInfo(newConfig))
+ .build();
+ } catch (Exception e) {
+ String errorMsg = "CapacityScheduler configuration validation failed:"
+ + e.toString();
+ LOG.warn(errorMsg);
+ return Response.status(Status.BAD_REQUEST)
+ .entity(errorMsg)
+ .build();
+ }
+ } else {
+ String errorMsg = "Configuration change validation only supported by " +
+ "MutableConfScheduler.";
+ LOG.warn(errorMsg);
+ return Response.status(Status.BAD_REQUEST)
+ .entity(errorMsg)
+ .build();
+ }
+ }
+
@PUT
@Path(RMWSConsts.SCHEDULER_CONF)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigGeneratorForTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigGeneratorForTest.java
new file mode 100644
index 0000000..1477a33
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigGeneratorForTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public final class CapacitySchedulerConfigGeneratorForTest {
+
+ private CapacitySchedulerConfigGeneratorForTest() {
+ throw new IllegalStateException("Utility class");
+ }
+
+ public static Configuration createConfiguration(Map<String, String> configs) {
+ Configuration config = new Configuration();
+ for (Map.Entry entry: configs.entrySet()) {
+ config.set((String)entry.getKey(), (String)entry.getValue());
+ }
+ return config;
+ }
+
+ public static Configuration createBasicCSConfiguration() {
+ Map<String, String> conf = new HashMap<>();
+ conf.put("yarn.scheduler.capacity.root.queues", "test1, test2");
+ conf.put("yarn.scheduler.capacity.root.test1.capacity", "50");
+ conf.put("yarn.scheduler.capacity.root.test2.capacity", "50");
+ conf.put("yarn.scheduler.capacity.root.test1.maximum-capacity", "100");
+ conf.put("yarn.scheduler.capacity.root.test1.state", "RUNNING");
+ conf.put("yarn.scheduler.capacity.root.test2.state", "RUNNING");
+ conf.put("yarn.scheduler.capacity.queue-mappings",
+ "u:test1:test1,u:test2:test2");
+ return createConfiguration(conf);
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerConfigValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerConfigValidator.java
new file mode 100644
index 0000000..04f4349
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerConfigValidator.java
@@ -0,0 +1,364 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.LocalConfigurationProvider;
+import org.apache.hadoop.yarn.api.records.impl.LightWeightResource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.fail;
+
+public class TestCapacitySchedulerConfigValidator {
+
+ /**
+ * Test for the case when the scheduler.minimum-allocation-mb == 0.
+ */
+ @Test (expected = YarnRuntimeException.class)
+ public void testValidateMemoryAllocationInvalidMinMem() {
+ Map<String, String> configs = new HashMap();
+ configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, "0");
+ Configuration config = CapacitySchedulerConfigGeneratorForTest
+ .createConfiguration(configs);
+ CapacitySchedulerConfigValidator.validateMemoryAllocation(config);
+ fail(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB +
+ " should be > 0");
+ }
+
+ /**
+ * Test for the case when the scheduler.minimum-allocation-mb is greater than
+ * scheduler.maximum-allocation-mb.
+ */
+ @Test (expected = YarnRuntimeException.class)
+ public void testValidateMemoryAllocationHIgherMinThanMaxMem() {
+ Map<String, String> configs = new HashMap();
+ configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, "8192");
+ configs.put(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, "1024");
+ Configuration config = CapacitySchedulerConfigGeneratorForTest
+ .createConfiguration(configs);
+ CapacitySchedulerConfigValidator.validateMemoryAllocation(config);
+ fail(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB + " should be > "
+ + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
+
+ }
+
+
+ @Test
+ public void testValidateMemoryAllocation() {
+ Map<String, String> configs = new HashMap();
+ configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, "1024");
+ configs.put(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, "8192");
+ Configuration config = CapacitySchedulerConfigGeneratorForTest
+ .createConfiguration(configs);
+ // there is no need for assertion, since there is no further method call
+ // inside the tested code and in case of a valid configuration no exception
+ // is thrown
+ CapacitySchedulerConfigValidator.validateMemoryAllocation(config);
+ }
+
+ /**
+ * Test for the case when the scheduler.minimum-allocation-vcores == 0.
+ */
+ @Test (expected = YarnRuntimeException.class)
+ public void testValidateVCoresInvalidMinVCore() {
+ Map<String, String> configs = new HashMap();
+ configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, "0");
+ Configuration config = CapacitySchedulerConfigGeneratorForTest
+ .createConfiguration(configs);
+ CapacitySchedulerConfigValidator.validateVCores(config);
+ fail(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES
+ + " should be > 0");
+ }
+
+ /**
+ * Test for the case when the scheduler.minimum-allocation-vcores is greater
+ * than scheduler.maximum-allocation-vcores.
+ */
+ @Test (expected = YarnRuntimeException.class)
+ public void testValidateVCoresHigherMinThanMaxVCore() {
+ Map<String, String> configs = new HashMap();
+ configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, "4");
+ configs.put(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, "1");
+ Configuration config = CapacitySchedulerConfigGeneratorForTest
+ .createConfiguration(configs);
+ CapacitySchedulerConfigValidator.validateVCores(config);
+ fail(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES +
+ " should be > "
+ + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
+
+ }
+
+
+ @Test
+ public void testValidateVCores() {
+ Map<String, String> configs = new HashMap();
+ configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, "1");
+ configs.put(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, "4");
+ Configuration config = CapacitySchedulerConfigGeneratorForTest
+ .createConfiguration(configs);
+ // there is no need for assertion, since there is no further method call
+ // inside the tested code and in case of a valid configuration no exception
+ // is thrown
+ CapacitySchedulerConfigValidator.validateVCores(config);
+ }
+
+ @Test
+ public void testValidateCSConfigInvalidCapacity() {
+ Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest
+ .createBasicCSConfiguration();
+ Configuration newConfig = new Configuration(oldConfig);
+ newConfig
+ .set("yarn.scheduler.capacity.root.test1.capacity", "500");
+ RMContext rmContext = prepareRMContext();
+ try {
+ CapacitySchedulerConfigValidator
+ .validateCSConfiguration(oldConfig, newConfig, rmContext);
+ fail("Invalid capacity");
+ } catch (IOException e) {
+ Assert.assertTrue(e.getCause().getMessage()
+ .startsWith("Illegal capacity"));
+ }
+ }
+
+ @Test
+ public void testValidateCSConfigStopALeafQueue() throws IOException {
+ Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest
+ .createBasicCSConfiguration();
+ Configuration newConfig = new Configuration(oldConfig);
+ newConfig
+ .set("yarn.scheduler.capacity.root.test1.state", "STOPPED");
+ RMContext rmContext = prepareRMContext();
+ Boolean isValidConfig = CapacitySchedulerConfigValidator
+ .validateCSConfiguration(oldConfig, newConfig, rmContext);
+ Assert.assertTrue(isValidConfig);
+ }
+
+ /**
+ * Stop the root queue if there are running child queues.
+ */
+ @Test
+ public void testValidateCSConfigStopANonLeafQueueInvalid() {
+ Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest
+ .createBasicCSConfiguration();
+ Configuration newConfig = new Configuration(oldConfig);
+ newConfig
+ .set("yarn.scheduler.capacity.root.state", "STOPPED");
+ RMContext rmContext = prepareRMContext();
+ try {
+ CapacitySchedulerConfigValidator
+ .validateCSConfiguration(oldConfig, newConfig, rmContext);
+ fail("There are child queues in running state");
+ } catch (IOException e) {
+ Assert.assertTrue(e.getCause().getMessage()
+ .contains("The parent queue:root cannot be STOPPED"));
+ }
+ }
+
+ @Test
+ public void testValidateCSConfigStopANonLeafQueue() throws IOException {
+ Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest
+ .createBasicCSConfiguration();
+ Configuration newConfig = new Configuration(oldConfig);
+ newConfig
+ .set("yarn.scheduler.capacity.root.state", "STOPPED");
+ newConfig
+ .set("yarn.scheduler.capacity.root.test1.state", "STOPPED");
+ newConfig
+ .set("yarn.scheduler.capacity.root.test2.state", "STOPPED");
+ RMContext rmContext = prepareRMContext();
+ Boolean isValidConfig = CapacitySchedulerConfigValidator
+ .validateCSConfiguration(oldConfig, newConfig, rmContext);
+ Assert.assertTrue(isValidConfig);
+
+ }
+
+ /**
+ * Add a leaf queue without modifying the capacity of other leaf queues
+ * so the total capacity != 100.
+ */
+ @Test
+ public void testValidateCSConfigAddALeafQueueInvalid() {
+ Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest
+ .createBasicCSConfiguration();
+ Configuration newConfig = new Configuration(oldConfig);
+ newConfig
+ .set("yarn.scheduler.capacity.root.queues", "test1, test2, test3");
+ newConfig
+ .set("yarn.scheduler.capacity.root.test3.state", "RUNNING");
+ newConfig
+ .set("yarn.scheduler.capacity.root.test3.capacity", "30");
+
+ RMContext rmContext = prepareRMContext();
+ try {
+ CapacitySchedulerConfigValidator
+ .validateCSConfiguration(oldConfig, newConfig, rmContext);
+ fail("Invalid capacity for children of queue root");
+ } catch (IOException e) {
+ Assert.assertTrue(e.getCause().getMessage()
+ .startsWith("Illegal capacity"));
+ }
+ }
+
+ /**
+ * Add a leaf queue by modifying the capacity of other leaf queues
+ * and adjust the capacities of other leaf queues, so total capacity = 100.
+ */
+ @Test
+ public void testValidateCSConfigAddALeafQueueValid() throws IOException {
+ Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest
+ .createBasicCSConfiguration();
+ Configuration newConfig = new Configuration(oldConfig);
+ newConfig
+ .set("yarn.scheduler.capacity.root.queues", "test1, test2, test3");
+ newConfig
+ .set("yarn.scheduler.capacity.root.test3.state", "RUNNING");
+ newConfig
+ .set("yarn.scheduler.capacity.root.test3.capacity", "30");
+ newConfig
+ .set("yarn.scheduler.capacity.root.test1.capacity", "20");
+
+ RMContext rmContext = prepareRMContext();
+ Boolean isValidConfig = CapacitySchedulerConfigValidator
+ .validateCSConfiguration(oldConfig, newConfig, rmContext);
+ Assert.assertTrue(isValidConfig);
+ }
+
+ /**
+ * Delete a running queue.
+ */
+ @Test
+ public void testValidateCSConfigInvalidQueueDeletion() {
+ Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest
+ .createBasicCSConfiguration();
+ Configuration newConfig = new Configuration(oldConfig);
+ newConfig.set("yarn.scheduler.capacity.root.queues", "test1");
+ newConfig.set("yarn.scheduler.capacity.root.test1.capacity", "100");
+ newConfig.unset("yarn.scheduler.capacity.root.test2.maximum-capacity");
+ newConfig.unset("yarn.scheduler.capacity.root.test2.state");
+ newConfig.set("yarn.scheduler.capacity.queue-mappings",
+ "u:test1:test1");
+ RMContext rmContext = prepareRMContext();
+ try {
+ CapacitySchedulerConfigValidator
+ .validateCSConfiguration(oldConfig, newConfig, rmContext);
+ fail("Invalid capacity for children of queue root");
+ } catch (IOException e) {
+ Assert.assertTrue(e.getCause().getMessage()
+ .contains("root.test2 cannot be deleted"));
+ Assert.assertTrue(e.getCause().getMessage()
+ .contains("the queue is not yet in stopped state"));
+ }
+ }
+
+ /**
+ * Delete a queue and not adjust capacities.
+ */
+ @Test
+ public void testValidateCSConfigInvalidQueueDeletion2() {
+ Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest
+ .createBasicCSConfiguration();
+ Configuration newConfig = new Configuration(oldConfig);
+ newConfig.set("yarn.scheduler.capacity.root.queues", "test1");
+ newConfig.unset("yarn.scheduler.capacity.root.test2.maximum-capacity");
+ newConfig.unset("yarn.scheduler.capacity.root.test2.state");
+ newConfig.set("yarn.scheduler.capacity.queue-mappings",
+ "u:test1:test1");
+ RMContext rmContext = prepareRMContext();
+ try {
+ CapacitySchedulerConfigValidator
+ .validateCSConfiguration(oldConfig, newConfig, rmContext);
+ fail("Invalid capacity for children of queue root");
+ } catch (IOException e) {
+ Assert.assertTrue(e.getCause().getMessage()
+ .contains("Illegal capacity"));
+ }
+ }
+
+ /**
+ * Delete a queue and adjust capacities to have total capacity = 100.
+ */
+ @Test
+ public void testValidateCSConfigValidQueueDeletion() throws IOException {
+ Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest
+ .createBasicCSConfiguration();
+ oldConfig.set("yarn.scheduler.capacity.root.test2.state", "STOPPED");
+ Configuration newConfig = new Configuration(oldConfig);
+ newConfig.set("yarn.scheduler.capacity.root.queues", "test1");
+ newConfig.set("yarn.scheduler.capacity.root.test1.capacity", "100");
+ newConfig.unset("yarn.scheduler.capacity.root.test2.maximum-capacity");
+ newConfig.unset("yarn.scheduler.capacity.root.test2.state");
+ newConfig.set("yarn.scheduler.capacity.queue-mappings",
+ "u:test1:test1");
+ RMContext rmContext = prepareRMContext();
+ boolean isValidConfig = CapacitySchedulerConfigValidator
+ .validateCSConfiguration(oldConfig, newConfig, rmContext);
+ Assert.assertTrue(isValidConfig);
+
+ }
+
+ @Test
+ public void testAddQueueToALeafQueue() throws IOException {
+ Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest
+ .createBasicCSConfiguration();
+ oldConfig.set("yarn.scheduler.capacity.root.test1.state", "STOPPED");
+ Configuration newConfig = new Configuration(oldConfig);
+ newConfig.set("yarn.scheduler.capacity.root.test1.queues", "newQueue");
+ newConfig
+ .set("yarn.scheduler.capacity.root.test1.newQueue.capacity", "100");
+ newConfig.set("yarn.scheduler.capacity.queue-mappings",
+ "u:test1:test2");
+ RMContext rmContext = prepareRMContext();
+ boolean isValidConfig = CapacitySchedulerConfigValidator
+ .validateCSConfiguration(oldConfig, newConfig, rmContext);
+ Assert.assertTrue(isValidConfig);
+ }
+
+
+ public static RMContext prepareRMContext() {
+ RMContext rmContext = Mockito.mock(RMContext.class);
+ LocalConfigurationProvider configProvider = Mockito
+ .mock(LocalConfigurationProvider.class);
+ Mockito.when(rmContext.getConfigurationProvider())
+ .thenReturn(configProvider);
+ RMNodeLabelsManager nodeLabelsManager = Mockito
+ .mock(RMNodeLabelsManager.class);
+ Mockito.when(rmContext.getNodeLabelManager()).thenReturn(nodeLabelsManager);
+ LightWeightResource partitionResource = Mockito
+ .mock(LightWeightResource.class);
+ Mockito.when(nodeLabelsManager
+ .getResourceByLabel(Mockito.any(), Mockito.any()))
+ .thenReturn(partitionResource);
+ PlacementManager queuePlacementManager = Mockito
+ .mock(PlacementManager.class);
+ Mockito.when(rmContext.getQueuePlacementManager())
+ .thenReturn(queuePlacementManager);
+ return rmContext;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java
index e472683..450d07a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java
@@ -28,12 +28,14 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
+import java.io.IOException;
import java.io.StringReader;
import java.security.Principal;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
@@ -52,6 +54,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.JettyUtils;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
@@ -75,6 +78,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerConfigValidator;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.MutableCSConfigurationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
@@ -89,6 +95,8 @@ import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.GuiceServletConfig;
import org.apache.hadoop.yarn.webapp.JerseyTestBase;
import org.apache.hadoop.yarn.webapp.WebServicesTestUtils;
+import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
+import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.junit.Assert;
@@ -982,4 +990,112 @@ public class TestRMWebServices extends JerseyTestBase {
assertEquals("requestedUser doesn't match: ",
requestedUser, userInfo.getRequestedUser());
}
+
+ @Test
+ public void testValidateAndGetSchedulerConfigurationInvalidScheduler()
+ throws AuthorizationException {
+ ResourceScheduler scheduler = new CapacityScheduler();
+ RMWebServices webService = prepareWebServiceForValidation(scheduler);
+ SchedConfUpdateInfo mutationInfo = new SchedConfUpdateInfo();
+ HttpServletRequest mockHsr = prepareServletRequestForValidation();
+ Response response = webService
+ .validateAndGetSchedulerConfiguration(mutationInfo, mockHsr);
+ Assert.assertEquals(Status.BAD_REQUEST
+ .getStatusCode(), response.getStatus());
+ Assert.assertTrue(response.getEntity().toString()
+ .contains("Configuration change validation only supported by"
+ +" MutableConfScheduler."));
+ }
+
+ @Test
+ public void testValidateAndGetSchedulerConfigurationInvalidConfig()
+ throws IOException {
+ Configuration config = CapacitySchedulerConfigGeneratorForTest
+ .createBasicCSConfiguration();
+ ResourceScheduler scheduler = prepareCSForValidation(config);
+
+ SchedConfUpdateInfo mutationInfo = new SchedConfUpdateInfo();
+ ArrayList<String> queuesToRemove = new ArrayList();
+ queuesToRemove.add("root.test1");
+ mutationInfo.setRemoveQueueInfo(queuesToRemove);
+
+ RMWebServices webService = prepareWebServiceForValidation(scheduler);
+ HttpServletRequest mockHsr = prepareServletRequestForValidation();
+
+ Response response = webService
+ .validateAndGetSchedulerConfiguration(mutationInfo, mockHsr);
+ Assert.assertEquals(Status.BAD_REQUEST
+ .getStatusCode(), response.getStatus());
+ Assert.assertTrue(response.getEntity().toString()
+ .contains("Illegal capacity of 0.5 for children of queue"));
+ }
+
+ @Test
+ public void testValidateAndGetSchedulerConfigurationValidScheduler()
+ throws IOException {
+ Configuration config = CapacitySchedulerConfigGeneratorForTest
+ .createBasicCSConfiguration();
+ config.set("yarn.scheduler.capacity.root.test1.state", "STOPPED");
+ config.set("yarn.scheduler.capacity.queue-mappings",
+ "u:test2:test2");
+ ResourceScheduler scheduler = prepareCSForValidation(config);
+
+ SchedConfUpdateInfo mutationInfo = new SchedConfUpdateInfo();
+ ArrayList<String> queuesToRemove = new ArrayList();
+ queuesToRemove.add("root.test1");
+ mutationInfo.setRemoveQueueInfo(queuesToRemove);
+ ArrayList<QueueConfigInfo> updateQueueInfo = new ArrayList<>();
+ String queueToUpdate = "root.test2";
+ Map<String, String> propertiesToUpdate = new HashMap<>();
+ propertiesToUpdate.put("capacity", "100");
+ updateQueueInfo.add(new QueueConfigInfo(queueToUpdate, propertiesToUpdate));
+ mutationInfo.setUpdateQueueInfo(updateQueueInfo);
+
+ RMWebServices webService = prepareWebServiceForValidation(scheduler);
+ HttpServletRequest mockHsr = prepareServletRequestForValidation();
+
+ Response response = webService
+ .validateAndGetSchedulerConfiguration(mutationInfo, mockHsr);
+ Assert.assertEquals(Status.OK
+ .getStatusCode(), response.getStatus());
+ }
+
+ private CapacityScheduler prepareCSForValidation(Configuration config)
+ throws IOException {
+ CapacityScheduler scheduler = mock(CapacityScheduler.class);
+ when(scheduler.isConfigurationMutable())
+ .thenReturn(true);
+ MutableCSConfigurationProvider configurationProvider =
+ mock(MutableCSConfigurationProvider.class);
+ when(scheduler.getMutableConfProvider())
+ .thenReturn(configurationProvider);
+
+ when(configurationProvider.getConfiguration()).thenReturn(config);
+ when(scheduler.getConf()).thenReturn(config);
+ when(configurationProvider
+ .applyChanges(any(), any())).thenCallRealMethod();
+ return scheduler;
+ }
+
+ private HttpServletRequest prepareServletRequestForValidation() {
+ HttpServletRequest mockHsr = mock(HttpServletRequest.class);
+ when(mockHsr.getUserPrincipal()).thenReturn(() -> "yarn");
+ return mockHsr;
+ }
+
+ private RMWebServices prepareWebServiceForValidation(
+ ResourceScheduler scheduler) {
+ ResourceManager mockRM = mock(ResourceManager.class);
+ ApplicationACLsManager acLsManager = mock(ApplicationACLsManager.class);
+ RMWebServices webService = new RMWebServices(mockRM, new Configuration(),
+ mock(HttpServletResponse.class));
+ when(mockRM.getResourceScheduler()).thenReturn(scheduler);
+ when(acLsManager.areACLsEnabled()).thenReturn(false);
+ when(mockRM.getApplicationACLsManager()).thenReturn(acLsManager);
+ RMContext context = TestCapacitySchedulerConfigValidator.prepareRMContext();
+ when(mockRM.getRMContext()).thenReturn(context);
+
+ return webService;
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org