You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sn...@apache.org on 2020/01/14 11:26:35 UTC

[hadoop] branch trunk updated: YARN-9788. Queue Management API does not support parallel updates. Contributed by Prabhu Joseph

This is an automated email from the ASF dual-hosted git repository.

snemeth pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1c51f36  YARN-9788. Queue Management API does not support parallel updates. Contributed by Prabhu Joseph
1c51f36 is described below

commit 1c51f36be79924489ca4a2e5ca7e96ac75a6ec18
Author: Szilard Nemeth <sn...@apache.org>
AuthorDate: Tue Jan 14 12:26:03 2020 +0100

    YARN-9788. Queue Management API does not support parallel updates. Contributed by Prabhu Joseph
---
 .../hadoop/yarn/client/cli/TestSchedConfCLI.java   |  7 +--
 .../scheduler/MutableConfigurationProvider.java    | 10 ++--
 .../conf/FSSchedulerConfigurationStore.java        |  8 +--
 .../capacity/conf/InMemoryConfigurationStore.java  | 10 ++--
 .../capacity/conf/LeveldbConfigurationStore.java   |  6 +--
 .../conf/MutableCSConfigurationProvider.java       |  8 +--
 .../capacity/conf/YarnConfigurationStore.java      |  6 ++-
 .../capacity/conf/ZKConfigurationStore.java        |  7 +--
 .../resourcemanager/webapp/RMWebServices.java      |  8 +--
 .../capacity/conf/ConfigurationStoreBaseTest.java  |  6 +--
 .../conf/TestFSSchedulerConfigurationStore.java    |  8 +--
 .../conf/TestLeveldbConfigurationStore.java        | 16 +++---
 .../conf/TestMutableCSConfigurationProvider.java   | 57 +++++++++++++++++-----
 .../capacity/conf/TestZKConfigurationStore.java    | 30 ++++++------
 14 files changed, 116 insertions(+), 71 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestSchedConfCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestSchedConfCLI.java
