You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jh...@apache.org on 2017/09/21 01:07:48 UTC
[45/50] [abbrv] hadoop git commit: YARN-5952. Create REST API for
changing YARN scheduler configurations. (Jonathan Hung via wangda)
YARN-5952. Create REST API for changing YARN scheduler configurations. (Jonathan Hung via wangda)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fbcc60ce
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fbcc60ce
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fbcc60ce
Branch: refs/heads/YARN-5734
Commit: fbcc60ce7d75812fd5957e8fe5b17abf0421a613
Parents: 0de6349
Author: Wangda Tan <wa...@apache.org>
Authored: Mon Apr 3 10:12:01 2017 -0700
Committer: Jonathan Hung <jh...@linkedin.com>
Committed: Wed Sep 20 17:40:53 2017 -0700
----------------------------------------------------------------------
.../scheduler/MutableConfScheduler.java | 40 ++
.../scheduler/MutableConfigurationProvider.java | 5 +-
.../scheduler/capacity/CapacityScheduler.java | 16 +-
.../conf/InMemoryConfigurationStore.java | 6 +-
.../conf/MutableCSConfigurationProvider.java | 24 +-
.../resourcemanager/webapp/RMWebServices.java | 172 ++++++-
.../webapp/dao/QueueConfigInfo.java | 57 +++
.../webapp/dao/QueueConfigsUpdateInfo.java | 60 +++
.../TestMutableCSConfigurationProvider.java | 6 +-
.../TestRMWebServicesConfigurationMutation.java | 477 +++++++++++++++++++
10 files changed, 851 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fbcc60ce/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfScheduler.java
new file mode 100644
index 0000000..35e36e1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfScheduler.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * A {@link ResourceScheduler} whose configuration can be changed while the
+ * scheduler is running.
+ */
+public interface MutableConfScheduler extends ResourceScheduler {
+
+  /**
+   * Apply a set of configuration changes to this scheduler.
+   *
+   * @param user the caller requesting the update
+   * @param confUpdate configuration keys mapped to the values to apply
+   * @throws IOException if the update is invalid
+   */
+  void updateConfiguration(
+      UserGroupInformation user,
+      Map<String, String> confUpdate) throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fbcc60ce/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java
index da30a2b..889c3bc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+import java.io.IOException;
import java.util.Map;
/**
@@ -29,7 +30,9 @@ public interface MutableConfigurationProvider {
* Update the scheduler configuration with the provided key value pairs.
* @param user User issuing the request
* @param confUpdate Key-value pairs for configurations to be updated.
+ * @throws IOException if scheduler could not be reinitialized
*/
- void mutateConfiguration(String user, Map<String, String> confUpdate);
+ void mutateConfiguration(String user, Map<String, String> confUpdate)
+ throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fbcc60ce/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 11f2f6e..7c5839b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -86,6 +86,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnSched
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
@@ -150,7 +152,7 @@ import com.google.common.util.concurrent.SettableFuture;
public class CapacityScheduler extends
AbstractYarnScheduler<FiCaSchedulerApp, FiCaSchedulerNode> implements
PreemptableResourceScheduler, CapacitySchedulerContext, Configurable,
- ResourceAllocationCommitter {
+ ResourceAllocationCommitter, MutableConfScheduler {
private static final Log LOG = LogFactory.getLog(CapacityScheduler.class);
@@ -2610,4 +2612,16 @@ public class CapacityScheduler extends
// In seconds
return ((LeafQueue) queue).getMaximumApplicationLifetime();
}
+
+ /**
+  * Forward a configuration update to the configured CS configuration
+  * provider, identifying the caller by short user name.
+  *
+  * @param user the caller requesting the update
+  * @param confUpdate configuration keys mapped to the values to apply
+  * @throws IOException if the provider fails to apply the update
+  * @throws UnsupportedOperationException if the configured provider is not
+  *     a {@link MutableConfigurationProvider}
+  */
+ @Override
+ public void updateConfiguration(UserGroupInformation user,
+     Map<String, String> confUpdate) throws IOException {
+   // Reject early when the provider cannot be mutated.
+   if (!(csConfProvider instanceof MutableConfigurationProvider)) {
+     throw new UnsupportedOperationException("Configured CS configuration " +
+         "provider does not support updating configuration.");
+   }
+   MutableConfigurationProvider mutableProvider =
+       (MutableConfigurationProvider) csConfProvider;
+   mutableProvider.mutateConfiguration(user.getShortUserName(), confUpdate);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fbcc60ce/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java
index a208fb9..b97be1b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java
@@ -58,7 +58,11 @@ public class InMemoryConfigurationStore implements YarnConfigurationStore {
if (isValid) {
Map<String, String> mutations = mutation.getUpdates();
for (Map.Entry<String, String> kv : mutations.entrySet()) {
- schedConf.set(kv.getKey(), kv.getValue());
+ if (kv.getValue() == null) {
+ schedConf.unset(kv.getKey());
+ } else {
+ schedConf.set(kv.getKey(), kv.getValue());
+ }
}
}
return true;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fbcc60ce/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
index 267ab6a..ea1b3c0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
@@ -60,34 +60,44 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
}
Configuration initialSchedConf = new Configuration(false);
initialSchedConf.addResource(YarnConfiguration.CS_CONFIGURATION_FILE);
- this.schedConf = initialSchedConf;
- confStore.initialize(config, initialSchedConf);
+ this.schedConf = new Configuration(false);
+ // We need to explicitly set the key-values in schedConf, otherwise
+ // these configuration keys cannot be deleted when
+ // configuration is reloaded.
+ for (Map.Entry<String, String> kv : initialSchedConf) {
+ schedConf.set(kv.getKey(), kv.getValue());
+ }
+ confStore.initialize(config, schedConf);
this.conf = config;
}
@Override
public CapacitySchedulerConfiguration loadConfiguration(Configuration
configuration) throws IOException {
- Configuration loadedConf = new Configuration(configuration);
- loadedConf.addResource(schedConf);
+ Configuration loadedConf = new Configuration(schedConf);
+ loadedConf.addResource(configuration);
return new CapacitySchedulerConfiguration(loadedConf, false);
}
@Override
public void mutateConfiguration(String user,
- Map<String, String> confUpdate) {
+ Map<String, String> confUpdate) throws IOException {
Configuration oldConf = new Configuration(schedConf);
LogMutation log = new LogMutation(confUpdate, user);
long id = confStore.logMutation(log);
for (Map.Entry<String, String> kv : confUpdate.entrySet()) {
- schedConf.set(kv.getKey(), kv.getValue());
+ if (kv.getValue() == null) {
+ schedConf.unset(kv.getKey());
+ } else {
+ schedConf.set(kv.getKey(), kv.getValue());
+ }
}
try {
rmContext.getScheduler().reinitialize(conf, rmContext);
} catch (IOException e) {
schedConf = oldConf;
confStore.confirmMutation(id, false);
- return;
+ throw e;
}
confStore.confirmMutation(id, true);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fbcc60ce/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
index 6dc3d9a..215e511 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
@@ -55,7 +55,8 @@ import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
-
+import com.google.common.base.Joiner;
+import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -134,11 +135,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
@@ -2454,4 +2458,170 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
GetContainersRequest request) throws YarnException, IOException {
return rm.getClientRMService().getContainers(request).getContainerList();
}
+
+ /**
+  * Apply a batch of queue removals, additions and updates to the scheduler
+  * configuration.
+  *
+  * When ACLs are enabled, only admin callers are allowed. The request is
+  * translated into key-value configuration changes and handed to the
+  * scheduler while running as the calling user.
+  *
+  * @param mutationInfo the queues to remove, add and update
+  * @param hsr the servlet request, used to identify the caller
+  * @return a 200 response if the change was applied; a 400 response if the
+  *     scheduler is not a {@link MutableConfScheduler} or the update failed
+  *     with an {@link IOException}
+  * @throws AuthorizationException if the caller could not be identified
+  * @throws InterruptedException propagated from
+  *     {@link UserGroupInformation#doAs}
+  */
+ @PUT
+ @Path("/queues")
+ @Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
+     MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
+ @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+ public Response updateSchedulerConfiguration(QueueConfigsUpdateInfo
+     mutationInfo, @Context HttpServletRequest hsr)
+     throws AuthorizationException, InterruptedException {
+   init();
+
+   UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
+   ApplicationACLsManager aclsManager = rm.getApplicationACLsManager();
+   if (aclsManager.areACLsEnabled()) {
+     if (callerUGI == null || !aclsManager.isAdmin(callerUGI)) {
+       String msg = "Only admins can carry out this operation.";
+       throw new ForbiddenException(msg);
+     }
+   }
+   if (callerUGI == null) {
+     // Reached only when ACLs are disabled: without this check the doAs
+     // call below would fail with a NullPointerException.
+     throw new AuthorizationException(
+         "Unable to obtain user name, user not authenticated");
+   }
+
+   ResourceScheduler scheduler = rm.getResourceScheduler();
+   if (scheduler instanceof MutableConfScheduler) {
+     try {
+       callerUGI.doAs(new PrivilegedExceptionAction<Void>() {
+         @Override
+         public Void run() throws IOException, YarnException {
+           Map<String, String> confUpdate =
+               constructKeyValueConfUpdate(mutationInfo);
+           // Cast to the type that was actually checked above; casting to
+           // CapacityScheduler would fail for any other mutable scheduler.
+           ((MutableConfScheduler) scheduler).updateConfiguration(callerUGI,
+               confUpdate);
+           return null;
+         }
+       });
+     } catch (IOException e) {
+       return Response.status(Status.BAD_REQUEST).entity(e.getMessage())
+           .build();
+     }
+     return Response.status(Status.OK).entity("Configuration change " +
+         "successfully applied.").build();
+   } else {
+     return Response.status(Status.BAD_REQUEST)
+         .entity("Configuration change only supported by CapacityScheduler.")
+         .build();
+   }
+ }
+
+ /**
+  * Translate a queue mutation request into configuration key-value changes.
+  *
+  * Removals are processed first, then additions, then updates. Each step
+  * also edits a scratch {@link CapacitySchedulerConfiguration} built from
+  * the scheduler's current configuration, so later steps see the effect of
+  * earlier ones.
+  *
+  * @param mutationInfo the queues to remove, add and update
+  * @return configuration keys mapped to their new values; a null value
+  *     marks a key to be unset
+  * @throws IOException if a removal or addition is rejected
+  */
+ private Map<String, String> constructKeyValueConfUpdate(
+     QueueConfigsUpdateInfo mutationInfo) throws IOException {
+   CapacityScheduler capacityScheduler =
+       (CapacityScheduler) rm.getResourceScheduler();
+   CapacitySchedulerConfiguration scratchConf =
+       new CapacitySchedulerConfiguration(
+           capacityScheduler.getConfiguration(), false);
+   Map<String, String> changes = new HashMap<>();
+
+   for (String removedPath : mutationInfo.getRemoveQueueInfo()) {
+     removeQueue(removedPath, scratchConf, changes);
+   }
+   for (QueueConfigInfo added : mutationInfo.getAddQueueInfo()) {
+     addQueue(added, scratchConf, changes);
+   }
+   for (QueueConfigInfo updated : mutationInfo.getUpdateQueueInfo()) {
+     updateQueue(updated, scratchConf, changes);
+   }
+   return changes;
+ }
+
+ /**
+  * Record the configuration changes needed to remove a queue.
+  *
+  * The queue is dropped from its parent's queue list, and every
+  * configuration key belonging to the queue is unset in
+  * {@code proposedConf} and mapped to null in {@code confUpdate}.
+  *
+  * @param queueToRemove full path of the queue to remove; null is ignored
+  * @param proposedConf scratch configuration, edited in place
+  * @param confUpdate accumulated key-value changes, edited in place
+  * @throws IOException if no queue with that path exists, or if the path
+  *     has no parent component
+  */
+ private void removeQueue(
+     String queueToRemove, CapacitySchedulerConfiguration proposedConf,
+     Map<String, String> confUpdate) throws IOException {
+   if (queueToRemove == null) {
+     return;
+   }
+   CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+   int lastDot = queueToRemove.lastIndexOf('.');
+   String queueName = queueToRemove.substring(lastDot + 1);
+   CSQueue queue = cs.getQueue(queueName);
+   if (queue == null ||
+       !queue.getQueuePath().equals(queueToRemove)) {
+     throw new IOException("Queue " + queueToRemove + " not found");
+   } else if (lastDot == -1) {
+     throw new IOException("Can't remove queue " + queueToRemove);
+   }
+   String parentQueuePath = queueToRemove.substring(0, lastDot);
+   String[] siblingQueues = proposedConf.getQueues(parentQueuePath);
+   List<String> newSiblingQueues = new ArrayList<>();
+   // Treat a null queue list as "no siblings", as addQueue does.
+   if (siblingQueues != null) {
+     for (String siblingQueue : siblingQueues) {
+       if (!siblingQueue.equals(queueName)) {
+         newSiblingQueues.add(siblingQueue);
+       }
+     }
+   }
+   proposedConf.setQueues(parentQueuePath, newSiblingQueues
+       .toArray(new String[0]));
+   String queuesConfig = CapacitySchedulerConfiguration.PREFIX +
+       parentQueuePath + CapacitySchedulerConfiguration.DOT +
+       CapacitySchedulerConfiguration.QUEUES;
+   if (newSiblingQueues.isEmpty()) {
+     confUpdate.put(queuesConfig, null);
+   } else {
+     confUpdate.put(queuesConfig, Joiner.on(',').join(newSiblingQueues));
+   }
+   // Escape the dots in the queue path so they match literally. In a
+   // replaceAll replacement string a backslash only escapes the next
+   // character, so "\\." would put the dot back unescaped; "\\\\." is
+   // needed to emit a backslash followed by a dot.
+   String queueKeyRegex =
+       ".*" + queueToRemove.replaceAll("\\.", "\\\\.") + "\\..*";
+   for (Map.Entry<String, String> confRemove :
+       proposedConf.getValByRegex(queueKeyRegex).entrySet()) {
+     proposedConf.unset(confRemove.getKey());
+     confUpdate.put(confRemove.getKey(), null);
+   }
+ }
+
+ /**
+  * Record the configuration changes needed to add a queue.
+  *
+  * The new queue's name is appended to its parent's queue list, and each
+  * supplied parameter is applied under the new queue's key prefix. A null
+  * parameter value unsets the key.
+  *
+  * @param addInfo path and parameters of the queue to add; null is ignored
+  * @param proposedConf scratch configuration, edited in place
+  * @param confUpdate accumulated key-value changes, edited in place
+  * @throws IOException if the scheduler already has a queue with that name,
+  *     or if the path has no parent component
+  */
+ private void addQueue(
+     QueueConfigInfo addInfo, CapacitySchedulerConfiguration proposedConf,
+     Map<String, String> confUpdate) throws IOException {
+   if (addInfo == null) {
+     return;
+   }
+   CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+   String queuePath = addInfo.getQueue();
+   int lastDot = queuePath.lastIndexOf('.');
+   String leafName = queuePath.substring(lastDot + 1);
+   if (cs.getQueue(leafName) != null) {
+     throw new IOException("Can't add existing queue " + queuePath);
+   }
+   if (lastDot == -1) {
+     throw new IOException("Can't add invalid queue " + queuePath);
+   }
+
+   // Append the new queue to whatever children the parent already has.
+   String parentPath = queuePath.substring(0, lastDot);
+   String[] existingChildren = proposedConf.getQueues(parentPath);
+   List<String> children = new ArrayList<>();
+   if (existingChildren != null) {
+     children.addAll(Arrays.asList(existingChildren));
+   }
+   children.add(leafName);
+   proposedConf.setQueues(parentPath, children.toArray(new String[0]));
+   String parentQueuesKey = CapacitySchedulerConfiguration.PREFIX +
+       parentPath + CapacitySchedulerConfiguration.DOT +
+       CapacitySchedulerConfiguration.QUEUES;
+   confUpdate.put(parentQueuesKey, Joiner.on(',').join(children));
+
+   String keyPrefix = CapacitySchedulerConfiguration.PREFIX +
+       queuePath + CapacitySchedulerConfiguration.DOT;
+   for (Map.Entry<String, String> param : addInfo.getParams().entrySet()) {
+     String key = keyPrefix + param.getKey();
+     String value = param.getValue();
+     if (value != null) {
+       proposedConf.set(key, value);
+     } else {
+       proposedConf.unset(key);
+     }
+     confUpdate.put(key, value);
+   }
+ }
+
+ /**
+  * Record parameter changes for a queue.
+  *
+  * Each parameter is applied under the queue's key prefix. A null value
+  * unsets the key in {@code proposedConf} and is recorded as null in
+  * {@code confUpdate}.
+  *
+  * @param updateInfo path and parameters of the queue; null is ignored
+  * @param proposedConf scratch configuration, edited in place
+  * @param confUpdate accumulated key-value changes, edited in place
+  */
+ private void updateQueue(QueueConfigInfo updateInfo,
+     CapacitySchedulerConfiguration proposedConf,
+     Map<String, String> confUpdate) {
+   if (updateInfo == null) {
+     return;
+   }
+   String keyPrefix = CapacitySchedulerConfiguration.PREFIX +
+       updateInfo.getQueue() + CapacitySchedulerConfiguration.DOT;
+   for (Map.Entry<String, String> param :
+       updateInfo.getParams().entrySet()) {
+     String key = keyPrefix + param.getKey();
+     String value = param.getValue();
+     if (value != null) {
+       proposedConf.set(key, value);
+     } else {
+       proposedConf.unset(key);
+     }
+     confUpdate.put(key, value);
+   }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fbcc60ce/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/QueueConfigInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/QueueConfigInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/QueueConfigInfo.java
new file mode 100644
index 0000000..b20eda6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/QueueConfigInfo.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * Names a queue and the configuration parameters to apply to it when the
+ * queue is added to, or updated in, the scheduler configuration.
+ */
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD)
+public class QueueConfigInfo {
+
+  @XmlElement(name = "queueName")
+  private String queue;
+
+  private HashMap<String, String> params = new HashMap<>();
+
+  /** Creates an instance with no queue set and an empty parameter map. */
+  public QueueConfigInfo() { }
+
+  /**
+   * Creates an instance for the given queue.
+   *
+   * @param queue the queue this information applies to
+   * @param params parameters for the queue; the map is copied, and null is
+   *     treated as an empty map
+   */
+  public QueueConfigInfo(String queue, Map<String, String> params) {
+    this.queue = queue;
+    // Keep the empty default when no map is supplied instead of failing
+    // with a NullPointerException in the HashMap copy constructor.
+    if (params != null) {
+      this.params = new HashMap<>(params);
+    }
+  }
+
+  /**
+   * @return the queue this information applies to
+   */
+  public String getQueue() {
+    return this.queue;
+  }
+
+  /**
+   * @return the parameter map held by this object (not a copy)
+   */
+  public HashMap<String, String> getParams() {
+    return this.params;
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fbcc60ce/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/QueueConfigsUpdateInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/QueueConfigsUpdateInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/QueueConfigsUpdateInfo.java
new file mode 100644
index 0000000..644ec90
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/QueueConfigsUpdateInfo.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
+
+import java.util.ArrayList;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * Information for making scheduler configuration changes (supports adding,
+ * removing, or updating a queue). Bound to the {@code schedConf} root
+ * element, with {@code add}, {@code remove} and {@code update} child
+ * elements mapped to the three lists below.
+ */
+@XmlRootElement(name = "schedConf")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class QueueConfigsUpdateInfo {
+
+ // Queues to add, each with its parameters.
+ @XmlElement(name = "add")
+ private ArrayList<QueueConfigInfo> addQueueInfo = new ArrayList<>();
+
+ // Queues to remove, one string per queue.
+ @XmlElement(name = "remove")
+ private ArrayList<String> removeQueueInfo = new ArrayList<>();
+
+ // Queues whose parameters are to be changed.
+ @XmlElement(name = "update")
+ private ArrayList<QueueConfigInfo> updateQueueInfo = new ArrayList<>();
+
+ public QueueConfigsUpdateInfo() {
+ // JAXB needs this
+ }
+
+ /**
+  * @return the list of queues to add (the internal list, not a copy)
+  */
+ public ArrayList<QueueConfigInfo> getAddQueueInfo() {
+ return addQueueInfo;
+ }
+
+ /**
+  * @return the list of queues to remove (the internal list, not a copy)
+  */
+ public ArrayList<String> getRemoveQueueInfo() {
+ return removeQueueInfo;
+ }
+
+ /**
+  * @return the list of queues to update (the internal list, not a copy)
+  */
+ public ArrayList<QueueConfigInfo> getUpdateQueueInfo() {
+ return updateQueueInfo;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fbcc60ce/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java
index 3f103b1..254da31 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java
@@ -77,7 +77,11 @@ public class TestMutableCSConfigurationProvider {
assertNull(confProvider.loadConfiguration(conf).get("badKey"));
doThrow(new IOException()).when(cs).reinitialize(any(Configuration.class),
any(RMContext.class));
- confProvider.mutateConfiguration(TEST_USER, badUpdate);
+ try {
+ confProvider.mutateConfiguration(TEST_USER, badUpdate);
+ } catch (IOException e) {
+ // Expected exception.
+ }
assertNull(confProvider.loadConfiguration(conf).get("badKey"));
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fbcc60ce/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesConfigurationMutation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesConfigurationMutation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesConfigurationMutation.java
new file mode 100644
index 0000000..d149055
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesConfigurationMutation.java
@@ -0,0 +1,477 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp;
+
+import com.google.inject.Guice;
+import com.google.inject.servlet.ServletModule;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.api.json.JSONJAXBContext;
+import com.sun.jersey.api.json.JSONMarshaller;
+import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
+import com.sun.jersey.test.framework.WebAppDescriptor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.QueueState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigsUpdateInfo;
+import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
+import org.apache.hadoop.yarn.webapp.GuiceServletConfig;
+import org.apache.hadoop.yarn.webapp.JerseyTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response.Status;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Test scheduler configuration mutation via REST API.
+ */
+public class TestRMWebServicesConfigurationMutation extends JerseyTestBase {
+
+ /** Capacity scheduler configuration file written for the test RM. */
+ private static final File CONF_FILE =
+     new File(
+         new File("target", "test-classes"),
+         YarnConfiguration.CS_CONFIGURATION_FILE);
+ /** Where an already existing configuration file is moved during a test. */
+ private static final File OLD_CONF_FILE =
+     new File(
+         new File("target", "test-classes"),
+         YarnConfiguration.CS_CONFIGURATION_FILE + ".tmp");
+
+ /** Resource manager created by WebServletModule; stopped in tearDown(). */
+ private static MockRM rm;
+ /** Short name of the current user; used as admin ACL and as user.name. */
+ private static String userName;
+ /** Initial capacity scheduler configuration that is written to CONF_FILE. */
+ private static CapacitySchedulerConfiguration csConf;
+ /** YARN configuration the MockRM is created with. */
+ private static YarnConfiguration conf;
+
+ /**
+  * Guice servlet module for these tests. It binds the RM web service
+  * classes, writes the initial capacity scheduler configuration to
+  * CONF_FILE and creates the {@link MockRM} that serves the requests.
+  */
+ private static class WebServletModule extends ServletModule {
+   /**
+    * Binds the web service classes, prepares the configuration file and
+    * starts serving all paths through Guice.
+    *
+    * @throws RuntimeException if the current user cannot be determined, an
+    *     existing configuration file cannot be moved aside, or the new
+    *     configuration file cannot be written
+    */
+   @Override
+   protected void configureServlets() {
+     bind(JAXBContextResolver.class);
+     bind(RMWebServices.class);
+     bind(GenericExceptionHandler.class);
+     try {
+       userName = UserGroupInformation.getCurrentUser().getShortUserName();
+     } catch (IOException ioe) {
+       throw new RuntimeException("Unable to get current user name "
+           + ioe.getMessage(), ioe);
+     }
+     csConf = new CapacitySchedulerConfiguration(new Configuration(false),
+         false);
+     setupQueueConfiguration(csConf);
+     conf = new YarnConfiguration();
+     conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+         ResourceScheduler.class);
+     conf.set(CapacitySchedulerConfiguration.CS_CONF_PROVIDER,
+         CapacitySchedulerConfiguration.STORE_CS_CONF_PROVIDER);
+     // The current user is the admin ACL; the tests send the same name as
+     // the user.name query parameter.
+     conf.set(YarnConfiguration.YARN_ADMIN_ACL, userName);
+     // Move an existing configuration file aside; tearDown() renames it back.
+     if (CONF_FILE.exists() && !CONF_FILE.renameTo(OLD_CONF_FILE)) {
+       throw new RuntimeException("Failed to rename conf file");
+     }
+     // try-with-resources closes the stream even when writeXml fails.
+     try (FileOutputStream out = new FileOutputStream(CONF_FILE)) {
+       csConf.writeXml(out);
+     } catch (IOException e) {
+       throw new RuntimeException("Failed to write XML file", e);
+     }
+     rm = new MockRM(conf);
+     bind(ResourceManager.class).toInstance(rm);
+     serve("/*").with(GuiceContainer.class);
+     filter("/*").through(TestRMWebServicesAppsModification
+         .TestRMCustomAuthFilter.class);
+   }
+ }
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ GuiceServletConfig.setInjector(
+ Guice.createInjector(new WebServletModule()));
+ }
+
+ /**
+  * Fills in the queue hierarchy these tests start from: root has children
+  * a (capacity 25, maximum 50), b (75) and c (0); a has children a1 (100)
+  * and a2 (0); c has the single child c1 (0).
+  *
+  * @param config capacity scheduler configuration to populate
+  */
+ private static void setupQueueConfiguration(
+     CapacitySchedulerConfiguration config) {
+   final String root = CapacitySchedulerConfiguration.ROOT;
+   final String queueA = root + ".a";
+   final String queueA1 = queueA + ".a1";
+   final String queueA2 = queueA + ".a2";
+   final String queueB = root + ".b";
+   final String queueC = root + ".c";
+   final String queueC1 = queueC + ".c1";
+
+   // Top-level queues.
+   config.setQueues(root, new String[]{"a", "b", "c"});
+
+   // root.a and its two children.
+   config.setCapacity(queueA, 25f);
+   config.setMaximumCapacity(queueA, 50f);
+   config.setQueues(queueA, new String[]{"a1", "a2"});
+   config.setCapacity(queueA1, 100f);
+   config.setCapacity(queueA2, 0f);
+
+   // root.b has no children configured.
+   config.setCapacity(queueB, 75f);
+
+   // root.c and its only child, both with zero capacity.
+   config.setCapacity(queueC, 0f);
+   config.setQueues(queueC, new String[] {"c1"});
+   config.setCapacity(queueC1, 0f);
+ }
+
+ public TestRMWebServicesConfigurationMutation() {
+ super(new WebAppDescriptor.Builder(
+ "org.apache.hadoop.yarn.server.resourcemanager.webapp")
+ .contextListenerClass(GuiceServletConfig.class)
+ .filterClass(com.google.inject.servlet.GuiceFilter.class)
+ .contextPath("jersey-guice-filter").servletPath("/").build());
+ }
+
+ /**
+  * Adds parent queue root.d and its children root.d.d1 and root.d.d2 in a
+  * single request and verifies the resulting hierarchy and child capacities.
+  */
+ @Test
+ public void testAddNestedQueue() throws Exception {
+   final String capacityKey = CapacitySchedulerConfiguration.CAPACITY;
+   final String maxCapacityKey =
+       CapacitySchedulerConfiguration.MAXIMUM_CAPACITY;
+
+   // The two children are configured with capacities 25 and 75.
+   Map<String, String> firstChildConf = new HashMap<>();
+   firstChildConf.put(capacityKey, "25");
+   firstChildConf.put(maxCapacityKey, "25");
+   Map<String, String> secondChildConf = new HashMap<>();
+   secondChildConf.put(capacityKey, "75");
+   secondChildConf.put(maxCapacityKey, "75");
+   // The parent itself is given a near-zero capacity.
+   Map<String, String> parentConf = new HashMap<>();
+   parentConf.put(capacityKey, "1E-4");
+   parentConf.put(maxCapacityKey, "1E-4");
+
+   // The children are listed before their parent in the request.
+   QueueConfigsUpdateInfo request = new QueueConfigsUpdateInfo();
+   request.getAddQueueInfo().add(
+       new QueueConfigInfo("root.d.d1", firstChildConf));
+   request.getAddQueueInfo().add(
+       new QueueConfigInfo("root.d.d2", secondChildConf));
+   request.getAddQueueInfo().add(new QueueConfigInfo("root.d", parentConf));
+
+   WebResource queuesResource = resource().path("ws").path("v1")
+       .path("cluster").path("queues").queryParam("user.name", userName);
+   String body = toJson(request, QueueConfigsUpdateInfo.class);
+   ClientResponse response = queuesResource
+       .accept(MediaType.APPLICATION_JSON)
+       .entity(body, MediaType.APPLICATION_JSON)
+       .put(ClientResponse.class);
+
+   assertEquals(Status.OK.getStatusCode(), response.getStatus());
+   CapacityScheduler scheduler =
+       (CapacityScheduler) rm.getResourceScheduler();
+   CapacitySchedulerConfiguration updatedConf = scheduler.getConfiguration();
+   assertEquals(4, updatedConf.getQueues("root").length);
+   assertEquals(2, updatedConf.getQueues("root.d").length);
+   assertEquals(25.0f,
+       updatedConf.getNonLabeledQueueCapacity("root.d.d1"), 0.01f);
+   assertEquals(75.0f,
+       updatedConf.getNonLabeledQueueCapacity("root.d.d2"), 0.01f);
+ }
+
+ /**
+  * Adds root.d with capacity 25 while lowering root.b from 75 to 50 in the
+  * same request, then verifies both capacities and the number of root
+  * children.
+  */
+ @Test
+ public void testAddWithUpdate() throws Exception {
+   Map<String, String> addedQueueConf = new HashMap<>();
+   addedQueueConf.put(CapacitySchedulerConfiguration.CAPACITY, "25");
+   Map<String, String> shrunkQueueConf = new HashMap<>();
+   shrunkQueueConf.put(CapacitySchedulerConfiguration.CAPACITY, "50");
+
+   // One addition and one update travel in the same request.
+   QueueConfigsUpdateInfo request = new QueueConfigsUpdateInfo();
+   request.getAddQueueInfo().add(
+       new QueueConfigInfo("root.d", addedQueueConf));
+   request.getUpdateQueueInfo().add(
+       new QueueConfigInfo("root.b", shrunkQueueConf));
+
+   WebResource queuesResource = resource().path("ws").path("v1")
+       .path("cluster").path("queues").queryParam("user.name", userName);
+   String body = toJson(request, QueueConfigsUpdateInfo.class);
+   ClientResponse response = queuesResource
+       .accept(MediaType.APPLICATION_JSON)
+       .entity(body, MediaType.APPLICATION_JSON)
+       .put(ClientResponse.class);
+
+   assertEquals(Status.OK.getStatusCode(), response.getStatus());
+   CapacityScheduler scheduler =
+       (CapacityScheduler) rm.getResourceScheduler();
+   CapacitySchedulerConfiguration updatedConf = scheduler.getConfiguration();
+   assertEquals(4, updatedConf.getQueues("root").length);
+   assertEquals(25.0f,
+       updatedConf.getNonLabeledQueueCapacity("root.d"), 0.01f);
+   assertEquals(50.0f,
+       updatedConf.getNonLabeledQueueCapacity("root.b"), 0.01f);
+ }
+
+ /**
+  * Stops and then removes root.a.a2, and verifies that a1 is the only
+  * remaining child of root.a.
+  */
+ @Test
+ public void testRemoveQueue() throws Exception {
+   final String removedQueue = "root.a.a2";
+   WebResource queuesResource = resource().path("ws").path("v1")
+       .path("cluster").path("queues").queryParam("user.name", userName);
+
+   // The queue is stopped before the removal request is sent.
+   stopQueue(removedQueue);
+   QueueConfigsUpdateInfo request = new QueueConfigsUpdateInfo();
+   request.getRemoveQueueInfo().add(removedQueue);
+
+   String body = toJson(request, QueueConfigsUpdateInfo.class);
+   ClientResponse response = queuesResource
+       .accept(MediaType.APPLICATION_JSON)
+       .entity(body, MediaType.APPLICATION_JSON)
+       .put(ClientResponse.class);
+
+   assertEquals(Status.OK.getStatusCode(), response.getStatus());
+   CapacityScheduler scheduler =
+       (CapacityScheduler) rm.getResourceScheduler();
+   CapacitySchedulerConfiguration updatedConf = scheduler.getConfiguration();
+   assertEquals(1, updatedConf.getQueues("root.a").length);
+   assertEquals("a1", updatedConf.getQueues("root.a")[0]);
+ }
+
+ /**
+  * Stops root.c and root.c.c1, removes the parent queue root.c, and verifies
+  * that root is left with two children and root.c has no queues.
+  */
+ @Test
+ public void testRemoveParentQueue() throws Exception {
+   final String removedParent = "root.c";
+   WebResource queuesResource = resource().path("ws").path("v1")
+       .path("cluster").path("queues").queryParam("user.name", userName);
+
+   // Parent and child are stopped before the parent is removed.
+   stopQueue(removedParent, "root.c.c1");
+   QueueConfigsUpdateInfo request = new QueueConfigsUpdateInfo();
+   request.getRemoveQueueInfo().add(removedParent);
+
+   String body = toJson(request, QueueConfigsUpdateInfo.class);
+   ClientResponse response = queuesResource
+       .accept(MediaType.APPLICATION_JSON)
+       .entity(body, MediaType.APPLICATION_JSON)
+       .put(ClientResponse.class);
+
+   assertEquals(Status.OK.getStatusCode(), response.getStatus());
+   CapacityScheduler scheduler =
+       (CapacityScheduler) rm.getResourceScheduler();
+   CapacitySchedulerConfiguration updatedConf = scheduler.getConfiguration();
+   assertEquals(2, updatedConf.getQueues("root").length);
+   assertNull(updatedConf.getQueues("root.c"));
+ }
+
+ /**
+  * Removes parent queue root.a (capacity 25) after stopping it and its
+  * children, raising root.b to 100 in the same request, then verifies the
+  * remaining root children and root.b's capacity.
+  */
+ @Test
+ public void testRemoveParentQueueWithCapacity() throws Exception {
+   WebResource queuesResource = resource().path("ws").path("v1")
+       .path("cluster").path("queues").queryParam("user.name", userName);
+
+   stopQueue("root.a", "root.a.a1", "root.a.a2");
+
+   // The removal and the capacity update of root.b share one request.
+   QueueConfigsUpdateInfo request = new QueueConfigsUpdateInfo();
+   request.getRemoveQueueInfo().add("root.a");
+   Map<String, String> grownQueueConf = new HashMap<>();
+   grownQueueConf.put(CapacitySchedulerConfiguration.CAPACITY, "100");
+   request.getUpdateQueueInfo().add(
+       new QueueConfigInfo("root.b", grownQueueConf));
+
+   String body = toJson(request, QueueConfigsUpdateInfo.class);
+   ClientResponse response = queuesResource
+       .accept(MediaType.APPLICATION_JSON)
+       .entity(body, MediaType.APPLICATION_JSON)
+       .put(ClientResponse.class);
+
+   assertEquals(Status.OK.getStatusCode(), response.getStatus());
+   CapacityScheduler scheduler =
+       (CapacityScheduler) rm.getResourceScheduler();
+   CapacitySchedulerConfiguration updatedConf = scheduler.getConfiguration();
+   assertEquals(2, updatedConf.getQueues("root").length);
+   assertEquals(100.0f,
+       updatedConf.getNonLabeledQueueCapacity("root.b"), 0.01f);
+ }
+
+ /**
+  * Removes root.b and root.c in one request while setting root.a's capacity
+  * and maximum capacity to 100, then verifies root has a single child left.
+  */
+ @Test
+ public void testRemoveMultipleQueues() throws Exception {
+   WebResource queuesResource = resource().path("ws").path("v1")
+       .path("cluster").path("queues").queryParam("user.name", userName);
+
+   stopQueue("root.b", "root.c", "root.c.c1");
+
+   QueueConfigsUpdateInfo request = new QueueConfigsUpdateInfo();
+   request.getRemoveQueueInfo().add("root.b");
+   request.getRemoveQueueInfo().add("root.c");
+   // root.a is updated to capacity 100 in the same request.
+   Map<String, String> remainingQueueConf = new HashMap<>();
+   remainingQueueConf.put(CapacitySchedulerConfiguration.CAPACITY, "100");
+   remainingQueueConf.put(
+       CapacitySchedulerConfiguration.MAXIMUM_CAPACITY, "100");
+   request.getUpdateQueueInfo().add(
+       new QueueConfigInfo("root.a", remainingQueueConf));
+
+   String body = toJson(request, QueueConfigsUpdateInfo.class);
+   ClientResponse response = queuesResource
+       .accept(MediaType.APPLICATION_JSON)
+       .entity(body, MediaType.APPLICATION_JSON)
+       .put(ClientResponse.class);
+
+   assertEquals(Status.OK.getStatusCode(), response.getStatus());
+   CapacityScheduler scheduler =
+       (CapacityScheduler) rm.getResourceScheduler();
+   assertEquals(1, scheduler.getConfiguration().getQueues("root").length);
+ }
+
+ /**
+  * Sets the state of every given queue to STOPPED with one update request
+  * and verifies that the scheduler configuration reports the new state.
+  *
+  * @param queuePaths full paths of the queues to stop
+  * @throws Exception if the request cannot be built or sent
+  */
+ private void stopQueue(String... queuePaths) throws Exception {
+   // A single parameter map is shared by all per-queue update entries.
+   Map<String, String> stateConf = new HashMap<>();
+   stateConf.put(CapacitySchedulerConfiguration.STATE,
+       QueueState.STOPPED.toString());
+
+   QueueConfigsUpdateInfo request = new QueueConfigsUpdateInfo();
+   for (int i = 0; i < queuePaths.length; i++) {
+     request.getUpdateQueueInfo().add(
+         new QueueConfigInfo(queuePaths[i], stateConf));
+   }
+
+   WebResource queuesResource = resource().path("ws").path("v1")
+       .path("cluster").path("queues").queryParam("user.name", userName);
+   String body = toJson(request, QueueConfigsUpdateInfo.class);
+   ClientResponse response = queuesResource
+       .accept(MediaType.APPLICATION_JSON)
+       .entity(body, MediaType.APPLICATION_JSON)
+       .put(ClientResponse.class);
+   assertEquals(Status.OK.getStatusCode(), response.getStatus());
+
+   CapacityScheduler scheduler =
+       (CapacityScheduler) rm.getResourceScheduler();
+   CapacitySchedulerConfiguration updatedConf = scheduler.getConfiguration();
+   for (int i = 0; i < queuePaths.length; i++) {
+     assertEquals(QueueState.STOPPED, updatedConf.getState(queuePaths[i]));
+   }
+ }
+
+ /**
+  * Sets the maximum AM resource setting of root.a to 0.2, verifies it, then
+  * sends the same key with a null value and verifies the value is back at
+  * its default.
+  */
+ @Test
+ public void testUpdateQueue() throws Exception {
+   final String amResourceKey =
+       CapacitySchedulerConfiguration.MAXIMUM_AM_RESOURCE_SUFFIX;
+   WebResource queuesResource = resource().path("ws").path("v1")
+       .path("cluster").path("queues").queryParam("user.name", userName);
+
+   Map<String, String> queueParams = new HashMap<>();
+   queueParams.put(amResourceKey, "0.2");
+   QueueConfigsUpdateInfo request = new QueueConfigsUpdateInfo();
+   request.getUpdateQueueInfo().add(
+       new QueueConfigInfo("root.a", queueParams));
+   CapacityScheduler scheduler =
+       (CapacityScheduler) rm.getResourceScheduler();
+
+   // Before the update the queue reports the default value.
+   assertEquals(
+       CapacitySchedulerConfiguration
+           .DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT,
+       scheduler.getConfiguration()
+           .getMaximumApplicationMasterResourcePerQueuePercent("root.a"),
+       0.001f);
+
+   ClientResponse response = queuesResource
+       .accept(MediaType.APPLICATION_JSON)
+       .entity(toJson(request, QueueConfigsUpdateInfo.class),
+           MediaType.APPLICATION_JSON)
+       .put(ClientResponse.class);
+   assertEquals(Status.OK.getStatusCode(), response.getStatus());
+   assertEquals(0.2f,
+       scheduler.getConfiguration()
+           .getMaximumApplicationMasterResourcePerQueuePercent("root.a"),
+       0.001f);
+
+   // Map the key to null to remove the setting; the value should revert to
+   // the default.
+   queueParams.put(amResourceKey, null);
+   request.getUpdateQueueInfo().clear();
+   request.getUpdateQueueInfo().add(
+       new QueueConfigInfo("root.a", queueParams));
+   response = queuesResource
+       .accept(MediaType.APPLICATION_JSON)
+       .entity(toJson(request, QueueConfigsUpdateInfo.class),
+           MediaType.APPLICATION_JSON)
+       .put(ClientResponse.class);
+   assertEquals(Status.OK.getStatusCode(), response.getStatus());
+   assertEquals(
+       CapacitySchedulerConfiguration
+           .DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT,
+       scheduler.getConfiguration()
+           .getMaximumApplicationMasterResourcePerQueuePercent("root.a"),
+       0.001f);
+ }
+
+ /**
+  * Sets the capacity of both root.a and root.b to 50 in one request and
+  * verifies the scheduler configuration reflects both values.
+  */
+ @Test
+ public void testUpdateQueueCapacity() throws Exception {
+   // Both queue entries share the same parameter map.
+   Map<String, String> halfCapacity = new HashMap<>();
+   halfCapacity.put(CapacitySchedulerConfiguration.CAPACITY, "50");
+
+   QueueConfigsUpdateInfo request = new QueueConfigsUpdateInfo();
+   request.getUpdateQueueInfo().add(
+       new QueueConfigInfo("root.a", halfCapacity));
+   request.getUpdateQueueInfo().add(
+       new QueueConfigInfo("root.b", halfCapacity));
+
+   WebResource queuesResource = resource().path("ws").path("v1")
+       .path("cluster").path("queues").queryParam("user.name", userName);
+   String body = toJson(request, QueueConfigsUpdateInfo.class);
+   ClientResponse response = queuesResource
+       .accept(MediaType.APPLICATION_JSON)
+       .entity(body, MediaType.APPLICATION_JSON)
+       .put(ClientResponse.class);
+
+   assertEquals(Status.OK.getStatusCode(), response.getStatus());
+   CapacityScheduler scheduler =
+       (CapacityScheduler) rm.getResourceScheduler();
+   CapacitySchedulerConfiguration updatedConf = scheduler.getConfiguration();
+   assertEquals(50.0f,
+       updatedConf.getNonLabeledQueueCapacity("root.a"), 0.01f);
+   assertEquals(50.0f,
+       updatedConf.getNonLabeledQueueCapacity("root.b"), 0.01f);
+ }
+
+ /**
+  * Stops the RM, deletes the configuration file written for the test and
+  * restores the configuration file that was moved aside during setup, if
+  * there was one. The base-class tear-down always runs.
+  *
+  * @throws Exception if an existing backup cannot be renamed back or the
+  *     base-class tear-down fails
+  */
+ @Override
+ @After
+ public void tearDown() throws Exception {
+   try {
+     if (rm != null) {
+       rm.stop();
+     }
+     CONF_FILE.delete();
+     // A backup only exists when a configuration file was present before
+     // setup ran; with nothing to restore there is nothing to fail on.
+     if (OLD_CONF_FILE.exists() && !OLD_CONF_FILE.renameTo(CONF_FILE)) {
+       throw new RuntimeException("Failed to re-copy old configuration file");
+     }
+   } finally {
+     // Run the base-class tear-down even when the cleanup above failed.
+     super.tearDown();
+   }
+ }
+
+ /**
+  * Serializes an object to JSON using a {@link JSONJAXBContext} created for
+  * the given class.
+  *
+  * @param nsli the object to marshal
+  * @param klass the class the JAXB context is created for
+  * @return the JSON text written by the marshaller
+  * @throws Exception if the context cannot be created or marshalling fails
+  */
+ private String toJson(Object nsli, Class<?> klass) throws Exception {
+   StringWriter sw = new StringWriter();
+   JSONJAXBContext ctx = new JSONJAXBContext(klass);
+   JSONMarshaller jm = ctx.createJSONMarshaller();
+   jm.marshallToJSON(nsli, sw);
+   return sw.toString();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org