You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sn...@apache.org on 2022/11/12 17:36:02 UTC

[hadoop] branch trunk updated: YARN-10005. Code improvements in MutableCSConfigurationProvider. Contributed by Peter Szucs

This is an automated email from the ASF dual-hosted git repository.

snemeth pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 22c9f28f4d8 YARN-10005. Code improvements in MutableCSConfigurationProvider. Contributed by Peter Szucs
22c9f28f4d8 is described below

commit 22c9f28f4d85a92d23024b03c9094edb8e56f369
Author: Szilard Nemeth <sn...@apache.org>
AuthorDate: Sat Nov 12 18:35:07 2022 +0100

    YARN-10005. Code improvements in MutableCSConfigurationProvider. Contributed by Peter Szucs
---
 .../conf/ConfigurationUpdateAssembler.java         | 181 ++++++++++++++++++++
 .../conf/MutableCSConfigurationProvider.java       | 188 ++-------------------
 .../conf/TestConfigurationUpdateAssembler.java     | 173 +++++++++++++++++++
 3 files changed, 370 insertions(+), 172 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationUpdateAssembler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationUpdateAssembler.java
new file mode 100644
index 00000000000..88c93019680
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationUpdateAssembler.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
+import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.ORDERING_POLICY;
+
+/**
+ * Assembles the flat key-value configuration update described by a
+ * {@link SchedConfUpdateInfo} mutation request. Keys that get unset are
+ * recorded in the update with a {@code null} value.
+ */
+public final class ConfigurationUpdateAssembler {
+
+  private ConfigurationUpdateAssembler() {
+  }
+
+  /**
+   * Builds the key-value update for the given mutation request. Queue
+   * removals are processed first, then additions, then updates; these are
+   * applied to {@code proposedConf} as well. Global parameters are only
+   * copied into the returned map.
+   *
+   * @param proposedConf configuration the queue changes are applied to
+   * @param mutationInfo requested queue and global parameter changes
+   * @return configuration keys mapped to their new value, or to
+   *         {@code null} for keys unset by a queue change
+   * @throws IOException if a queue to add or to remove is invalid
+   */
+  public static Map<String, String> constructKeyValueConfUpdate(
+          CapacitySchedulerConfiguration proposedConf,
+          SchedConfUpdateInfo mutationInfo) throws IOException {
+
+    Map<String, String> confUpdate = new HashMap<>();
+    for (String queueToRemove : mutationInfo.getRemoveQueueInfo()) {
+      removeQueue(queueToRemove, proposedConf, confUpdate);
+    }
+    for (QueueConfigInfo addQueueInfo : mutationInfo.getAddQueueInfo()) {
+      addQueue(addQueueInfo, proposedConf, confUpdate);
+    }
+    for (QueueConfigInfo updateQueueInfo : mutationInfo.getUpdateQueueInfo()) {
+      updateQueue(updateQueueInfo, proposedConf, confUpdate);
+    }
+    confUpdate.putAll(mutationInfo.getGlobalParams());
+    return confUpdate;
+  }
+
+  /**
+   * Removes a queue from the child list of its parent and unsets every
+   * configuration key that belongs to the removed queue.
+   *
+   * @param queueToRemove full path of the queue; {@code null} is ignored
+   * @param proposedConf configuration to modify
+   * @param confUpdate update map the changes are recorded in
+   * @throws IOException if the path has no parent part, or the queue is not
+   *         listed among the children of its parent
+   */
+  private static void removeQueue(
+          String queueToRemove, CapacitySchedulerConfiguration proposedConf,
+          Map<String, String> confUpdate) throws IOException {
+    if (queueToRemove == null) {
+      return;
+    }
+    int lastDot = queueToRemove.lastIndexOf('.');
+    if (lastDot == -1) {
+      throw new IOException("Can't remove queue " + queueToRemove);
+    }
+    String queueName = queueToRemove.substring(lastDot + 1);
+    String parentQueuePath = queueToRemove.substring(0, lastDot);
+    List<String> siblingQueues = getSiblingQueues(queueToRemove, proposedConf);
+    // List.remove(Object) reports whether the queue was actually listed.
+    if (!siblingQueues.remove(queueName)) {
+      throw new IOException("Queue " + queueToRemove + " not found");
+    }
+    proposedConf.setQueues(parentQueuePath, siblingQueues.toArray(new String[0]));
+    String parentKeyPrefix = CapacitySchedulerConfiguration.PREFIX
+            + parentQueuePath + CapacitySchedulerConfiguration.DOT;
+    String queuesConfig = parentKeyPrefix + CapacitySchedulerConfiguration.QUEUES;
+    if (siblingQueues.isEmpty()) {
+      confUpdate.put(queuesConfig, null);
+      // Unset Ordering Policy of Leaf Queue converted from
+      // Parent Queue after removeQueue
+      String queueOrderingPolicy = parentKeyPrefix + ORDERING_POLICY;
+      proposedConf.unset(queueOrderingPolicy);
+      confUpdate.put(queueOrderingPolicy, null);
+    } else {
+      confUpdate.put(queuesConfig, Joiner.on(',').join(siblingQueues));
+    }
+    // Quote the queue path so that its dots (and any other regex
+    // metacharacter) match literally. Unquoted, removing "root.a.b" would
+    // also wipe the keys of an unrelated queue such as "root.a-b".
+    String queueKeysRegex = ".*" + Pattern.quote(queueToRemove) + "\\..*";
+    for (String keyToRemove : proposedConf.getValByRegex(queueKeysRegex).keySet()) {
+      proposedConf.unset(keyToRemove);
+      confUpdate.put(keyToRemove, null);
+    }
+  }
+
+  /**
+   * Appends a new queue to the child list of its parent and applies the
+   * parameters given for the new queue.
+   *
+   * @param addInfo path and parameters of the queue; {@code null} is ignored
+   * @param proposedConf configuration to modify
+   * @param confUpdate update map the changes are recorded in
+   * @throws IOException if the queue path is missing or has no parent part,
+   *         or the queue is already listed among the children of its parent
+   */
+  private static void addQueue(
+          QueueConfigInfo addInfo, CapacitySchedulerConfiguration proposedConf,
+          Map<String, String> confUpdate) throws IOException {
+    if (addInfo == null) {
+      return;
+    }
+    String queuePath = addInfo.getQueue();
+    // Validate before slicing the path; a missing path is rejected the same
+    // way as a path without a parent part instead of failing with an NPE.
+    int lastDot = queuePath == null ? -1 : queuePath.lastIndexOf('.');
+    if (lastDot == -1) {
+      throw new IOException("Can't add invalid queue " + queuePath);
+    }
+    String queueName = queuePath.substring(lastDot + 1);
+    if (getSiblingQueues(queuePath, proposedConf).contains(queueName)) {
+      throw new IOException("Can't add existing queue " + queuePath);
+    }
+    String parentQueue = queuePath.substring(0, lastDot);
+    String[] siblings = proposedConf.getQueues(parentQueue);
+    List<String> siblingQueues = siblings == null ? new ArrayList<>() :
+            new ArrayList<>(Arrays.asList(siblings));
+    siblingQueues.add(queueName);
+    proposedConf.setQueues(parentQueue, siblingQueues.toArray(new String[0]));
+    String parentKeyPrefix = CapacitySchedulerConfiguration.PREFIX
+            + parentQueue + CapacitySchedulerConfiguration.DOT;
+    confUpdate.put(parentKeyPrefix + CapacitySchedulerConfiguration.QUEUES,
+            Joiner.on(',').join(siblingQueues));
+    applyQueueParams(queuePath, addInfo.getParams(), proposedConf, confUpdate);
+    // Unset Ordering Policy of Parent Queue converted from
+    // Leaf Queue after addQueue
+    if (siblingQueues.size() == 1) {
+      String queueOrderingPolicy = parentKeyPrefix + ORDERING_POLICY;
+      proposedConf.unset(queueOrderingPolicy);
+      confUpdate.put(queueOrderingPolicy, null);
+    }
+  }
+
+  /**
+   * Applies the parameters of an update request to an existing queue.
+   *
+   * @param updateInfo path and parameters of the queue; {@code null} is
+   *         ignored
+   * @param proposedConf configuration to modify
+   * @param confUpdate update map the changes are recorded in
+   */
+  private static void updateQueue(QueueConfigInfo updateInfo,
+                                  CapacitySchedulerConfiguration proposedConf,
+                                  Map<String, String> confUpdate) {
+    if (updateInfo == null) {
+      return;
+    }
+    applyQueueParams(updateInfo.getQueue(), updateInfo.getParams(),
+            proposedConf, confUpdate);
+  }
+
+  /**
+   * Sets or unsets the given parameters under the key prefix of a queue, both
+   * in the proposed configuration and in the update map. A parameter with a
+   * {@code null} or empty value is unset and recorded as {@code null}.
+   *
+   * @param queuePath full path of the queue the parameters belong to
+   * @param params parameter names mapped to their requested value
+   * @param proposedConf configuration to modify
+   * @param confUpdate update map the changes are recorded in
+   */
+  private static void applyQueueParams(
+          String queuePath, Map<String, String> params,
+          CapacitySchedulerConfiguration proposedConf,
+          Map<String, String> confUpdate) {
+    String keyPrefix = CapacitySchedulerConfiguration.PREFIX
+            + queuePath + CapacitySchedulerConfiguration.DOT;
+    for (Map.Entry<String, String> kv : params.entrySet()) {
+      String key = keyPrefix + kv.getKey();
+      String value = kv.getValue();
+      if (value == null || value.isEmpty()) {
+        proposedConf.unset(key);
+        confUpdate.put(key, null);
+      } else {
+        proposedConf.set(key, value);
+        confUpdate.put(key, value);
+      }
+    }
+  }
+
+  /**
+   * Reads the child queue names configured for the parent of a queue.
+   *
+   * @param queuePath full path of a queue; must contain a {@code '.'}
+   * @param conf configuration to read the child list from
+   * @return a new, modifiable list of the trimmed child queue names
+   */
+  private static List<String> getSiblingQueues(String queuePath, Configuration conf) {
+    String parentQueue = queuePath.substring(0, queuePath.lastIndexOf('.'));
+    String childQueuesKey = CapacitySchedulerConfiguration.PREFIX +
+            parentQueue + CapacitySchedulerConfiguration.DOT +
+            CapacitySchedulerConfiguration.QUEUES;
+    return new ArrayList<>(conf.getTrimmedStringCollection(childQueuesKey));
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
index c4cb273b495..cbd217f4f2b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
 
 import org.apache.hadoop.classification.VisibleForTesting;
-import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -31,19 +30,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ConfigurationMuta
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
-import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
 import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.ORDERING_POLICY;
-
 /**
  * CS configuration provider which implements
  * {@link MutableConfigurationProvider} for modifying capacity scheduler
@@ -79,15 +71,7 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
   @Override
   public void init(Configuration config) throws IOException {
     this.confStore = YarnConfigurationStoreFactory.getStore(config);
-    Configuration initialSchedConf = getInitSchedulerConfig();
-    initialSchedConf.addResource(YarnConfiguration.CS_CONFIGURATION_FILE);
-    this.schedConf = new Configuration(false);
-    // We need to explicitly set the key-values in schedConf, otherwise
-    // these configuration keys cannot be deleted when
-    // configuration is reloaded.
-    for (Map.Entry<String, String> kv : initialSchedConf) {
-      schedConf.set(kv.getKey(), kv.getValue());
-    }
+    initializeSchedConf();
     try {
       confStore.initialize(config, schedConf, rmContext);
       confStore.checkVersion();
@@ -108,7 +92,7 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
   }
 
   @VisibleForTesting
-  public YarnConfigurationStore getConfStore() {
+  protected YarnConfigurationStore getConfStore() {
     return confStore;
   }
 
@@ -142,7 +126,7 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
     CapacitySchedulerConfiguration proposedConf =
             new CapacitySchedulerConfiguration(schedConf, false);
     Map<String, String> kvUpdate
-            = constructKeyValueConfUpdate(proposedConf, confUpdate);
+            = ConfigurationUpdateAssembler.constructKeyValueConfUpdate(proposedConf, confUpdate);
     LogMutation log = new LogMutation(kvUpdate, user.getShortUserName());
     confStore.logMutation(log);
     applyMutation(proposedConf, kvUpdate);
@@ -155,7 +139,7 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
     CapacitySchedulerConfiguration proposedConf =
             new CapacitySchedulerConfiguration(oldConfiguration, false);
     Map<String, String> kvUpdate
-            = constructKeyValueConfUpdate(proposedConf, confUpdate);
+            = ConfigurationUpdateAssembler.constructKeyValueConfUpdate(proposedConf, confUpdate);
     applyMutation(proposedConf, kvUpdate);
     return proposedConf;
   }
@@ -177,15 +161,7 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
     try {
       confStore.format();
       oldConf = new Configuration(schedConf);
-      Configuration initialSchedConf = new Configuration(false);
-      initialSchedConf.addResource(YarnConfiguration.CS_CONFIGURATION_FILE);
-      this.schedConf = new Configuration(false);
-      // We need to explicitly set the key-values in schedConf, otherwise
-      // these configuration keys cannot be deleted when
-      // configuration is reloaded.
-      for (Map.Entry<String, String> kv : initialSchedConf) {
-        schedConf.set(kv.getKey(), kv.getValue());
-      }
+      initializeSchedConf();
       confStore.initialize(config, schedConf, rmContext);
       confStore.checkVersion();
     } catch (Exception e) {
@@ -195,6 +171,17 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
     }
   }
 
+  private void initializeSchedConf() {
+    Configuration initialSchedConf = getInitSchedulerConfig();
+    this.schedConf = new Configuration(false);
+    // Rebuild schedConf by explicitly setting every key-value of the
+    // initial scheduler config, otherwise these configuration keys cannot
+    // be deleted when configuration is reloaded.
+    for (Map.Entry<String, String> kv : initialSchedConf) {
+      schedConf.set(kv.getKey(), kv.getValue());
+    }
+  }
+
   @Override
   public void revertToOldConfig(Configuration config) throws Exception {
     formatLock.writeLock().lock();
@@ -233,147 +220,4 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
       formatLock.readLock().unlock();
     }
   }
-
-  private List<String> getSiblingQueues(String queuePath, Configuration conf) {
-    String parentQueue = queuePath.substring(0, queuePath.lastIndexOf('.'));
-    String childQueuesKey = CapacitySchedulerConfiguration.PREFIX +
-        parentQueue + CapacitySchedulerConfiguration.DOT +
-        CapacitySchedulerConfiguration.QUEUES;
-    return new ArrayList<>(conf.getTrimmedStringCollection(childQueuesKey));
-  }
-
-  private Map<String, String> constructKeyValueConfUpdate(
-      CapacitySchedulerConfiguration proposedConf,
-      SchedConfUpdateInfo mutationInfo) throws IOException {
-
-    Map<String, String> confUpdate = new HashMap<>();
-    for (String queueToRemove : mutationInfo.getRemoveQueueInfo()) {
-      removeQueue(queueToRemove, proposedConf, confUpdate);
-    }
-    for (QueueConfigInfo addQueueInfo : mutationInfo.getAddQueueInfo()) {
-      addQueue(addQueueInfo, proposedConf, confUpdate);
-    }
-    for (QueueConfigInfo updateQueueInfo : mutationInfo.getUpdateQueueInfo()) {
-      updateQueue(updateQueueInfo, proposedConf, confUpdate);
-    }
-    for (Map.Entry<String, String> global : mutationInfo.getGlobalParams()
-        .entrySet()) {
-      confUpdate.put(global.getKey(), global.getValue());
-    }
-    return confUpdate;
-  }
-
-  private void removeQueue(
-      String queueToRemove, CapacitySchedulerConfiguration proposedConf,
-      Map<String, String> confUpdate) throws IOException {
-    if (queueToRemove == null) {
-      return;
-    } else {
-      String queueName = queueToRemove.substring(
-          queueToRemove.lastIndexOf('.') + 1);
-      if (queueToRemove.lastIndexOf('.') == -1) {
-        throw new IOException("Can't remove queue " + queueToRemove);
-      } else {
-        List<String> siblingQueues = getSiblingQueues(queueToRemove,
-            proposedConf);
-        if (!siblingQueues.contains(queueName)) {
-          throw new IOException("Queue " + queueToRemove + " not found");
-        }
-        siblingQueues.remove(queueName);
-        String parentQueuePath = queueToRemove.substring(0, queueToRemove
-            .lastIndexOf('.'));
-        proposedConf.setQueues(parentQueuePath, siblingQueues.toArray(
-            new String[0]));
-        String queuesConfig = CapacitySchedulerConfiguration.PREFIX
-            + parentQueuePath + CapacitySchedulerConfiguration.DOT
-            + CapacitySchedulerConfiguration.QUEUES;
-        if (siblingQueues.size() == 0) {
-          confUpdate.put(queuesConfig, null);
-          // Unset Ordering Policy of Leaf Queue converted from
-          // Parent Queue after removeQueue
-          String queueOrderingPolicy = CapacitySchedulerConfiguration.PREFIX
-              + parentQueuePath + CapacitySchedulerConfiguration.DOT
-              + ORDERING_POLICY;
-          proposedConf.unset(queueOrderingPolicy);
-          confUpdate.put(queueOrderingPolicy, null);
-        } else {
-          confUpdate.put(queuesConfig, Joiner.on(',').join(siblingQueues));
-        }
-        for (Map.Entry<String, String> confRemove : proposedConf.getValByRegex(
-            ".*" + queueToRemove.replaceAll("\\.", "\\.") + "\\..*")
-            .entrySet()) {
-          proposedConf.unset(confRemove.getKey());
-          confUpdate.put(confRemove.getKey(), null);
-        }
-      }
-    }
-  }
-
-  private void addQueue(
-      QueueConfigInfo addInfo, CapacitySchedulerConfiguration proposedConf,
-      Map<String, String> confUpdate) throws IOException {
-    if (addInfo == null) {
-      return;
-    } else {
-      String queuePath = addInfo.getQueue();
-      String queueName = queuePath.substring(queuePath.lastIndexOf('.') + 1);
-      if (queuePath.lastIndexOf('.') == -1) {
-        throw new IOException("Can't add invalid queue " + queuePath);
-      } else if (getSiblingQueues(queuePath, proposedConf).contains(
-          queueName)) {
-        throw new IOException("Can't add existing queue " + queuePath);
-      }
-      String parentQueue = queuePath.substring(0, queuePath.lastIndexOf('.'));
-      String[] siblings = proposedConf.getQueues(parentQueue);
-      List<String> siblingQueues = siblings == null ? new ArrayList<>() :
-          new ArrayList<>(Arrays.<String>asList(siblings));
-      siblingQueues.add(queuePath.substring(queuePath.lastIndexOf('.') + 1));
-      proposedConf.setQueues(parentQueue,
-          siblingQueues.toArray(new String[0]));
-      confUpdate.put(CapacitySchedulerConfiguration.PREFIX
-              + parentQueue + CapacitySchedulerConfiguration.DOT
-              + CapacitySchedulerConfiguration.QUEUES,
-          Joiner.on(',').join(siblingQueues));
-      String keyPrefix = CapacitySchedulerConfiguration.PREFIX
-          + queuePath + CapacitySchedulerConfiguration.DOT;
-      for (Map.Entry<String, String> kv : addInfo.getParams().entrySet()) {
-        if (kv.getValue() == null) {
-          proposedConf.unset(keyPrefix + kv.getKey());
-        } else {
-          proposedConf.set(keyPrefix + kv.getKey(), kv.getValue());
-        }
-        confUpdate.put(keyPrefix + kv.getKey(), kv.getValue());
-      }
-      // Unset Ordering Policy of Parent Queue converted from
-      // Leaf Queue after addQueue
-      String queueOrderingPolicy = CapacitySchedulerConfiguration.PREFIX
-          + parentQueue + CapacitySchedulerConfiguration.DOT + ORDERING_POLICY;
-      if (siblingQueues.size() == 1) {
-        proposedConf.unset(queueOrderingPolicy);
-        confUpdate.put(queueOrderingPolicy, null);
-      }
-    }
-  }
-
-  private void updateQueue(QueueConfigInfo updateInfo,
-      CapacitySchedulerConfiguration proposedConf,
-      Map<String, String> confUpdate) {
-    if (updateInfo == null) {
-      return;
-    } else {
-      String queuePath = updateInfo.getQueue();
-      String keyPrefix = CapacitySchedulerConfiguration.PREFIX
-          + queuePath + CapacitySchedulerConfiguration.DOT;
-      for (Map.Entry<String, String> kv : updateInfo.getParams().entrySet()) {
-        String keyValue = kv.getValue();
-        if (keyValue == null || keyValue.isEmpty()) {
-          keyValue = null;
-          proposedConf.unset(keyPrefix + kv.getKey());
-        } else {
-          proposedConf.set(keyPrefix + kv.getKey(), keyValue);
-        }
-        confUpdate.put(keyPrefix + kv.getKey(), keyValue);
-      }
-    }
-  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestConfigurationUpdateAssembler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestConfigurationUpdateAssembler.java
new file mode 100644
index 00000000000..890996ac23e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestConfigurationUpdateAssembler.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
+import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests {@link ConfigurationUpdateAssembler}.
+ */
+public class TestConfigurationUpdateAssembler {
+
+  private static final String A_PATH = "root.a";
+  private static final String B_PATH = "root.b";
+  private static final String C_PATH = "root.c";
+
+  private static final String CONFIG_NAME = "testConfigName";
+  private static final String A_CONFIG_PATH = CapacitySchedulerConfiguration.PREFIX + A_PATH +
+          CapacitySchedulerConfiguration.DOT + CONFIG_NAME;
+  private static final String B_CONFIG_PATH = CapacitySchedulerConfiguration.PREFIX + B_PATH +
+          CapacitySchedulerConfiguration.DOT + CONFIG_NAME;
+  private static final String C_CONFIG_PATH = CapacitySchedulerConfiguration.PREFIX + C_PATH +
+          CapacitySchedulerConfiguration.DOT + CONFIG_NAME;
+  private static final String ROOT_QUEUES_PATH = CapacitySchedulerConfiguration.PREFIX +
+          CapacitySchedulerConfiguration.ROOT + CapacitySchedulerConfiguration.DOT +
+          CapacitySchedulerConfiguration.QUEUES;
+
+  private static final String A_INIT_CONFIG_VALUE = "aInitValue";
+  private static final String A_CONFIG_VALUE = "aValue";
+  private static final String B_INIT_CONFIG_VALUE = "bInitValue";
+  private static final String B_CONFIG_VALUE = "bValue";
+  private static final String C_CONFIG_VALUE = "cValue";
+
+  private CapacitySchedulerConfiguration csConfig;
+
+  @Before
+  public void setUp() {
+    csConfig = createInitialCSConfig();
+  }
+
+  @Test
+  public void testAddQueue() throws Exception {
+    SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
+    Map<String, String> updateMap = new HashMap<>();
+    updateMap.put(CONFIG_NAME, C_CONFIG_VALUE);
+    QueueConfigInfo queueConfigInfo = new QueueConfigInfo(C_PATH, updateMap);
+    updateInfo.getAddQueueInfo().add(queueConfigInfo);
+
+    Map<String, String> configurationUpdate =
+            ConfigurationUpdateAssembler.constructKeyValueConfUpdate(csConfig, updateInfo);
+
+    assertEquals(C_CONFIG_VALUE, configurationUpdate.get(C_CONFIG_PATH));
+    assertEquals("a,b,c", configurationUpdate.get(ROOT_QUEUES_PATH));
+  }
+
+  @Test
+  public void testAddExistingQueue() {
+    SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
+    Map<String, String> updateMap = new HashMap<>();
+    updateMap.put(CONFIG_NAME, A_CONFIG_VALUE);
+    QueueConfigInfo queueConfigInfo = new QueueConfigInfo(A_PATH, updateMap);
+    updateInfo.getAddQueueInfo().add(queueConfigInfo);
+
+    assertThrows(IOException.class, () ->
+        ConfigurationUpdateAssembler.constructKeyValueConfUpdate(csConfig, updateInfo));
+  }
+
+  @Test
+  public void testAddInvalidQueue() {
+    SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
+    Map<String, String> updateMap = new HashMap<>();
+    updateMap.put(CONFIG_NAME, A_CONFIG_VALUE);
+    QueueConfigInfo queueConfigInfo = new QueueConfigInfo("invalidPath", updateMap);
+    updateInfo.getAddQueueInfo().add(queueConfigInfo);
+
+    assertThrows(IOException.class, () ->
+        ConfigurationUpdateAssembler.constructKeyValueConfUpdate(csConfig, updateInfo));
+  }
+
+  @Test
+  public void testUpdateQueue() throws Exception {
+    SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
+    Map<String, String> updateMap = new HashMap<>();
+    updateMap.put(CONFIG_NAME, A_CONFIG_VALUE);
+    QueueConfigInfo queueAConfigInfo = new QueueConfigInfo(A_PATH, updateMap);
+    updateInfo.getUpdateQueueInfo().add(queueAConfigInfo);
+
+    Map<String, String> updateMapQueueB = new HashMap<>();
+    updateMapQueueB.put(CONFIG_NAME, B_CONFIG_VALUE);
+    QueueConfigInfo queueBConfigInfo = new QueueConfigInfo(B_PATH, updateMapQueueB);
+    updateInfo.getUpdateQueueInfo().add(queueBConfigInfo);
+
+    Map<String, String> configurationUpdate =
+            ConfigurationUpdateAssembler.constructKeyValueConfUpdate(csConfig, updateInfo);
+
+    assertEquals(A_CONFIG_VALUE, configurationUpdate.get(A_CONFIG_PATH));
+    assertEquals(B_CONFIG_VALUE, configurationUpdate.get(B_CONFIG_PATH));
+  }
+
+  @Test
+  public void testUpdateQueueWithEmptyValue() throws Exception {
+    SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
+    Map<String, String> updateMap = new HashMap<>();
+    updateMap.put(CONFIG_NAME, "");
+    QueueConfigInfo queueConfigInfo = new QueueConfigInfo(A_PATH, updateMap);
+    updateInfo.getUpdateQueueInfo().add(queueConfigInfo);
+
+    Map<String, String> configurationUpdate =
+            ConfigurationUpdateAssembler.constructKeyValueConfUpdate(csConfig, updateInfo);
+
+    // An empty value unsets the key: it is recorded as null in the update
+    // and removed from the proposed configuration.
+    assertTrue(configurationUpdate.containsKey(A_CONFIG_PATH));
+    assertNull(configurationUpdate.get(A_CONFIG_PATH));
+    assertNull(csConfig.get(A_CONFIG_PATH));
+  }
+
+  @Test
+  public void testRemoveQueue() throws Exception {
+    SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
+    updateInfo.getRemoveQueueInfo().add(A_PATH);
+
+    Map<String, String> configurationUpdate =
+            ConfigurationUpdateAssembler.constructKeyValueConfUpdate(csConfig, updateInfo);
+
+    assertTrue(configurationUpdate.containsKey(A_CONFIG_PATH));
+    assertNull(configurationUpdate.get(A_CONFIG_PATH));
+    assertEquals("b", configurationUpdate.get(ROOT_QUEUES_PATH));
+  }
+
+  @Test
+  public void testRemoveInvalidQueue() {
+    SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
+    updateInfo.getRemoveQueueInfo().add("invalidPath");
+
+    assertThrows(IOException.class, () ->
+        ConfigurationUpdateAssembler.constructKeyValueConfUpdate(csConfig, updateInfo));
+  }
+
+  @Test
+  public void testRemoveNonExistingQueue() {
+    SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
+    updateInfo.getRemoveQueueInfo().add("root.d");
+
+    assertThrows(IOException.class, () ->
+        ConfigurationUpdateAssembler.constructKeyValueConfUpdate(csConfig, updateInfo));
+  }
+
+  private CapacitySchedulerConfiguration createInitialCSConfig() {
+    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
+
+    csConf.set(A_CONFIG_PATH, A_INIT_CONFIG_VALUE);
+    csConf.set(B_CONFIG_PATH, B_INIT_CONFIG_VALUE);
+
+    return csConf;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org