index 4233b4c..3b961df 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestSchedConfCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestSchedConfCLI.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.JAXBContextResolver;
@@ -247,10 +248,10 @@ public class TestSchedConfCLI extends JerseyTestBase {
       globalUpdates.put("schedKey1", "schedVal1");
       schedUpdateInfo.setGlobalParams(globalUpdates);
 
-      provider.logAndApplyMutation(UserGroupInformation.getCurrentUser(),
-          schedUpdateInfo);
+      LogMutation log = provider.logAndApplyMutation(
+          UserGroupInformation.getCurrentUser(), schedUpdateInfo);
       rm.getRMContext().getRMAdminService().refreshQueues();
-      provider.confirmPendingMutation(true);
+      provider.confirmPendingMutation(log, true);
 
       Configuration schedulerConf = provider.getConfiguration();
       assertEquals("schedVal1", schedulerConf.get("schedKey1"));
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java
index 03902e3..751c9a3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
 import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
 
 import java.io.IOException;
@@ -46,18 +47,21 @@ public interface MutableConfigurationProvider {
    * Log user's requested configuration mutation, and applies it in-memory.
    * @param user User who requested the change
    * @param confUpdate User's requested configuration change
+   * @return LogMutation with update info from given SchedConfUpdateInfo
    * @throws Exception if logging the mutation fails
    */
-  void logAndApplyMutation(UserGroupInformation user, SchedConfUpdateInfo
-      confUpdate) throws Exception;
+  LogMutation logAndApplyMutation(UserGroupInformation user,
+      SchedConfUpdateInfo confUpdate) throws Exception;
 
   /**
    * Confirm last logged mutation.
+   * @param pendingMutation the log mutation to apply
    * @param isValid if the last logged mutation is applied to scheduler
    *                properly.
    * @throws Exception if confirming mutation fails
    */
-  void confirmPendingMutation(boolean isValid) throws Exception;
+  void confirmPendingMutation(LogMutation pendingMutation,
+      boolean isValid) throws Exception;
 
   /**
    * Returns scheduler configuration cached in this provider.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java
index 855939e..eeb38d3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java
@@ -58,7 +58,6 @@ public class FSSchedulerConfigurationStore extends YarnConfigurationStore {
   private int maxVersion;
   private Path schedulerConfDir;
   private FileSystem fileSystem;
-  private LogMutation pendingMutation;
   private PathFilter configFilePathFilter;
   private volatile Configuration schedConf;
   private volatile Configuration oldConf;
@@ -134,10 +133,9 @@ public class FSSchedulerConfigurationStore extends YarnConfigurationStore {
    */
   @Override
   public void logMutation(LogMutation logMutation) throws IOException {
-    pendingMutation = logMutation;
     LOG.info(new GsonBuilder().serializeNulls().create().toJson(logMutation));
     oldConf = new Configuration(schedConf);
-    Map<String, String> mutations = pendingMutation.getUpdates();
+    Map<String, String> mutations = logMutation.getUpdates();
     for (Map.Entry<String, String> kv : mutations.entrySet()) {
       if (kv.getValue() == null) {
         this.schedConf.unset(kv.getKey());
@@ -149,12 +147,14 @@ public class FSSchedulerConfigurationStore extends YarnConfigurationStore {
   }
 
   /**
+   * @param pendingMutation the log mutation to apply
    * @param isValid if true, finalize temp configuration file
    *                if false, remove temp configuration file and rollback
    * @throws Exception throw IOE when write temp configuration file fail
    */
   @Override
-  public void confirmMutation(boolean isValid) throws Exception {
+  public void confirmMutation(LogMutation pendingMutation,
+      boolean isValid) throws Exception {
     if (pendingMutation == null || tempConfigPath == null) {
       LOG.warn("pendingMutation or tempConfigPath is null, do nothing");
       return;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java
index 47dd6bd..d031ea9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java
@@ -32,7 +32,6 @@ import java.util.Map;
 public class InMemoryConfigurationStore extends YarnConfigurationStore {
 
   private Configuration schedConf;
-  private LogMutation pendingMutation;
   private long configVersion;
 
   @Override
@@ -42,13 +41,17 @@ public class InMemoryConfigurationStore extends YarnConfigurationStore {
     this.configVersion = 1L;
   }
 
+  /**
+   * This method does not log as it does not support backing store.
+   * The mutation to be applied on top of schedConf will be directly passed
+   * in confirmMutation.
+   */
   @Override
   public void logMutation(LogMutation logMutation) {
-    pendingMutation = logMutation;
   }
 
   @Override
-  public void confirmMutation(boolean isValid) {
+  public void confirmMutation(LogMutation pendingMutation, boolean isValid) {
     if (isValid) {
       for (Map.Entry<String, String> kv : pendingMutation.getUpdates()
           .entrySet()) {
@@ -60,7 +63,6 @@ public class InMemoryConfigurationStore extends YarnConfigurationStore {
       }
       this.configVersion = this.configVersion + 1L;
     }
-    pendingMutation = null;
   }
 
   @Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
index 39cd8ff..bcdfb59 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
@@ -75,7 +75,6 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
   private DB versiondb;
   private long maxLogs;
   private Configuration conf;
-  private LogMutation pendingMutation;
   @VisibleForTesting
   protected static final Version CURRENT_VERSION_INFO = Version
       .newInstance(0, 1);
@@ -232,11 +231,11 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
       }
       db.put(bytes(LOG_KEY), serLogMutations(logs));
     }
-    pendingMutation = logMutation;
   }
 
   @Override
-  public void confirmMutation(boolean isValid) throws IOException {
+  public void confirmMutation(LogMutation pendingMutation,
+      boolean isValid) throws IOException {
     WriteBatch updateBatch = db.createWriteBatch();
     if (isValid) {
       for (Map.Entry<String, String> changes :
@@ -252,7 +251,6 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
           bytes(String.valueOf(configVersion)));
     }
     db.write(updateBatch);
-    pendingMutation = null;
   }
 
   private byte[] serLogMutations(LinkedList<LogMutation> mutations) throws
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
index c8d0a0c..0914640 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
@@ -128,7 +128,7 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
   }
 
   @Override
-  public void logAndApplyMutation(UserGroupInformation user,
+  public LogMutation logAndApplyMutation(UserGroupInformation user,
       SchedConfUpdateInfo confUpdate) throws Exception {
     oldConf = new Configuration(schedConf);
     Map<String, String> kvUpdate = constructKeyValueConfUpdate(confUpdate);
@@ -141,6 +141,7 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
         schedConf.set(kv.getKey(), kv.getValue());
       }
     }
+    return log;
   }
 
   @Override
@@ -184,10 +185,11 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
   }
 
   @Override
-  public void confirmPendingMutation(boolean isValid) throws Exception {
+  public void confirmPendingMutation(LogMutation pendingMutation,
+      boolean isValid) throws Exception {
     formatLock.readLock().lock();
     try {
-      confStore.confirmMutation(isValid);
+      confStore.confirmMutation(pendingMutation, isValid);
       if (!isValid) {
         schedConf = oldConf;
       }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
index 6af11a3..34aa174 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
@@ -52,7 +52,7 @@ public abstract class YarnConfigurationStore {
    * LogMutation encapsulates the fields needed for configuration mutation
    * audit logging and recovery.
    */
-  static class LogMutation implements Serializable {
+  public static class LogMutation implements Serializable {
     private Map<String, String> updates;
     private String user;
 
@@ -113,11 +113,13 @@ public abstract class YarnConfigurationStore {
    * last logged by {@code logMutation} and marks the mutation as persisted (no
    * longer pending). If isValid is true, merge the mutation with the persisted
    * configuration.
+   * @param pendingMutation the log mutation to apply
    * @param isValid if true, update persisted configuration with pending
    *                mutation.
    * @throws Exception if mutation confirmation fails
    */
-  public abstract void confirmMutation(boolean isValid) throws Exception;
+  public abstract void confirmMutation(LogMutation pendingMutation,
+      boolean isValid) throws Exception;
 
   /**
    * Retrieve the persisted configuration.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java
index 75ae727..0fa48b4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java
@@ -54,7 +54,6 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
   protected static final Version CURRENT_VERSION_INFO = Version
       .newInstance(0, 1);
   private Configuration conf;
-  private LogMutation pendingMutation;
 
   private String znodeParentPath;
 
@@ -175,12 +174,11 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
       zkManager.safeSetData(logsPath, serializeObject(logs), -1, zkAcl,
               fencingNodePath);
     }
-    pendingMutation = logMutation;
   }
 
   @Override
-  public void confirmMutation(boolean isValid)
-      throws Exception {
+  public void confirmMutation(LogMutation pendingMutation,
+      boolean isValid) throws Exception {
     if (isValid) {
       Configuration storedConfigs = retrieve();
       Map<String, String> mapConf = new HashMap<>();
@@ -201,7 +199,6 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
       zkManager.setData(confVersionPath, String.valueOf(configVersion), -1);
 
     }
-    pendingMutation = null;
   }
 
   @Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
index e63dc06..bdd8e64 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
@@ -148,6 +148,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
@@ -2643,14 +2644,15 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
               throw new org.apache.hadoop.security.AccessControlException("User"
                   + " is not admin of all modified queues.");
             }
-            provider.logAndApplyMutation(callerUGI, mutationInfo);
+            LogMutation logMutation = provider.logAndApplyMutation(callerUGI,
+                mutationInfo);
             try {
               rm.getRMContext().getRMAdminService().refreshQueues();
             } catch (IOException | YarnException e) {
-              provider.confirmPendingMutation(false);
+              provider.confirmPendingMutation(logMutation, false);
               throw e;
             }
-            provider.confirmPendingMutation(true);
+            provider.confirmPendingMutation(logMutation, true);
             return null;
           }
         });
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java
index 0f50b53..4b3153a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java
@@ -64,7 +64,7 @@ public abstract class ConfigurationStoreBaseTest {
     YarnConfigurationStore.LogMutation mutation1 =
         new YarnConfigurationStore.LogMutation(update1, TEST_USER);
     confStore.logMutation(mutation1);
-    confStore.confirmMutation(true);
+    confStore.confirmMutation(mutation1, true);
     assertEquals("valUpdate1", confStore.retrieve().get("keyUpdate1"));
 
     Map<String, String> update2 = new HashMap<>();
@@ -72,7 +72,7 @@ public abstract class ConfigurationStoreBaseTest {
     YarnConfigurationStore.LogMutation mutation2 =
         new YarnConfigurationStore.LogMutation(update2, TEST_USER);
     confStore.logMutation(mutation2);
-    confStore.confirmMutation(false);
+    confStore.confirmMutation(mutation2, false);
     assertNull("Configuration should not be updated",
         confStore.retrieve().get("keyUpdate2"));
     confStore.close();
@@ -89,7 +89,7 @@ public abstract class ConfigurationStoreBaseTest {
     YarnConfigurationStore.LogMutation mutation =
         new YarnConfigurationStore.LogMutation(update, TEST_USER);
     confStore.logMutation(mutation);
-    confStore.confirmMutation(true);
+    confStore.confirmMutation(mutation, true);
     assertNull(confStore.retrieve().get("key"));
     confStore.close();
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java
index 7968372..e4ca3d3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java
@@ -100,7 +100,7 @@ public class TestFSSchedulerConfigurationStore {
 
     LogMutation logMutation = new LogMutation(updates, "test");
     configurationStore.logMutation(logMutation);
-    configurationStore.confirmMutation(true);
+    configurationStore.confirmMutation(logMutation, true);
     storeConf = configurationStore.retrieve();
     assertEquals(null, storeConf.get("a"));
     assertEquals("bb", storeConf.get("b"));
@@ -110,7 +110,7 @@ public class TestFSSchedulerConfigurationStore {
 
     updates.put("b", "bbb");
     configurationStore.logMutation(logMutation);
-    configurationStore.confirmMutation(true);
+    configurationStore.confirmMutation(logMutation, true);
     storeConf = configurationStore.retrieve();
     assertEquals(null, storeConf.get("a"));
     assertEquals("bbb", storeConf.get("b"));
@@ -133,7 +133,7 @@ public class TestFSSchedulerConfigurationStore {
 
     LogMutation logMutation = new LogMutation(updates, "test");
     configurationStore.logMutation(logMutation);
-    configurationStore.confirmMutation(false);
+    configurationStore.confirmMutation(logMutation, false);
     storeConf = configurationStore.retrieve();
 
     compareConfig(conf, storeConf);
@@ -168,7 +168,7 @@ public class TestFSSchedulerConfigurationStore {
         updates.put("testkey", "testvalue");
         LogMutation logMutation = new LogMutation(updates, "test");
         configStore.logMutation(logMutation);
-        configStore.confirmMutation(true);
+        configStore.confirmMutation(logMutation, true);
       } catch (IOException e) {
         if (e.getMessage().contains("Filesystem closed")) {
           fail("FSSchedulerConfigurationStore failed to handle " +
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java
index 5f78aa2..0ae7624 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
 import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
 import org.junit.Before;
 import org.junit.Test;
@@ -103,7 +104,7 @@ public class TestLeveldbConfigurationStore extends ConfigurationStoreBaseTest {
     YarnConfigurationStore.LogMutation mutation =
         new YarnConfigurationStore.LogMutation(update, TEST_USER);
     confStore.logMutation(mutation);
-    confStore.confirmMutation(true);
+    confStore.confirmMutation(mutation, true);
     assertEquals("val", confStore.retrieve().get("key"));
     confStore.close();
 
@@ -159,7 +160,7 @@ public class TestLeveldbConfigurationStore extends ConfigurationStoreBaseTest {
     logs = ((LeveldbConfigurationStore) confStore).getLogs();
     assertEquals(1, logs.size());
     assertEquals("val1", logs.get(0).getUpdates().get("key1"));
-    confStore.confirmMutation(true);
+    confStore.confirmMutation(mutation, true);
     assertEquals(1, logs.size());
     assertEquals("val1", logs.get(0).getUpdates().get("key1"));
 
@@ -171,7 +172,7 @@ public class TestLeveldbConfigurationStore extends ConfigurationStoreBaseTest {
     assertEquals(2, logs.size());
     assertEquals("val1", logs.get(0).getUpdates().get("key1"));
     assertEquals("val2", logs.get(1).getUpdates().get("key2"));
-    confStore.confirmMutation(true);
+    confStore.confirmMutation(mutation, true);
     assertEquals(2, logs.size());
     assertEquals("val1", logs.get(0).getUpdates().get("key1"));
     assertEquals("val2", logs.get(1).getUpdates().get("key2"));
@@ -185,7 +186,7 @@ public class TestLeveldbConfigurationStore extends ConfigurationStoreBaseTest {
     assertEquals(2, logs.size());
     assertEquals("val2", logs.get(0).getUpdates().get("key2"));
     assertEquals("val3", logs.get(1).getUpdates().get("key3"));
-    confStore.confirmMutation(true);
+    confStore.confirmMutation(mutation, true);
     assertEquals(2, logs.size());
     assertEquals("val2", logs.get(0).getUpdates().get("key2"));
     assertEquals("val3", logs.get(1).getUpdates().get("key3"));
@@ -211,16 +212,17 @@ public class TestLeveldbConfigurationStore extends ConfigurationStoreBaseTest {
         rm1.getResourceScheduler()).getMutableConfProvider();
     UserGroupInformation user = UserGroupInformation
         .createUserForTesting(TEST_USER, new String[0]);
-    confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+    LogMutation log = confProvider.logAndApplyMutation(user,
+        schedConfUpdateInfo);
     rm1.getResourceScheduler().reinitialize(conf, rm1.getRMContext());
     assertEquals("val", ((MutableConfScheduler) rm1.getResourceScheduler())
         .getConfiguration().get("key"));
-    confProvider.confirmPendingMutation(true);
+    confProvider.confirmPendingMutation(log, true);
     assertEquals("val", ((MutableCSConfigurationProvider) confProvider)
         .getConfStore().retrieve().get("key"));
     // Next update is not persisted, it should not be recovered
     schedConfUpdateInfo.getGlobalParams().put("key", "badVal");
-    confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+    log = confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
     rm1.close();
 
     // Start RM2 and verifies it starts with updated configuration
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java
index cb416e2..0c9a312 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
 import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
 import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
 import org.junit.Before;
@@ -93,15 +94,15 @@ public class TestMutableCSConfigurationProvider {
     assertNull(confProvider.loadConfiguration(conf)
         .get("yarn.scheduler.capacity.root.a.goodKey"));
 
-    confProvider.logAndApplyMutation(TEST_USER, goodUpdate);
-    confProvider.confirmPendingMutation(true);
+    LogMutation log = confProvider.logAndApplyMutation(TEST_USER, goodUpdate);
+    confProvider.confirmPendingMutation(log, true);
     assertEquals("goodVal", confProvider.loadConfiguration(conf)
         .get("yarn.scheduler.capacity.root.a.goodKey"));
 
     assertNull(confProvider.loadConfiguration(conf).get(
         "yarn.scheduler.capacity.root.a.badKey"));
-    confProvider.logAndApplyMutation(TEST_USER, badUpdate);
-    confProvider.confirmPendingMutation(false);
+    log = confProvider.logAndApplyMutation(TEST_USER, badUpdate);
+    confProvider.confirmPendingMutation(log, false);
     assertNull(confProvider.loadConfiguration(conf).get(
         "yarn.scheduler.capacity.root.a.badKey"));
 
@@ -125,8 +126,8 @@ public class TestMutableCSConfigurationProvider {
         QueueConfigInfo("root.a", updateMap);
     updateInfo.getUpdateQueueInfo().add(queueConfigInfo);
 
-    confProvider.logAndApplyMutation(TEST_USER, updateInfo);
-    confProvider.confirmPendingMutation(true);
+    LogMutation log = confProvider.logAndApplyMutation(TEST_USER, updateInfo);
+    confProvider.confirmPendingMutation(log, true);
     assertEquals("testval1", confProvider.loadConfiguration(conf)
         .get("yarn.scheduler.capacity.root.a.testkey1"));
     assertEquals("testval2", confProvider.loadConfiguration(conf)
@@ -138,8 +139,8 @@ public class TestMutableCSConfigurationProvider {
     queueConfigInfo = new QueueConfigInfo("root.a", updateMap);
     updateInfo.getUpdateQueueInfo().add(queueConfigInfo);
 
-    confProvider.logAndApplyMutation(TEST_USER, updateInfo);
-    confProvider.confirmPendingMutation(true);
+    log = confProvider.logAndApplyMutation(TEST_USER, updateInfo);
+    confProvider.confirmPendingMutation(log, true);
     assertNull("Failed to remove config",
         confProvider.loadConfiguration(conf)
         .get("yarn.scheduler.capacity.root.a.testkey1"));
@@ -148,6 +149,38 @@ public class TestMutableCSConfigurationProvider {
   }
 
   @Test
+  public void testMultipleUpdatesNotLost() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
+        YarnConfiguration.MEMORY_CONFIGURATION_STORE);
+    confProvider.init(conf);
+
+    SchedConfUpdateInfo updateInfo1 = new SchedConfUpdateInfo();
+    Map<String, String> updateMap1 = new HashMap<>();
+    updateMap1.put("key1", "val1");
+    QueueConfigInfo queueConfigInfo1 = new
+        QueueConfigInfo("root.a", updateMap1);
+    updateInfo1.getUpdateQueueInfo().add(queueConfigInfo1);
+    LogMutation log1 = confProvider.logAndApplyMutation(TEST_USER, updateInfo1);
+
+    SchedConfUpdateInfo updateInfo2 = new SchedConfUpdateInfo();
+    Map<String, String> updateMap2 = new HashMap<>();
+    updateMap2.put("key2", "val2");
+    QueueConfigInfo queueConfigInfo2 = new
+        QueueConfigInfo("root.a", updateMap2);
+    updateInfo2.getUpdateQueueInfo().add(queueConfigInfo2);
+    LogMutation log2 = confProvider.logAndApplyMutation(TEST_USER, updateInfo2);
+
+    confProvider.confirmPendingMutation(log1, true);
+    confProvider.confirmPendingMutation(log2, true);
+
+    assertEquals("val1", confProvider.loadConfiguration(conf)
+        .get("yarn.scheduler.capacity.root.a.key1"));
+    assertEquals("val2", confProvider.loadConfiguration(conf)
+        .get("yarn.scheduler.capacity.root.a.key2"));
+  }
+
+  @Test
   public void testHDFSBackedProvider() throws Exception {
     File testSchedulerConfigurationDir = new File(
         TestMutableCSConfigurationProvider.class.getResource("").getPath()
@@ -166,15 +199,15 @@ public class TestMutableCSConfigurationProvider {
     assertNull(confProvider.loadConfiguration(conf)
         .get("yarn.scheduler.capacity.root.a.goodKey"));
 
-    confProvider.logAndApplyMutation(TEST_USER, goodUpdate);
-    confProvider.confirmPendingMutation(true);
+    LogMutation log = confProvider.logAndApplyMutation(TEST_USER, goodUpdate);
+    confProvider.confirmPendingMutation(log, true);
     assertEquals("goodVal", confProvider.loadConfiguration(conf)
         .get("yarn.scheduler.capacity.root.a.goodKey"));
 
     assertNull(confProvider.loadConfiguration(conf).get(
         "yarn.scheduler.capacity.root.a.badKey"));
-    confProvider.logAndApplyMutation(TEST_USER, badUpdate);
-    confProvider.confirmPendingMutation(false);
+    log = confProvider.logAndApplyMutation(TEST_USER, badUpdate);
+    confProvider.confirmPendingMutation(log, false);
     assertNull(confProvider.loadConfiguration(conf).get(
         "yarn.scheduler.capacity.root.a.badKey"));
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java
index 57ccd75..e67e382 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
 import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
 import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
 import org.junit.After;
@@ -148,7 +149,7 @@ public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
     YarnConfigurationStore.LogMutation mutation =
         new YarnConfigurationStore.LogMutation(update, TEST_USER);
     confStore.logMutation(mutation);
-    confStore.confirmMutation(true);
+    confStore.confirmMutation(mutation, true);
     long v2 = confStore.getConfigVersion();
     assertEquals(2, v2);
   }
@@ -160,10 +161,9 @@ public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
 
     Map<String, String> update = new HashMap<>();
     update.put("key", "val");
-    YarnConfigurationStore.LogMutation mutation =
-        new YarnConfigurationStore.LogMutation(update, TEST_USER);
+    LogMutation mutation = new LogMutation(update, TEST_USER);
     confStore.logMutation(mutation);
-    confStore.confirmMutation(true);
+    confStore.confirmMutation(mutation, true);
     assertEquals("val", confStore.retrieve().get("key"));
 
     // Create a new configuration store, and check for updated configuration
@@ -190,7 +190,7 @@ public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
     logs = ((ZKConfigurationStore) confStore).getLogs();
     assertEquals(1, logs.size());
     assertEquals("val1", logs.get(0).getUpdates().get("key1"));
-    confStore.confirmMutation(true);
+    confStore.confirmMutation(mutation, true);
     assertEquals(1, logs.size());
     assertEquals("val1", logs.get(0).getUpdates().get("key1"));
 
@@ -202,7 +202,7 @@ public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
     assertEquals(2, logs.size());
     assertEquals("val1", logs.get(0).getUpdates().get("key1"));
     assertEquals("val2", logs.get(1).getUpdates().get("key2"));
-    confStore.confirmMutation(true);
+    confStore.confirmMutation(mutation, true);
     assertEquals(2, logs.size());
     assertEquals("val1", logs.get(0).getUpdates().get("key1"));
     assertEquals("val2", logs.get(1).getUpdates().get("key2"));
@@ -216,7 +216,7 @@ public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
     assertEquals(2, logs.size());
     assertEquals("val2", logs.get(0).getUpdates().get("key2"));
     assertEquals("val3", logs.get(1).getUpdates().get("key3"));
-    confStore.confirmMutation(true);
+    confStore.confirmMutation(mutation, true);
     assertEquals(2, logs.size());
     assertEquals("val2", logs.get(0).getUpdates().get("key2"));
     assertEquals("val3", logs.get(1).getUpdates().get("key3"));
@@ -308,16 +308,17 @@ public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
         rm1.getResourceScheduler()).getMutableConfProvider();
     UserGroupInformation user = UserGroupInformation
         .createUserForTesting(TEST_USER, new String[0]);
-    confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+    LogMutation log = confProvider.logAndApplyMutation(user,
+        schedConfUpdateInfo);
     rm1.getResourceScheduler().reinitialize(conf1, rm1.getRMContext());
     assertEquals("val", ((MutableConfScheduler) rm1.getResourceScheduler())
         .getConfiguration().get("key"));
-    confProvider.confirmPendingMutation(true);
+    confProvider.confirmPendingMutation(log, true);
     assertEquals("val", ((MutableCSConfigurationProvider) confProvider)
         .getConfStore().retrieve().get("key"));
     // Next update is not persisted, it should not be recovered
     schedConfUpdateInfo.getGlobalParams().put("key", "badVal");
-    confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+    log = confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
 
     // Start RM2 and verifies it starts with updated configuration
     rm2.getRMContext().getRMAdminService().transitionToActive(req);
@@ -400,9 +401,10 @@ public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
     stopParams.put("capacity", "0");
     QueueConfigInfo stopInfo = new QueueConfigInfo("root.default", stopParams);
     schedConfUpdateInfo.getUpdateQueueInfo().add(stopInfo);
-    confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+    LogMutation log = confProvider.logAndApplyMutation(user,
+        schedConfUpdateInfo);
     rm1.getResourceScheduler().reinitialize(conf1, rm1.getRMContext());
-    confProvider.confirmPendingMutation(true);
+    confProvider.confirmPendingMutation(log, true);
     assertTrue(Arrays.asList(((MutableConfScheduler) rm1.getResourceScheduler())
         .getConfiguration().get("yarn.scheduler.capacity.root.queues").split
             (",")).contains("a"));
@@ -411,9 +413,9 @@ public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
     schedConfUpdateInfo.getUpdateQueueInfo().clear();
     schedConfUpdateInfo.getAddQueueInfo().clear();
     schedConfUpdateInfo.getRemoveQueueInfo().add("root.default");
-    confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+    log =  confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
     rm1.getResourceScheduler().reinitialize(conf1, rm1.getRMContext());
-    confProvider.confirmPendingMutation(true);
+    confProvider.confirmPendingMutation(log, true);
     assertEquals("a", ((MutableConfScheduler) rm1.getResourceScheduler())
         .getConfiguration().get("yarn.scheduler.capacity.root.queues"));
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org