You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sn...@apache.org on 2022/11/12 17:36:02 UTC
[hadoop] branch trunk updated: YARN-10005. Code improvements in MutableCSConfigurationProvider. Contributed by Peter Szucs
This is an automated email from the ASF dual-hosted git repository.
snemeth pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 22c9f28f4d8 YARN-10005. Code improvements in MutableCSConfigurationProvider. Contributed by Peter Szucs
22c9f28f4d8 is described below
commit 22c9f28f4d85a92d23024b03c9094edb8e56f369
Author: Szilard Nemeth <sn...@apache.org>
AuthorDate: Sat Nov 12 18:35:07 2022 +0100
YARN-10005. Code improvements in MutableCSConfigurationProvider. Contributed by Peter Szucs
---
.../conf/ConfigurationUpdateAssembler.java | 181 ++++++++++++++++++++
.../conf/MutableCSConfigurationProvider.java | 188 ++-------------------
.../conf/TestConfigurationUpdateAssembler.java | 173 +++++++++++++++++++
3 files changed, 370 insertions(+), 172 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationUpdateAssembler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationUpdateAssembler.java
new file mode 100644
index 00000000000..88c93019680
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationUpdateAssembler.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
+import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.ORDERING_POLICY;
+
+public final class ConfigurationUpdateAssembler {
+
+ private ConfigurationUpdateAssembler() {
+ }
+
+ public static Map<String, String> constructKeyValueConfUpdate(
+ CapacitySchedulerConfiguration proposedConf,
+ SchedConfUpdateInfo mutationInfo) throws IOException {
+
+ Map<String, String> confUpdate = new HashMap<>();
+ for (String queueToRemove : mutationInfo.getRemoveQueueInfo()) {
+ removeQueue(queueToRemove, proposedConf, confUpdate);
+ }
+ for (QueueConfigInfo addQueueInfo : mutationInfo.getAddQueueInfo()) {
+ addQueue(addQueueInfo, proposedConf, confUpdate);
+ }
+ for (QueueConfigInfo updateQueueInfo : mutationInfo.getUpdateQueueInfo()) {
+ updateQueue(updateQueueInfo, proposedConf, confUpdate);
+ }
+ for (Map.Entry<String, String> global : mutationInfo.getGlobalParams()
+ .entrySet()) {
+ confUpdate.put(global.getKey(), global.getValue());
+ }
+ return confUpdate;
+ }
+
+ private static void removeQueue(
+ String queueToRemove, CapacitySchedulerConfiguration proposedConf,
+ Map<String, String> confUpdate) throws IOException {
+ if (queueToRemove == null) {
+ return;
+ }
+ if (queueToRemove.lastIndexOf('.') == -1) {
+ throw new IOException("Can't remove queue " + queueToRemove);
+ }
+ String queueName = queueToRemove.substring(
+ queueToRemove.lastIndexOf('.') + 1);
+ List<String> siblingQueues = getSiblingQueues(queueToRemove,
+ proposedConf);
+ if (!siblingQueues.contains(queueName)) {
+ throw new IOException("Queue " + queueToRemove + " not found");
+ }
+ siblingQueues.remove(queueName);
+ String parentQueuePath = queueToRemove.substring(0, queueToRemove
+ .lastIndexOf('.'));
+ proposedConf.setQueues(parentQueuePath, siblingQueues.toArray(
+ new String[0]));
+ String queuesConfig = CapacitySchedulerConfiguration.PREFIX
+ + parentQueuePath + CapacitySchedulerConfiguration.DOT
+ + CapacitySchedulerConfiguration.QUEUES;
+ if (siblingQueues.isEmpty()) {
+ confUpdate.put(queuesConfig, null);
+ // Unset Ordering Policy of Leaf Queue converted from
+ // Parent Queue after removeQueue
+ String queueOrderingPolicy = CapacitySchedulerConfiguration.PREFIX
+ + parentQueuePath + CapacitySchedulerConfiguration.DOT
+ + ORDERING_POLICY;
+ proposedConf.unset(queueOrderingPolicy);
+ confUpdate.put(queueOrderingPolicy, null);
+ } else {
+ confUpdate.put(queuesConfig, Joiner.on(',').join(siblingQueues));
+ }
+ for (Map.Entry<String, String> confRemove : proposedConf.getValByRegex(
+ ".*" + queueToRemove + "\\..*")
+ .entrySet()) {
+ proposedConf.unset(confRemove.getKey());
+ confUpdate.put(confRemove.getKey(), null);
+ }
+ }
+
+ private static void addQueue(
+ QueueConfigInfo addInfo, CapacitySchedulerConfiguration proposedConf,
+ Map<String, String> confUpdate) throws IOException {
+ if (addInfo == null) {
+ return;
+ }
+ String queuePath = addInfo.getQueue();
+ String queueName = queuePath.substring(queuePath.lastIndexOf('.') + 1);
+ if (queuePath.lastIndexOf('.') == -1) {
+ throw new IOException("Can't add invalid queue " + queuePath);
+ } else if (getSiblingQueues(queuePath, proposedConf).contains(
+ queueName)) {
+ throw new IOException("Can't add existing queue " + queuePath);
+ }
+ String parentQueue = queuePath.substring(0, queuePath.lastIndexOf('.'));
+ String[] siblings = proposedConf.getQueues(parentQueue);
+ List<String> siblingQueues = siblings == null ? new ArrayList<>() :
+ new ArrayList<>(Arrays.asList(siblings));
+ siblingQueues.add(queuePath.substring(queuePath.lastIndexOf('.') + 1));
+ proposedConf.setQueues(parentQueue,
+ siblingQueues.toArray(new String[0]));
+ confUpdate.put(CapacitySchedulerConfiguration.PREFIX
+ + parentQueue + CapacitySchedulerConfiguration.DOT
+ + CapacitySchedulerConfiguration.QUEUES,
+ Joiner.on(',').join(siblingQueues));
+ String keyPrefix = CapacitySchedulerConfiguration.PREFIX
+ + queuePath + CapacitySchedulerConfiguration.DOT;
+ for (Map.Entry<String, String> kv : addInfo.getParams().entrySet()) {
+ String keyValue = kv.getValue();
+ if (keyValue == null || keyValue.isEmpty()) {
+ proposedConf.unset(keyPrefix + kv.getKey());
+ confUpdate.put(keyPrefix + kv.getKey(), null);
+ } else {
+ proposedConf.set(keyPrefix + kv.getKey(), keyValue);
+ confUpdate.put(keyPrefix + kv.getKey(), keyValue);
+ }
+ }
+ // Unset Ordering Policy of Parent Queue converted from
+ // Leaf Queue after addQueue
+ String queueOrderingPolicy = CapacitySchedulerConfiguration.PREFIX
+ + parentQueue + CapacitySchedulerConfiguration.DOT + ORDERING_POLICY;
+ if (siblingQueues.size() == 1) {
+ proposedConf.unset(queueOrderingPolicy);
+ confUpdate.put(queueOrderingPolicy, null);
+ }
+ }
+
+ private static void updateQueue(QueueConfigInfo updateInfo,
+ CapacitySchedulerConfiguration proposedConf,
+ Map<String, String> confUpdate) {
+ if (updateInfo == null) {
+ return;
+ }
+ String queuePath = updateInfo.getQueue();
+ String keyPrefix = CapacitySchedulerConfiguration.PREFIX
+ + queuePath + CapacitySchedulerConfiguration.DOT;
+ for (Map.Entry<String, String> kv : updateInfo.getParams().entrySet()) {
+ String keyValue = kv.getValue();
+ if (keyValue == null || keyValue.isEmpty()) {
+ proposedConf.unset(keyPrefix + kv.getKey());
+ confUpdate.put(keyPrefix + kv.getKey(), null);
+ } else {
+ proposedConf.set(keyPrefix + kv.getKey(), keyValue);
+ confUpdate.put(keyPrefix + kv.getKey(), keyValue);
+ }
+ }
+ }
+
+ private static List<String> getSiblingQueues(String queuePath, Configuration conf) {
+ String parentQueue = queuePath.substring(0, queuePath.lastIndexOf('.'));
+ String childQueuesKey = CapacitySchedulerConfiguration.PREFIX +
+ parentQueue + CapacitySchedulerConfiguration.DOT +
+ CapacitySchedulerConfiguration.QUEUES;
+ return new ArrayList<>(conf.getTrimmedStringCollection(childQueuesKey));
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
index c4cb273b495..cbd217f4f2b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
import org.apache.hadoop.classification.VisibleForTesting;
-import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -31,19 +30,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ConfigurationMuta
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
-import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.ORDERING_POLICY;
-
/**
* CS configuration provider which implements
* {@link MutableConfigurationProvider} for modifying capacity scheduler
@@ -79,15 +71,7 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
@Override
public void init(Configuration config) throws IOException {
this.confStore = YarnConfigurationStoreFactory.getStore(config);
- Configuration initialSchedConf = getInitSchedulerConfig();
- initialSchedConf.addResource(YarnConfiguration.CS_CONFIGURATION_FILE);
- this.schedConf = new Configuration(false);
- // We need to explicitly set the key-values in schedConf, otherwise
- // these configuration keys cannot be deleted when
- // configuration is reloaded.
- for (Map.Entry<String, String> kv : initialSchedConf) {
- schedConf.set(kv.getKey(), kv.getValue());
- }
+ initializeSchedConf();
try {
confStore.initialize(config, schedConf, rmContext);
confStore.checkVersion();
@@ -108,7 +92,7 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
}
@VisibleForTesting
- public YarnConfigurationStore getConfStore() {
+ protected YarnConfigurationStore getConfStore() {
return confStore;
}
@@ -142,7 +126,7 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
CapacitySchedulerConfiguration proposedConf =
new CapacitySchedulerConfiguration(schedConf, false);
Map<String, String> kvUpdate
- = constructKeyValueConfUpdate(proposedConf, confUpdate);
+ = ConfigurationUpdateAssembler.constructKeyValueConfUpdate(proposedConf, confUpdate);
LogMutation log = new LogMutation(kvUpdate, user.getShortUserName());
confStore.logMutation(log);
applyMutation(proposedConf, kvUpdate);
@@ -155,7 +139,7 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
CapacitySchedulerConfiguration proposedConf =
new CapacitySchedulerConfiguration(oldConfiguration, false);
Map<String, String> kvUpdate
- = constructKeyValueConfUpdate(proposedConf, confUpdate);
+ = ConfigurationUpdateAssembler.constructKeyValueConfUpdate(proposedConf, confUpdate);
applyMutation(proposedConf, kvUpdate);
return proposedConf;
}
@@ -177,15 +161,7 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
try {
confStore.format();
oldConf = new Configuration(schedConf);
- Configuration initialSchedConf = new Configuration(false);
- initialSchedConf.addResource(YarnConfiguration.CS_CONFIGURATION_FILE);
- this.schedConf = new Configuration(false);
- // We need to explicitly set the key-values in schedConf, otherwise
- // these configuration keys cannot be deleted when
- // configuration is reloaded.
- for (Map.Entry<String, String> kv : initialSchedConf) {
- schedConf.set(kv.getKey(), kv.getValue());
- }
+ initializeSchedConf();
confStore.initialize(config, schedConf, rmContext);
confStore.checkVersion();
} catch (Exception e) {
@@ -195,6 +171,17 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
}
}
+  /**
+   * Replaces {@code schedConf} with a new {@code Configuration(false)} and
+   * copies into it every entry of the configuration returned by
+   * {@code getInitSchedulerConfig()}.
+   */
+  private void initializeSchedConf() {
+    Configuration initialConf = getInitSchedulerConfig();
+    schedConf = new Configuration(false);
+    // Each key-value pair is set explicitly on schedConf; otherwise these
+    // configuration keys cannot be deleted when configuration is reloaded.
+    initialConf.forEach(entry -> schedConf.set(entry.getKey(), entry.getValue()));
+  }
+
@Override
public void revertToOldConfig(Configuration config) throws Exception {
formatLock.writeLock().lock();
@@ -233,147 +220,4 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
formatLock.readLock().unlock();
}
}
-
- private List<String> getSiblingQueues(String queuePath, Configuration conf) {
- String parentQueue = queuePath.substring(0, queuePath.lastIndexOf('.'));
- String childQueuesKey = CapacitySchedulerConfiguration.PREFIX +
- parentQueue + CapacitySchedulerConfiguration.DOT +
- CapacitySchedulerConfiguration.QUEUES;
- return new ArrayList<>(conf.getTrimmedStringCollection(childQueuesKey));
- }
-
- private Map<String, String> constructKeyValueConfUpdate(
- CapacitySchedulerConfiguration proposedConf,
- SchedConfUpdateInfo mutationInfo) throws IOException {
-
- Map<String, String> confUpdate = new HashMap<>();
- for (String queueToRemove : mutationInfo.getRemoveQueueInfo()) {
- removeQueue(queueToRemove, proposedConf, confUpdate);
- }
- for (QueueConfigInfo addQueueInfo : mutationInfo.getAddQueueInfo()) {
- addQueue(addQueueInfo, proposedConf, confUpdate);
- }
- for (QueueConfigInfo updateQueueInfo : mutationInfo.getUpdateQueueInfo()) {
- updateQueue(updateQueueInfo, proposedConf, confUpdate);
- }
- for (Map.Entry<String, String> global : mutationInfo.getGlobalParams()
- .entrySet()) {
- confUpdate.put(global.getKey(), global.getValue());
- }
- return confUpdate;
- }
-
- private void removeQueue(
- String queueToRemove, CapacitySchedulerConfiguration proposedConf,
- Map<String, String> confUpdate) throws IOException {
- if (queueToRemove == null) {
- return;
- } else {
- String queueName = queueToRemove.substring(
- queueToRemove.lastIndexOf('.') + 1);
- if (queueToRemove.lastIndexOf('.') == -1) {
- throw new IOException("Can't remove queue " + queueToRemove);
- } else {
- List<String> siblingQueues = getSiblingQueues(queueToRemove,
- proposedConf);
- if (!siblingQueues.contains(queueName)) {
- throw new IOException("Queue " + queueToRemove + " not found");
- }
- siblingQueues.remove(queueName);
- String parentQueuePath = queueToRemove.substring(0, queueToRemove
- .lastIndexOf('.'));
- proposedConf.setQueues(parentQueuePath, siblingQueues.toArray(
- new String[0]));
- String queuesConfig = CapacitySchedulerConfiguration.PREFIX
- + parentQueuePath + CapacitySchedulerConfiguration.DOT
- + CapacitySchedulerConfiguration.QUEUES;
- if (siblingQueues.size() == 0) {
- confUpdate.put(queuesConfig, null);
- // Unset Ordering Policy of Leaf Queue converted from
- // Parent Queue after removeQueue
- String queueOrderingPolicy = CapacitySchedulerConfiguration.PREFIX
- + parentQueuePath + CapacitySchedulerConfiguration.DOT
- + ORDERING_POLICY;
- proposedConf.unset(queueOrderingPolicy);
- confUpdate.put(queueOrderingPolicy, null);
- } else {
- confUpdate.put(queuesConfig, Joiner.on(',').join(siblingQueues));
- }
- for (Map.Entry<String, String> confRemove : proposedConf.getValByRegex(
- ".*" + queueToRemove.replaceAll("\\.", "\\.") + "\\..*")
- .entrySet()) {
- proposedConf.unset(confRemove.getKey());
- confUpdate.put(confRemove.getKey(), null);
- }
- }
- }
- }
-
- private void addQueue(
- QueueConfigInfo addInfo, CapacitySchedulerConfiguration proposedConf,
- Map<String, String> confUpdate) throws IOException {
- if (addInfo == null) {
- return;
- } else {
- String queuePath = addInfo.getQueue();
- String queueName = queuePath.substring(queuePath.lastIndexOf('.') + 1);
- if (queuePath.lastIndexOf('.') == -1) {
- throw new IOException("Can't add invalid queue " + queuePath);
- } else if (getSiblingQueues(queuePath, proposedConf).contains(
- queueName)) {
- throw new IOException("Can't add existing queue " + queuePath);
- }
- String parentQueue = queuePath.substring(0, queuePath.lastIndexOf('.'));
- String[] siblings = proposedConf.getQueues(parentQueue);
- List<String> siblingQueues = siblings == null ? new ArrayList<>() :
- new ArrayList<>(Arrays.<String>asList(siblings));
- siblingQueues.add(queuePath.substring(queuePath.lastIndexOf('.') + 1));
- proposedConf.setQueues(parentQueue,
- siblingQueues.toArray(new String[0]));
- confUpdate.put(CapacitySchedulerConfiguration.PREFIX
- + parentQueue + CapacitySchedulerConfiguration.DOT
- + CapacitySchedulerConfiguration.QUEUES,
- Joiner.on(',').join(siblingQueues));
- String keyPrefix = CapacitySchedulerConfiguration.PREFIX
- + queuePath + CapacitySchedulerConfiguration.DOT;
- for (Map.Entry<String, String> kv : addInfo.getParams().entrySet()) {
- if (kv.getValue() == null) {
- proposedConf.unset(keyPrefix + kv.getKey());
- } else {
- proposedConf.set(keyPrefix + kv.getKey(), kv.getValue());
- }
- confUpdate.put(keyPrefix + kv.getKey(), kv.getValue());
- }
- // Unset Ordering Policy of Parent Queue converted from
- // Leaf Queue after addQueue
- String queueOrderingPolicy = CapacitySchedulerConfiguration.PREFIX
- + parentQueue + CapacitySchedulerConfiguration.DOT + ORDERING_POLICY;
- if (siblingQueues.size() == 1) {
- proposedConf.unset(queueOrderingPolicy);
- confUpdate.put(queueOrderingPolicy, null);
- }
- }
- }
-
- private void updateQueue(QueueConfigInfo updateInfo,
- CapacitySchedulerConfiguration proposedConf,
- Map<String, String> confUpdate) {
- if (updateInfo == null) {
- return;
- } else {
- String queuePath = updateInfo.getQueue();
- String keyPrefix = CapacitySchedulerConfiguration.PREFIX
- + queuePath + CapacitySchedulerConfiguration.DOT;
- for (Map.Entry<String, String> kv : updateInfo.getParams().entrySet()) {
- String keyValue = kv.getValue();
- if (keyValue == null || keyValue.isEmpty()) {
- keyValue = null;
- proposedConf.unset(keyPrefix + kv.getKey());
- } else {
- proposedConf.set(keyPrefix + kv.getKey(), keyValue);
- }
- confUpdate.put(keyPrefix + kv.getKey(), keyValue);
- }
- }
- }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestConfigurationUpdateAssembler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestConfigurationUpdateAssembler.java
new file mode 100644
index 00000000000..890996ac23e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestConfigurationUpdateAssembler.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
+import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests {@link ConfigurationUpdateAssembler}.
+ *
+ * <p>Every test starts from a configuration whose root has the child queues
+ * {@code a} and {@code b}, each with an initial value for
+ * {@code testConfigName}.
+ */
+public class TestConfigurationUpdateAssembler {
+
+  private static final String A_PATH = "root.a";
+  private static final String B_PATH = "root.b";
+  private static final String C_PATH = "root.c";
+
+  private static final String CONFIG_NAME = "testConfigName";
+  private static final String A_CONFIG_PATH = CapacitySchedulerConfiguration.PREFIX + A_PATH +
+      CapacitySchedulerConfiguration.DOT + CONFIG_NAME;
+  private static final String B_CONFIG_PATH = CapacitySchedulerConfiguration.PREFIX + B_PATH +
+      CapacitySchedulerConfiguration.DOT + CONFIG_NAME;
+  private static final String C_CONFIG_PATH = CapacitySchedulerConfiguration.PREFIX + C_PATH +
+      CapacitySchedulerConfiguration.DOT + CONFIG_NAME;
+  private static final String ROOT_QUEUES_PATH = CapacitySchedulerConfiguration.PREFIX +
+      CapacitySchedulerConfiguration.ROOT + CapacitySchedulerConfiguration.DOT +
+      CapacitySchedulerConfiguration.QUEUES;
+
+  private static final String A_INIT_CONFIG_VALUE = "aInitValue";
+  private static final String A_CONFIG_VALUE = "aValue";
+  private static final String B_INIT_CONFIG_VALUE = "bInitValue";
+  private static final String B_CONFIG_VALUE = "bValue";
+  private static final String C_CONFIG_VALUE = "cValue";
+
+  private CapacitySchedulerConfiguration csConfig;
+
+  @Before
+  public void setUp() {
+    csConfig = createInitialCSConfig();
+  }
+
+  /** Adding a new queue reports its parameter and the extended queue list. */
+  @Test
+  public void testAddQueue() throws Exception {
+    SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
+    updateInfo.getAddQueueInfo().add(queueConfig(C_PATH, C_CONFIG_VALUE));
+
+    Map<String, String> configurationUpdate =
+        ConfigurationUpdateAssembler.constructKeyValueConfUpdate(csConfig, updateInfo);
+
+    assertEquals(C_CONFIG_VALUE, configurationUpdate.get(C_CONFIG_PATH));
+    assertEquals("a,b,c", configurationUpdate.get(ROOT_QUEUES_PATH));
+  }
+
+  /** Adding a queue that is already listed under its parent is rejected. */
+  @Test
+  public void testAddExistingQueue() {
+    SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
+    updateInfo.getAddQueueInfo().add(queueConfig(A_PATH, A_CONFIG_VALUE));
+
+    assertThrows(IOException.class,
+        () -> ConfigurationUpdateAssembler.constructKeyValueConfUpdate(csConfig, updateInfo));
+  }
+
+  /** Adding a queue whose path has no parent part is rejected. */
+  @Test
+  public void testAddInvalidQueue() {
+    SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
+    updateInfo.getAddQueueInfo().add(queueConfig("invalidPath", A_CONFIG_VALUE));
+
+    assertThrows(IOException.class,
+        () -> ConfigurationUpdateAssembler.constructKeyValueConfUpdate(csConfig, updateInfo));
+  }
+
+  /** Updating two queues reports the new value for each of them. */
+  @Test
+  public void testUpdateQueue() throws Exception {
+    SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
+    updateInfo.getUpdateQueueInfo().add(queueConfig(A_PATH, A_CONFIG_VALUE));
+    updateInfo.getUpdateQueueInfo().add(queueConfig(B_PATH, B_CONFIG_VALUE));
+
+    Map<String, String> configurationUpdate =
+        ConfigurationUpdateAssembler.constructKeyValueConfUpdate(csConfig, updateInfo);
+
+    assertEquals(A_CONFIG_VALUE, configurationUpdate.get(A_CONFIG_PATH));
+    assertEquals(B_CONFIG_VALUE, configurationUpdate.get(B_CONFIG_PATH));
+  }
+
+  /** Removing a queue unsets its keys and shrinks the parent's queue list. */
+  @Test
+  public void testRemoveQueue() throws Exception {
+    SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
+    updateInfo.getRemoveQueueInfo().add(A_PATH);
+
+    Map<String, String> configurationUpdate =
+        ConfigurationUpdateAssembler.constructKeyValueConfUpdate(csConfig, updateInfo);
+
+    assertTrue(configurationUpdate.containsKey(A_CONFIG_PATH));
+    assertNull(configurationUpdate.get(A_CONFIG_PATH));
+    assertEquals("b", configurationUpdate.get(ROOT_QUEUES_PATH));
+  }
+
+  /** Removing a queue whose path has no parent part is rejected. */
+  @Test
+  public void testRemoveInvalidQueue() {
+    SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
+    updateInfo.getRemoveQueueInfo().add("invalidPath");
+
+    assertThrows(IOException.class,
+        () -> ConfigurationUpdateAssembler.constructKeyValueConfUpdate(csConfig, updateInfo));
+  }
+
+  /** Removing a queue that is not listed under its parent is rejected. */
+  @Test
+  public void testRemoveNonExistingQueue() {
+    SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
+    updateInfo.getRemoveQueueInfo().add("root.d");
+
+    assertThrows(IOException.class,
+        () -> ConfigurationUpdateAssembler.constructKeyValueConfUpdate(csConfig, updateInfo));
+  }
+
+  /**
+   * Builds a queue request carrying a single {@code testConfigName}
+   * parameter with the given value.
+   */
+  private static QueueConfigInfo queueConfig(String queuePath, String configValue) {
+    Map<String, String> params = new HashMap<>();
+    params.put(CONFIG_NAME, configValue);
+    return new QueueConfigInfo(queuePath, params);
+  }
+
+  /**
+   * Creates the starting configuration: root with child queues {@code a} and
+   * {@code b}, each holding its initial {@code testConfigName} value.
+   */
+  private CapacitySchedulerConfiguration createInitialCSConfig() {
+    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
+    // One array element per queue name, rather than a single "a, b" element.
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
+
+    csConf.set(A_CONFIG_PATH, A_INIT_CONFIG_VALUE);
+    csConf.set(B_CONFIG_PATH, B_INIT_CONFIG_VALUE);
+
+    return csConf;
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org