You are viewing a plain text version of this content. The canonical link for it is here.
Posted to by on 2017/09/18 16:59:22 UTC

[1/2] hadoop git commit: YARN-6840. Implement zookeeper based store for scheduler configuration updates. (Jonathan Hung via wangda)

Repository: hadoop
Updated Branches:
  refs/heads/YARN-5734 98c4a5299 -> 9726e1fc2
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/
new file mode 100644
index 0000000..3cfa8da
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+/**
+ * Tests {@link ZKConfigurationStore}.
+ */
+public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
+  // NOTE(review): LOG is not referenced by any test in this class.
+  public static final Log LOG =
+      LogFactory.getLog(TestZKConfigurationStore.class);
+  /**
+   * Used by testFailoverReadsFromUpdatedStore to derive how many times it
+   * polls for the previously active RM to be fenced.
+   */
+  private static final int ZK_TIMEOUT_MS = 10000;
+  /** In-process ZooKeeper server, started in setUp(). */
+  private TestingServer curatorTestingServer;
+  /** Curator client connected to curatorTestingServer, created in setUp(). */
+  private CuratorFramework curatorFramework;
+  /** MockRM started in setUp(); its RMContext is used to initialize stores. */
+  private ResourceManager rm;
+  /**
+   * Creates an in-process ZooKeeper test server and starts it.
+   * @return the started server
+   * @throws Exception if the server cannot be created or started
+   */
+  public static TestingServer setupCuratorServer() throws Exception {
+    TestingServer server = new TestingServer();
+    server.start();
+    return server;
+  }
+  /**
+   * Creates and starts a Curator client connected to the given test server.
+   * @param curatorTestingServer server to connect to
+   * @return the started Curator client
+   * @throws Exception if the client cannot be created or started
+   */
+  public static CuratorFramework setupCuratorFramework(
+      TestingServer curatorTestingServer) throws Exception {
+    String connectString = curatorTestingServer.getConnectString();
+    // Retry up to 100 times, sleeping 100 ms between attempts.
+    RetryNTimes retryPolicy = new RetryNTimes(100, 100);
+    CuratorFramework client = CuratorFrameworkFactory.builder()
+        .connectString(connectString)
+        .retryPolicy(retryPolicy)
+        .build();
+    client.start();
+    return client;
+  }
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    curatorTestingServer = setupCuratorServer();
+    curatorFramework = setupCuratorFramework(curatorTestingServer);
+    conf.set(CommonConfigurationKeys.ZK_ADDRESS,
+        curatorTestingServer.getConnectString());
+    rm = new MockRM(conf);
+    rm.start();
+    rmContext = rm.getRMContext();
+  }
+  /**
+   * Stops the MockRM, the Curator client and the ZooKeeper test server.
+   * Each resource is released even if releasing an earlier one throws. The
+   * null checks cover a setUp() that failed part-way, so teardown does not
+   * raise a NullPointerException that would hide the original failure.
+   * @throws IOException if stopping the ZooKeeper test server fails
+   */
+  @After
+  public void cleanup() throws IOException {
+    try {
+      if (rm != null) {
+        rm.stop();
+      }
+    } finally {
+      try {
+        if (curatorFramework != null) {
+          curatorFramework.close();
+        }
+      } finally {
+        if (curatorTestingServer != null) {
+          curatorTestingServer.stop();
+        }
+      }
+    }
+  }
+  @Test
+  public void testVersioning() throws Exception {
+    confStore.initialize(conf, schedConf, rmContext);
+    assertNull(confStore.getConfStoreVersion());
+    confStore.checkVersion();
+    assertEquals(ZKConfigurationStore.CURRENT_VERSION_INFO,
+        confStore.getConfStoreVersion());
+  }
+  /**
+   * The scheduler configuration given to the first initialize() is persisted.
+   * A new store instance loads it and ignores the configuration passed to
+   * its own initialize().
+   */
+  @Test
+  public void testPersistConfiguration() throws Exception {
+    final String key = "key";
+    schedConf.set(key, "val");
+    confStore.initialize(conf, schedConf, rmContext);
+    assertEquals("val", confStore.retrieve().get(key));
+    // Fresh store: must return the stored value, not the passed-in "badVal".
+    confStore = createConfStore();
+    schedConf.set(key, "badVal");
+    confStore.initialize(conf, schedConf, rmContext);
+    assertEquals("val", confStore.retrieve().get(key));
+  }
+  /**
+   * A logged and confirmed mutation is visible through retrieve(). A new
+   * store instance also sees it, ignoring the configuration passed to its
+   * own initialize().
+   */
+  @Test
+  public void testPersistUpdatedConfiguration() throws Exception {
+    final String key = "key";
+    confStore.initialize(conf, schedConf, rmContext);
+    assertNull(confStore.retrieve().get(key));
+    Map<String, String> updates = new HashMap<>();
+    updates.put(key, "val");
+    confStore.logMutation(
+        new YarnConfigurationStore.LogMutation(updates, TEST_USER));
+    confStore.confirmMutation(true);
+    assertEquals("val", confStore.retrieve().get(key));
+    // Fresh store: must return the confirmed update, not "badVal".
+    confStore = createConfStore();
+    schedConf.set(key, "badVal");
+    confStore.initialize(conf, schedConf, rmContext);
+    assertEquals("val", confStore.retrieve().get(key));
+  }
+  /**
+   * With {@code RM_SCHEDCONF_MAX_LOGS} set to 2, the store keeps only the two
+   * most recent logged mutations: logging a third purges the oldest.
+   * The logs are re-read from the store after every logMutation and
+   * confirmMutation call, so each assertion checks the store's current state
+   * rather than a list fetched earlier.
+   */
+  @Test
+  public void testMaxLogs() throws Exception {
+    conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 2);
+    confStore.initialize(conf, schedConf, rmContext);
+    LinkedList<YarnConfigurationStore.LogMutation> logs =
+        ((ZKConfigurationStore) confStore).getLogs();
+    assertEquals(0, logs.size());
+    Map<String, String> update1 = new HashMap<>();
+    update1.put("key1", "val1");
+    YarnConfigurationStore.LogMutation mutation =
+        new YarnConfigurationStore.LogMutation(update1, TEST_USER);
+    confStore.logMutation(mutation);
+    logs = ((ZKConfigurationStore) confStore).getLogs();
+    assertEquals(1, logs.size());
+    assertEquals("val1", logs.get(0).getUpdates().get("key1"));
+    confStore.confirmMutation(true);
+    // Re-fetch: getLogs() may return a detached copy, and asserting on the
+    // list fetched before confirmMutation would then verify nothing.
+    logs = ((ZKConfigurationStore) confStore).getLogs();
+    assertEquals(1, logs.size());
+    assertEquals("val1", logs.get(0).getUpdates().get("key1"));
+    Map<String, String> update2 = new HashMap<>();
+    update2.put("key2", "val2");
+    mutation = new YarnConfigurationStore.LogMutation(update2, TEST_USER);
+    confStore.logMutation(mutation);
+    logs = ((ZKConfigurationStore) confStore).getLogs();
+    assertEquals(2, logs.size());
+    assertEquals("val1", logs.get(0).getUpdates().get("key1"));
+    assertEquals("val2", logs.get(1).getUpdates().get("key2"));
+    confStore.confirmMutation(true);
+    logs = ((ZKConfigurationStore) confStore).getLogs();
+    assertEquals(2, logs.size());
+    assertEquals("val1", logs.get(0).getUpdates().get("key1"));
+    assertEquals("val2", logs.get(1).getUpdates().get("key2"));
+    // Next update should purge first update from logs.
+    Map<String, String> update3 = new HashMap<>();
+    update3.put("key3", "val3");
+    mutation = new YarnConfigurationStore.LogMutation(update3, TEST_USER);
+    confStore.logMutation(mutation);
+    logs = ((ZKConfigurationStore) confStore).getLogs();
+    assertEquals(2, logs.size());
+    assertEquals("val2", logs.get(0).getUpdates().get("key2"));
+    assertEquals("val3", logs.get(1).getUpdates().get("key3"));
+    confStore.confirmMutation(true);
+    logs = ((ZKConfigurationStore) confStore).getLogs();
+    assertEquals(2, logs.size());
+    assertEquals("val2", logs.get(0).getUpdates().get("key2"));
+    assertEquals("val3", logs.get(1).getUpdates().get("key3"));
+  }
+  /**
+   * Builds the configuration for one RM of an HA pair.
+   * <ul>
+   *   <li>Recovery uses the ZK RM state store.</li>
+   *   <li>Scheduler configuration uses the store-backed provider with the ZK
+   *       configuration store.</li>
+   *   <li>Automatic failover is disabled.</li>
+   * </ul>
+   * @param rmIds comma-separated RM HA ids
+   * @param rmId HA id of the RM this configuration is for
+   * @param adminPort admin port for {@code rmId}; every other per-RM service
+   *     address is set to {@code localhost:0}
+   * @return the RM configuration
+   */
+  public Configuration createRMHAConf(String rmIds, String rmId,
+      int adminPort) {
+    Configuration haConf = new YarnConfiguration();
+    haConf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
+    haConf.set(YarnConfiguration.RM_HA_IDS, rmIds);
+    haConf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+    // Use the store-backed CS configuration provider, backed by ZooKeeper.
+    haConf.set(CapacitySchedulerConfiguration.CS_CONF_PROVIDER,
+        CapacitySchedulerConfiguration.STORE_CS_CONF_PROVIDER);
+    haConf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
+        YarnConfiguration.ZK_CONFIGURATION_STORE);
+    haConf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName());
+    haConf.set(YarnConfiguration.RM_ZK_ADDRESS,
+        curatorTestingServer.getConnectString());
+    haConf.set(YarnConfiguration.RM_HA_ID, rmId);
+    haConf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, "localhost:0");
+    // Tests trigger failover explicitly via transitionToActive().
+    haConf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
+    // Every per-RM service address gets an ephemeral port...
+    for (String addressKey :
+        YarnConfiguration.getServiceAddressConfKeys(haConf)) {
+      for (String haId : HAUtil.getRMHAIds(haConf)) {
+        haConf.set(HAUtil.addSuffix(addressKey, haId), "localhost:0");
+      }
+    }
+    // ...except this RM's admin address, which must be set last to win.
+    String adminAddressKey =
+        HAUtil.addSuffix(YarnConfiguration.RM_ADMIN_ADDRESS, rmId);
+    haConf.set(adminAddressKey, "localhost:" + adminPort);
+    return haConf;
+  }
+  /**
+   * When failing over, the new active RM should read from the current state
+   * of the store, including updates made while it was in standby. An update
+   * that was logged but never confirmed must not be recovered.
+   * Both RMs are closed in a finally block. Without it, a failing assertion
+   * would leave RM services (with fixed admin ports) running.
+   * @throws Exception if any RM, admin-service or store operation fails
+   */
+  @Test
+  public void testFailoverReadsFromUpdatedStore() throws Exception {
+    HAServiceProtocol.StateChangeRequestInfo req =
+        new HAServiceProtocol.StateChangeRequestInfo(
+        HAServiceProtocol.RequestSource.REQUEST_BY_USER);
+    Configuration conf1 = createRMHAConf("rm1,rm2", "rm1", 1234);
+    ResourceManager rm1 = new MockRM(conf1);
+    ResourceManager rm2 = null;
+    try {
+      rm1.start();
+      rm1.getRMContext().getRMAdminService().transitionToActive(req);
+      assertEquals("RM with ZKStore didn't start",
+          Service.STATE.STARTED, rm1.getServiceState());
+      assertEquals("RM should be Active",
+          HAServiceProtocol.HAServiceState.ACTIVE,
+          rm1.getRMContext().getRMAdminService().getServiceStatus()
+              .getState());
+      assertNull(((MutableConfScheduler) rm1.getResourceScheduler())
+          .getConfiguration().get("key"));
+      Configuration conf2 = createRMHAConf("rm1,rm2", "rm2", 5678);
+      rm2 = new MockRM(conf2);
+      rm2.start();
+      assertEquals("RM should be Standby",
+          HAServiceProtocol.HAServiceState.STANDBY,
+          rm2.getRMContext().getRMAdminService().getServiceStatus()
+              .getState());
+      // Update configuration on RM1 and confirm it, so it is persisted.
+      SchedConfUpdateInfo schedConfUpdateInfo = new SchedConfUpdateInfo();
+      schedConfUpdateInfo.getGlobalParams().put("key", "val");
+      MutableConfigurationProvider confProvider = ((MutableConfScheduler)
+          rm1.getResourceScheduler()).getMutableConfProvider();
+      UserGroupInformation user = UserGroupInformation
+          .createUserForTesting(TEST_USER, new String[0]);
+      confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+      rm1.getResourceScheduler().reinitialize(conf1, rm1.getRMContext());
+      assertEquals("val", ((MutableConfScheduler) rm1.getResourceScheduler())
+          .getConfiguration().get("key"));
+      confProvider.confirmPendingMutation(true);
+      assertEquals("val", ((MutableCSConfigurationProvider) confProvider)
+          .getConfStore().retrieve().get("key"));
+      // Next update is not persisted, it should not be recovered
+      schedConfUpdateInfo.getGlobalParams().put("key", "badVal");
+      confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+      // Make RM2 active and verify it starts with the persisted configuration
+      rm2.getRMContext().getRMAdminService().transitionToActive(req);
+      assertEquals("RM with ZKStore didn't start",
+          Service.STATE.STARTED, rm2.getServiceState());
+      assertEquals("RM should be Active",
+          HAServiceProtocol.HAServiceState.ACTIVE,
+          rm2.getRMContext().getRMAdminService().getServiceStatus()
+              .getState());
+      // rm1 is not fenced synchronously. Poll every 100 ms for up to
+      // ZK_TIMEOUT_MS / 50 attempts (same upper bound as before), and stop
+      // as soon as rm1 is no longer active.
+      for (int attempt = 0; attempt < ZK_TIMEOUT_MS / 50; attempt++) {
+        if (HAServiceProtocol.HAServiceState.ACTIVE !=
+            rm1.getRMContext().getRMAdminService().getServiceStatus()
+                .getState()) {
+          break;
+        }
+        Thread.sleep(100);
+      }
+      assertEquals("RM should have been fenced",
+          HAServiceProtocol.HAServiceState.STANDBY,
+          rm1.getRMContext().getRMAdminService().getServiceStatus()
+              .getState());
+      assertEquals("RM should be Active",
+          HAServiceProtocol.HAServiceState.ACTIVE,
+          rm2.getRMContext().getRMAdminService().getServiceStatus()
+              .getState());
+      assertEquals("val", ((MutableCSConfigurationProvider) (
+          (CapacityScheduler) rm2.getResourceScheduler())
+          .getMutableConfProvider()).getConfStore().retrieve().get("key"));
+      assertEquals("val", ((MutableConfScheduler) rm2.getResourceScheduler())
+          .getConfiguration().get("key"));
+      // Transition to standby will set RM's HA status and then reinitialize
+      // in a separate thread. Despite asserting for STANDBY state, it's
+      // possible for reinitialization to be unfinished. Wait here for it to
+      // finish, otherwise closing rm1 will close zkManager and the unfinished
+      // reinitialization will throw an exception.
+      Thread.sleep(10000);
+    } finally {
+      try {
+        rm1.close();
+      } finally {
+        if (rm2 != null) {
+          rm2.close();
+        }
+      }
+    }
+  }
+  @Override
+  public YarnConfigurationStore createConfStore() {
+    return new ZKConfigurationStore();
+  }

To unsubscribe, e-mail:
For additional commands, e-mail:

[2/2] hadoop git commit: YARN-6840. Implement zookeeper based store for scheduler configuration updates. (Jonathan Hung via wangda)

Posted by
YARN-6840. Implement zookeeper based store for scheduler configuration updates. (Jonathan Hung via wangda)

Change-Id: I9debea674fe8c7e4109d4ca136965a1ea4c48bcc


Branch: refs/heads/YARN-5734
Commit: 9726e1fc2c4d5025bf20d087ee45c7ade46693f4
Parents: 98c4a52
Author: Wangda Tan <>
Authored: Mon Sep 18 09:53:42 2017 -0700
Committer: Wangda Tan <>
Committed: Mon Sep 18 09:53:42 2017 -0700

 .../hadoop/yarn/conf/     |  14 +-
 .../src/main/resources/yarn-default.xml         |  15 +-
 .../server/resourcemanager/    |  18 +-
 .../server/resourcemanager/ |  24 +-
 .../    |   2 +-
 .../recovery/                |   5 +-
 .../scheduler/         |  22 +-
 .../scheduler/ |  36 ++-
 .../scheduler/capacity/   |  22 +-
 .../conf/        |  71 +++--
 .../conf/         | 168 +++++-----
 .../conf/    | 148 +++++----
 .../capacity/conf/   | 132 ++++----
 .../capacity/conf/     | 235 ++++++++++++++
 .../resourcemanager/webapp/   |  26 +-
 .../conf/        |  90 ++++++
 .../conf/    |  30 ++
 .../     |  18 +-
 .../conf/        |  71 -----
 .../capacity/conf/ | 312 +++++++++++++++++++
 20 files changed, 1037 insertions(+), 422 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/
index 6ffe1f7..793a554 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/
@@ -643,6 +643,7 @@ public class YarnConfiguration extends Configuration {
       YARN_PREFIX + "";
   public static final String MEMORY_CONFIGURATION_STORE = "memory";
   public static final String LEVELDB_CONFIGURATION_STORE = "leveldb";
+  public static final String ZK_CONFIGURATION_STORE = "zk";
   public static final String DEFAULT_CONFIGURATION_STORE =
   public static final String RM_SCHEDCONF_STORE_PATH = YARN_PREFIX
@@ -654,9 +655,16 @@ public class YarnConfiguration extends Configuration {
   public static final long
-  public static final String RM_SCHEDCONF_LEVELDB_MAX_LOGS =
-      YARN_PREFIX + "scheduler.configuration.leveldb-store.max-logs";
-  public static final int DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS = 1000;
+  public static final String RM_SCHEDCONF_MAX_LOGS =
+      YARN_PREFIX + "";
+  public static final long DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS = 1000;
+  public static final long DEFAULT_RM_SCHEDCONF_ZK_MAX_LOGS = 1000;
+  /** Parent znode path under which ZKConfigurationStore will create znodes. */
+      + "scheduler.configuration.zk-store.parent-path";
+  public static final String DEFAULT_RM_SCHEDCONF_STORE_ZK_PARENT_PATH =
+      "/confstore";
   public static final String RM_SCHEDULER_MUTATION_ACL_POLICY_CLASS =
       YARN_PREFIX + "scheduler.configuration.mutation.acl-policy.class";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 2d6574f..dd9e913 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -3271,11 +3271,20 @@
-      The max number of configuration change log entries kept in LevelDB config
+      The max number of configuration change log entries kept in config
       store, when yarn.scheduler.configuration.store.class is configured to be
-      "leveldb". Default is 1000.
+      "leveldb" or "zk". Default is 1000 for either.
-    <name>yarn.scheduler.configuration.leveldb-store.max-logs</name>
+    <name></name>
+  <property>
+    <description>
+      ZK root node path for configuration store when using zookeeper-based
+      configuration store.
+    </description>
+    <name>yarn.scheduler.configuration.zk-store.parent-path</name>
+    <value>/confstore</value>
+  </property>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/
index fd9e849..6c0a854 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/
@@ -387,9 +387,7 @@ public class AdminService extends CompositeService implements
     RefreshQueuesResponse response =
     try {
-      ResourceScheduler scheduler = rm.getRMContext().getScheduler();
-      if (scheduler instanceof MutableConfScheduler
-          && ((MutableConfScheduler) scheduler).isConfigurationMutable()) {
+      if (isSchedulerMutable()) {
         throw new IOException("Scheduler configuration is mutable. " +
             operation + " is not allowed in this scenario.");
@@ -413,6 +411,12 @@ public class AdminService extends CompositeService implements
+  /**
+   * @return true if the RM's scheduler is a {@link MutableConfScheduler}
+   *     whose configuration reports itself as mutable, false otherwise
+   */
+  private boolean isSchedulerMutable() {
+    ResourceScheduler sched = rm.getRMContext().getScheduler();
+    if (!(sched instanceof MutableConfScheduler)) {
+      return false;
+    }
+    return ((MutableConfScheduler) sched).isConfigurationMutable();
+  }
   public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
       throws YarnException, StandbyException {
@@ -721,6 +725,14 @@ public class AdminService extends CompositeService implements
   void refreshAll() throws ServiceFailedException {
     try {
+      if (isSchedulerMutable()) {
+        try {
+          ((MutableConfScheduler) rm.getRMContext().getScheduler())
+              .getMutableConfProvider().reloadConfigurationFromStore();
+        } catch (Exception e) {
+          throw new IOException("Failed to refresh configuration:", e);
+        }
+      }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/
index 5333f25..0be0539 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/
@@ -342,7 +342,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
     if (curatorEnabled) {
-      this.zkManager = createAndStartZKManager(conf);
+      this.zkManager = getAndStartZKManager(conf);
       elector = new CuratorBasedElectorService(this);
     } else {
       elector = new ActiveStandbyElectorBasedElectorService(this);
@@ -351,13 +351,16 @@ public class ResourceManager extends CompositeService implements Recoverable {
-   * Create and ZooKeeper Curator manager.
+   * Get ZooKeeper Curator manager, creating and starting if not exists.
    * @param config Configuration for the ZooKeeper curator.
-   * @return New ZooKeeper Curator manager.
+   * @return ZooKeeper Curator manager.
    * @throws IOException If it cannot create the manager.
-  public ZKCuratorManager createAndStartZKManager(Configuration config)
-      throws IOException {
+  public synchronized ZKCuratorManager getAndStartZKManager(Configuration
+      config) throws IOException {
+    if (this.zkManager != null) {
+      return zkManager;
+    }
     ZKCuratorManager manager = new ZKCuratorManager(config);
     // Get authentication
@@ -377,15 +380,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
-    return manager;
-  }
-  /**
-   * Get the ZooKeeper Curator manager.
-   * @return ZooKeeper Curator manager.
-   */
-  public ZKCuratorManager getZKManager() {
-    return this.zkManager;
+    this.zkManager = manager;
+    return zkManager;
   public CuratorFramework getCurator() {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/
index 135868f..d5fce36 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/
@@ -22,7 +22,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
  * This exception is thrown by ResourceManager if it's loading an incompatible
- * version of state from state store on recovery.
+ * version of storage on recovery.
 public class RMStateVersionIncompatibleException extends YarnException {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/
index ac67dcd..5bff77f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/
@@ -327,10 +327,7 @@ public class ZKRMStateStore extends RMStateStore {
     amrmTokenSecretManagerRoot =
         getNodePath(zkRootNodePath, AMRMTOKEN_SECRET_MANAGER_ROOT);
     reservationRoot = getNodePath(zkRootNodePath, RESERVATION_SYSTEM_ROOT);
-    zkManager = resourceManager.getZKManager();
-    if (zkManager == null) {
-      zkManager = resourceManager.createAndStartZKManager(conf);
-    }
+    zkManager = resourceManager.getAndStartZKManager(conf);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/
index 313bf6a..6f677fb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/
@@ -18,11 +18,6 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
  * Interface for a scheduler that supports changing configuration at runtime.
@@ -31,16 +26,6 @@ import;
 public interface MutableConfScheduler extends ResourceScheduler {
-   * Update the scheduler's configuration.
-   * @param user Caller of this update
-   * @param confUpdate configuration update
-   * @throws IOException if scheduler could not be reinitialized
-   * @throws YarnException if reservation system could not be reinitialized
-   */
-  void updateConfiguration(UserGroupInformation user,
-      SchedConfUpdateInfo confUpdate) throws IOException, YarnException;
-  /**
    * Get the scheduler configuration.
    * @return the scheduler configuration
@@ -58,4 +43,11 @@ public interface MutableConfScheduler extends ResourceScheduler {
    * @return whether scheduler configuration is mutable or not.
   boolean isConfigurationMutable();
+  /**
+   * Get scheduler's configuration provider, so other classes can directly
+   * call mutation APIs on configuration provider.
+   * @return scheduler's configuration provider
+   */
+  MutableConfigurationProvider getMutableConfProvider();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/
index 9baf1ad..f8e8814 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/
@@ -19,30 +19,40 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
-import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
  * Interface for allowing changing scheduler configurations.
 public interface MutableConfigurationProvider {
-   * Apply transactions which were not committed.
-   * @throws IOException if recovery fails
+   * Get the acl mutation policy for this configuration provider.
+   * @return The acl mutation policy.
+   */
+  ConfigurationMutationACLPolicy getAclMutationPolicy();
+  /**
+   * Called when a new ResourceManager is starting/becomes active. Ensures
+   * configuration is up-to-date.
+   * @throws Exception if configuration could not be refreshed from store
-  void recoverConf() throws IOException;
+  void reloadConfigurationFromStore() throws Exception;
-   * Update the scheduler configuration with the provided key value pairs.
-   * @param user User issuing the request
-   * @param confUpdate Key-value pairs for configurations to be updated.
-   * @throws IOException if scheduler could not be reinitialized
-   * @throws YarnException if reservation system could not be reinitialized
+   * Log user's requested configuration mutation, and applies it in-memory.
+   * @param user User who requested the change
+   * @param confUpdate User's requested configuration change
+   * @throws Exception if logging the mutation fails
-  void mutateConfiguration(UserGroupInformation user, SchedConfUpdateInfo
-      confUpdate) throws IOException, YarnException;
+  void logAndApplyMutation(UserGroupInformation user, SchedConfUpdateInfo
+      confUpdate) throws Exception;
+  /**
+   * Confirm last logged mutation.
+   * @param isValid if the last logged mutation is applied to scheduler
+   *                properly.
+   * @throws Exception if confirming mutation fails
+   */
+  void confirmPendingMutation(boolean isValid) throws Exception;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/
index df77135..c6f9c97 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/
@@ -141,7 +141,6 @@ import org.apache.hadoop.yarn.server.utils.Lock;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
-import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
@@ -393,9 +392,6 @@ public class CapacityScheduler extends
   public void serviceStart() throws Exception {
-    if (this.csConfProvider instanceof MutableConfigurationProvider) {
-      ((MutableConfigurationProvider) csConfProvider).recoverConf();
-    }
@@ -2576,19 +2572,15 @@ public class CapacityScheduler extends
-  public void updateConfiguration(UserGroupInformation user,
-      SchedConfUpdateInfo confUpdate) throws IOException, YarnException {
-    if (isConfigurationMutable()) {
-      ((MutableConfigurationProvider) csConfProvider).mutateConfiguration(
-          user, confUpdate);
-    } else {
-      throw new UnsupportedOperationException("Configured CS configuration " +
-          "provider does not support updating configuration.");
-    }
+  public boolean isConfigurationMutable() {
+    return csConfProvider instanceof MutableConfigurationProvider;
-  public boolean isConfigurationMutable() {
-    return csConfProvider instanceof MutableConfigurationProvider;
+  public MutableConfigurationProvider getMutableConfProvider() {
+    if (isConfigurationMutable()) {
+      return (MutableConfigurationProvider) csConfProvider;
+    }
+    return null;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/
index c63734d..d69c236 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/
@@ -19,8 +19,9 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -28,48 +29,35 @@ import java.util.Map;
  * A default implementation of {@link YarnConfigurationStore}. Doesn't offer
  * persistent configuration storage, just stores the configuration in memory.
-public class InMemoryConfigurationStore implements YarnConfigurationStore {
+public class InMemoryConfigurationStore extends YarnConfigurationStore {
   private Configuration schedConf;
-  private LinkedList<LogMutation> pendingMutations;
-  private long pendingId;
+  private LogMutation pendingMutation;
-  public void initialize(Configuration conf, Configuration schedConf) {
+  public void initialize(Configuration conf, Configuration schedConf,
+      RMContext rmContext) {
     this.schedConf = schedConf;
-    this.pendingMutations = new LinkedList<>();
-    this.pendingId = 0;
-  public synchronized long logMutation(LogMutation logMutation) {
-    logMutation.setId(++pendingId);
-    pendingMutations.add(logMutation);
-    return pendingId;
+  public void logMutation(LogMutation logMutation) {
+    pendingMutation = logMutation;
-  public synchronized boolean confirmMutation(long id, boolean isValid) {
-    LogMutation mutation = pendingMutations.poll();
-    // If confirmMutation is called out of order, discard mutations until id
-    // is reached.
-    while (mutation != null) {
-      if (mutation.getId() == id) {
-        if (isValid) {
-          Map<String, String> mutations = mutation.getUpdates();
-          for (Map.Entry<String, String> kv : mutations.entrySet()) {
-            if (kv.getValue() == null) {
-              schedConf.unset(kv.getKey());
-            } else {
-              schedConf.set(kv.getKey(), kv.getValue());
-            }
-          }
+  public void confirmMutation(boolean isValid) {
+    if (isValid) {
+      for (Map.Entry<String, String> kv : pendingMutation.getUpdates()
+          .entrySet()) {
+        if (kv.getValue() == null) {
+          schedConf.unset(kv.getKey());
+        } else {
+          schedConf.set(kv.getKey(), kv.getValue());
-        return true;
-      mutation = pendingMutations.poll();
-    return false;
+    pendingMutation = null;
@@ -78,13 +66,30 @@ public class InMemoryConfigurationStore implements YarnConfigurationStore {
-  public synchronized List<LogMutation> getPendingMutations() {
-    return new LinkedList<>(pendingMutations);
+  public List<LogMutation> getConfirmedConfHistory(long fromId) {
+    // Unimplemented.
+    return null;
-  public List<LogMutation> getConfirmedConfHistory(long fromId) {
-    // Unimplemented.
+  public Version getConfStoreVersion() throws Exception {
+    // Does nothing.
     return null;
+  @Override
+  public void storeVersion() throws Exception {
+    // Does nothing.
+  }
+  @Override
+  public Version getCurrentVersion() {
+    // Does nothing.
+    return null;
+  }
+  @Override
+  public void checkVersion() {
+    // Does nothing. (Version is always compatible since it's in memory)
+  }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/
index 1280fab..1b0eb9f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/
@@ -26,6 +26,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.fusesource.leveldbjni.JniDBFactory;
 import org.fusesource.leveldbjni.internal.NativeDB;
 import org.iq80.leveldb.DB;
@@ -55,58 +59,32 @@ import static org.fusesource.leveldbjni.JniDBFactory.bytes;
  * A LevelDB implementation of {@link YarnConfigurationStore}.
-public class LeveldbConfigurationStore implements YarnConfigurationStore {
+public class LeveldbConfigurationStore extends YarnConfigurationStore {
   public static final Log LOG =
   private static final String DB_NAME = "yarn-conf-store";
-  private static final String LOG_PREFIX = "log.";
-  private static final String LOG_COMMITTED_TXN = "committedTxn";
+  private static final String LOG_KEY = "log";
+  private static final String VERSION_KEY = "version";
   private DB db;
-  // Txnid for the last transaction logged to the store.
-  private long txnId = 0;
-  private long minTxn = 0;
   private long maxLogs;
   private Configuration conf;
-  private LinkedList<LogMutation> pendingMutations = new LinkedList<>();
+  private LogMutation pendingMutation;
+  private static final Version CURRENT_VERSION_INFO = Version
+      .newInstance(0, 1);
   private Timer compactionTimer;
   private long compactionIntervalMsec;
-  public void initialize(Configuration config, Configuration schedConf)
-      throws IOException {
+  public void initialize(Configuration config, Configuration schedConf,
+      RMContext rmContext) throws IOException {
     this.conf = config;
     try {
       this.db = initDatabase(schedConf);
-      this.txnId = Long.parseLong(new String(db.get(bytes(LOG_COMMITTED_TXN)),
-          StandardCharsets.UTF_8));
-      DBIterator itr = db.iterator();
- + txnId));
-      // Seek to first uncommitted log
-      while (itr.hasNext()) {
-        Map.Entry<byte[], byte[]> entry =;
-        if (!new String(entry.getKey(), StandardCharsets.UTF_8)
-            .startsWith(LOG_PREFIX)) {
-          break;
-        }
-        pendingMutations.add(deserLogMutation(entry.getValue()));
-        txnId++;
-      }
-      // Get the earliest txnId stored in logs
-      itr.seekToFirst();
-      if (itr.hasNext()) {
-        Map.Entry<byte[], byte[]> entry =;
-        byte[] key = entry.getKey();
-        String logId = new String(key, StandardCharsets.UTF_8);
-        if (logId.startsWith(LOG_PREFIX)) {
-          minTxn = Long.parseLong(logId.substring(logId.indexOf('.') + 1));
-        }
-      }
       this.maxLogs = config.getLong(
-          YarnConfiguration.RM_SCHEDCONF_LEVELDB_MAX_LOGS,
+          YarnConfiguration.RM_SCHEDCONF_MAX_LOGS,
       this.compactionIntervalMsec = config.getLong(
@@ -127,33 +105,23 @@ public class LeveldbConfigurationStore implements YarnConfigurationStore {
       public int compare(byte[] key1, byte[] key2) {
         String key1Str = new String(key1, StandardCharsets.UTF_8);
         String key2Str = new String(key2, StandardCharsets.UTF_8);
-        int key1Txn = Integer.MAX_VALUE;
-        int key2Txn = Integer.MAX_VALUE;
-        if (key1Str.startsWith(LOG_PREFIX)) {
-          key1Txn = Integer.parseInt(key1Str.substring(
-              key1Str.indexOf('.') + 1));
-        }
-        if (key2Str.startsWith(LOG_PREFIX)) {
-          key2Txn = Integer.parseInt(key2Str.substring(
-              key2Str.indexOf('.') + 1));
-        }
-        // TODO txnId could overflow, in theory
-        if (key1Txn == Integer.MAX_VALUE && key2Txn == Integer.MAX_VALUE) {
-          if (key1Str.equals(key2Str) && key1Str.equals(LOG_COMMITTED_TXN)) {
-            return 0;
-          } else if (key1Str.equals(LOG_COMMITTED_TXN)) {
-            return -1;
-          } else if (key2Str.equals(LOG_COMMITTED_TXN)) {
-            return 1;
-          }
-          return key1Str.compareTo(key2Str);
+        if (key1Str.equals(key2Str)) {
+          return 0;
+        } else if (key1Str.equals(VERSION_KEY)) {
+          return -1;
+        } else if (key2Str.equals(VERSION_KEY)) {
+          return 1;
+        } else if (key1Str.equals(LOG_KEY)) {
+          return -1;
+        } else if (key2Str.equals(LOG_KEY)) {
+          return 1;
-        return key1Txn - key2Txn;
+        return key1Str.compareTo(key2Str);
       public String name() {
-        return "logComparator";
+        return "keyComparator";
       public byte[] findShortestSeparator(byte[] start, byte[] limit) {
@@ -164,6 +132,7 @@ public class LeveldbConfigurationStore implements YarnConfigurationStore {
         return key;
+"Using conf database at " + storeRoot);
     File dbfile = new File(storeRoot.toString());
     try {
@@ -179,7 +148,6 @@ public class LeveldbConfigurationStore implements YarnConfigurationStore {
           for (Map.Entry<String, String> kv : config) {
             initBatch.put(bytes(kv.getKey()), bytes(kv.getValue()));
-          initBatch.put(bytes(LOG_COMMITTED_TXN), bytes("0"));
         } catch (DBException dbErr) {
           throw new IOException(dbErr.getMessage(), dbErr);
@@ -208,28 +176,22 @@ public class LeveldbConfigurationStore implements YarnConfigurationStore {
-  public synchronized long logMutation(LogMutation logMutation)
-      throws IOException {
-    logMutation.setId(++txnId);
-    WriteBatch logBatch = db.createWriteBatch();
-    logBatch.put(bytes(LOG_PREFIX + txnId), serLogMutation(logMutation));
-    if (txnId - minTxn >= maxLogs) {
-      logBatch.delete(bytes(LOG_PREFIX + minTxn));
-      minTxn++;
+  public void logMutation(LogMutation logMutation) throws IOException {
+    LinkedList<LogMutation> logs = deserLogMutations(db.get(bytes(LOG_KEY)));
+    logs.add(logMutation);
+    if (logs.size() > maxLogs) {
+      logs.removeFirst();
-    db.write(logBatch);
-    pendingMutations.add(logMutation);
-    return txnId;
+    db.put(bytes(LOG_KEY), serLogMutations(logs));
+    pendingMutation = logMutation;
-  public synchronized boolean confirmMutation(long id, boolean isValid)
-      throws IOException {
+  public void confirmMutation(boolean isValid) throws IOException {
     WriteBatch updateBatch = db.createWriteBatch();
     if (isValid) {
-      LogMutation mutation = deserLogMutation(db.get(bytes(LOG_PREFIX + id)));
       for (Map.Entry<String, String> changes :
-          mutation.getUpdates().entrySet()) {
+          pendingMutation.getUpdates().entrySet()) {
         if (changes.getValue() == null || changes.getValue().isEmpty()) {
         } else {
@@ -237,28 +199,24 @@ public class LeveldbConfigurationStore implements YarnConfigurationStore {
-    updateBatch.put(bytes(LOG_COMMITTED_TXN), bytes(String.valueOf(id)));
-    // Assumes logMutation and confirmMutation are done in the same
-    // synchronized method. For example,
-    // {@link MutableCSConfigurationProvider#mutateConfiguration(
-    // UserGroupInformation user, SchedConfUpdateInfo confUpdate)}
-    pendingMutations.removeFirst();
-    return true;
+    pendingMutation = null;
-  private byte[] serLogMutation(LogMutation mutation) throws IOException {
+  private byte[] serLogMutations(LinkedList<LogMutation> mutations) throws
+      IOException {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     try (ObjectOutput oos = new ObjectOutputStream(baos)) {
-      oos.writeObject(mutation);
+      oos.writeObject(mutations);
       return baos.toByteArray();
-  private LogMutation deserLogMutation(byte[] mutation) throws IOException {
+  private LinkedList<LogMutation> deserLogMutations(byte[] mutations) throws
+      IOException {
     try (ObjectInput input = new ObjectInputStream(
-        new ByteArrayInputStream(mutation))) {
-      return (LogMutation) input.readObject();
+        new ByteArrayInputStream(mutations))) {
+      return (LinkedList<LogMutation>) input.readObject();
     } catch (ClassNotFoundException e) {
       throw new IOException(e);
@@ -267,7 +225,7 @@ public class LeveldbConfigurationStore implements YarnConfigurationStore {
   public synchronized Configuration retrieve() {
     DBIterator itr = db.iterator();
     Configuration config = new Configuration(false);;
     while (itr.hasNext()) {
@@ -279,11 +237,6 @@ public class LeveldbConfigurationStore implements YarnConfigurationStore {
-  public List<LogMutation> getPendingMutations() {
-    return new LinkedList<>(pendingMutations);
-  }
-  @Override
   public List<LogMutation> getConfirmedConfHistory(long fromId) {
     return null; // unimplemented
@@ -299,6 +252,39 @@ public class LeveldbConfigurationStore implements YarnConfigurationStore {
+  // TODO: following is taken from LeveldbRMStateStore
+  @Override
+  public Version getConfStoreVersion() throws Exception {
+    Version version = null;
+    try {
+      byte[] data = db.get(bytes(VERSION_KEY));
+      if (data != null) {
+        version = new VersionPBImpl(YarnServerCommonProtos.VersionProto
+            .parseFrom(data));
+      }
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+    return version;
+  }
+  @Override
+  public void storeVersion() throws Exception {
+    String key = VERSION_KEY;
+    byte[] data = ((VersionPBImpl) CURRENT_VERSION_INFO).getProto()
+        .toByteArray();
+    try {
+      db.put(bytes(key), data);
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+  @Override
+  public Version getCurrentVersion() {
+  }
   private class CompactionTimerTask extends TimerTask {
     public void run() {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/
index d03b2e2..70d1840 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/
@@ -18,20 +18,17 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ConfigurationMutationACLPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ConfigurationMutationACLPolicyFactory;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
 import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
@@ -56,6 +53,7 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
   private Configuration schedConf;
+  private Configuration oldConf;
   private YarnConfigurationStore confStore;
   private ConfigurationMutationACLPolicy aclMutationPolicy;
   private RMContext rmContext;
@@ -76,6 +74,9 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
     case YarnConfiguration.LEVELDB_CONFIGURATION_STORE:
       this.confStore = new LeveldbConfigurationStore();
+    case YarnConfiguration.ZK_CONFIGURATION_STORE:
+      this.confStore = new ZKConfigurationStore();
+      break;
       this.confStore = YarnConfigurationStoreFactory.getStore(config);
@@ -89,7 +90,11 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
     for (Map.Entry<String, String> kv : initialSchedConf) {
       schedConf.set(kv.getKey(), kv.getValue());
-    confStore.initialize(config, schedConf);
+    try {
+      confStore.initialize(config, schedConf, rmContext);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
     // After initializing confStore, the store may already have an existing
     // configuration. Use this one.
     schedConf = confStore.retrieve();
@@ -98,6 +103,11 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
     aclMutationPolicy.init(config, rmContext);
+  @VisibleForTesting
+  public YarnConfigurationStore getConfStore() {
+    return confStore;
+  }
   public CapacitySchedulerConfiguration loadConfiguration(Configuration
       configuration) throws IOException {
@@ -107,16 +117,17 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
-  public synchronized void mutateConfiguration(UserGroupInformation user,
-      SchedConfUpdateInfo confUpdate) throws IOException, YarnException {
-    if (!aclMutationPolicy.isMutationAllowed(user, confUpdate)) {
-      throw new AccessControlException("User is not admin of all modified" +
-          " queues.");
-    }
-    Configuration oldConf = new Configuration(schedConf);
+  public ConfigurationMutationACLPolicy getAclMutationPolicy() {
+    return aclMutationPolicy;
+  }
+  @Override
+  public void logAndApplyMutation(UserGroupInformation user,
+      SchedConfUpdateInfo confUpdate) throws Exception {
+    oldConf = new Configuration(schedConf);
     Map<String, String> kvUpdate = constructKeyValueConfUpdate(confUpdate);
     LogMutation log = new LogMutation(kvUpdate, user.getShortUserName());
-    long id = confStore.logMutation(log);
+    confStore.logMutation(log);
     for (Map.Entry<String, String> kv : kvUpdate.entrySet()) {
       if (kv.getValue() == null) {
@@ -124,47 +135,33 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
         schedConf.set(kv.getKey(), kv.getValue());
-    try {
-      rmContext.getRMAdminService().refreshQueues();
-    } catch (IOException | YarnException e) {
+  }
+  @Override
+  public void confirmPendingMutation(boolean isValid) throws Exception {
+    confStore.confirmMutation(isValid);
+    if (!isValid) {
       schedConf = oldConf;
-      confStore.confirmMutation(id, false);
-      throw e;
-    confStore.confirmMutation(id, true);
-  public void recoverConf() throws IOException {
-    List<LogMutation> uncommittedLogs = confStore.getPendingMutations();
-    Configuration oldConf = new Configuration(schedConf);
-    for (LogMutation mutation : uncommittedLogs) {
-      for (Map.Entry<String, String> kv : mutation.getUpdates().entrySet()) {
-        if (kv.getValue() == null) {
-          schedConf.unset(kv.getKey());
-        } else {
-          schedConf.set(kv.getKey(), kv.getValue());
-        }
-      }
-      try {
-        rmContext.getScheduler().reinitialize(schedConf, rmContext);
-      } catch (IOException e) {
-        schedConf = oldConf;
-        confStore.confirmMutation(mutation.getId(), false);
-"Configuration mutation " + mutation.getId()
-            + " was rejected", e);
-        continue;
-      }
-      confStore.confirmMutation(mutation.getId(), true);
-"Configuration mutation " + mutation.getId()+ " was accepted");
-    }
+  public void reloadConfigurationFromStore() throws Exception {
+    schedConf = confStore.retrieve();
+  }
+  private List<String> getSiblingQueues(String queuePath, Configuration conf) {
+    String parentQueue = queuePath.substring(0, queuePath.lastIndexOf('.'));
+    String childQueuesKey = CapacitySchedulerConfiguration.PREFIX +
+        parentQueue + CapacitySchedulerConfiguration.DOT +
+        CapacitySchedulerConfiguration.QUEUES;
+    return new ArrayList<>(conf.getStringCollection(childQueuesKey));
   private Map<String, String> constructKeyValueConfUpdate(
       SchedConfUpdateInfo mutationInfo) throws IOException {
-    CapacityScheduler cs = (CapacityScheduler) rmContext.getScheduler();
     CapacitySchedulerConfiguration proposedConf =
-        new CapacitySchedulerConfiguration(cs.getConfiguration(), false);
+        new CapacitySchedulerConfiguration(schedConf, false);
     Map<String, String> confUpdate = new HashMap<>();
     for (String queueToRemove : mutationInfo.getRemoveQueueInfo()) {
       removeQueue(queueToRemove, proposedConf, confUpdate);
@@ -188,40 +185,35 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
     if (queueToRemove == null) {
     } else {
-      CapacityScheduler cs = (CapacityScheduler) rmContext.getScheduler();
       String queueName = queueToRemove.substring(
           queueToRemove.lastIndexOf('.') + 1);
-      CSQueue queue = cs.getQueue(queueName);
-      if (queue == null ||
-          !queue.getQueuePath().equals(queueToRemove)) {
-        throw new IOException("Queue " + queueToRemove + " not found");
-      } else if (queueToRemove.lastIndexOf('.') == -1) {
+      if (queueToRemove.lastIndexOf('.') == -1) {
         throw new IOException("Can't remove queue " + queueToRemove);
-      }
-      String parentQueuePath = queueToRemove.substring(0, queueToRemove
-          .lastIndexOf('.'));
-      String[] siblingQueues = proposedConf.getQueues(parentQueuePath);
-      List<String> newSiblingQueues = new ArrayList<>();
-      for (String siblingQueue : siblingQueues) {
-        if (!siblingQueue.equals(queueName)) {
-          newSiblingQueues.add(siblingQueue);
-        }
-      }
-      proposedConf.setQueues(parentQueuePath, newSiblingQueues
-          .toArray(new String[0]));
-      String queuesConfig = CapacitySchedulerConfiguration.PREFIX
-          + parentQueuePath + CapacitySchedulerConfiguration.DOT
-          + CapacitySchedulerConfiguration.QUEUES;
-      if (newSiblingQueues.size() == 0) {
-        confUpdate.put(queuesConfig, null);
       } else {
-        confUpdate.put(queuesConfig, Joiner.on(',').join(newSiblingQueues));
-      }
-      for (Map.Entry<String, String> confRemove : proposedConf.getValByRegex(
-          ".*" + queueToRemove.replaceAll("\\.", "\\.") + "\\..*")
-          .entrySet()) {
-        proposedConf.unset(confRemove.getKey());
-        confUpdate.put(confRemove.getKey(), null);
+        List<String> siblingQueues = getSiblingQueues(queueToRemove,
+            proposedConf);
+        if (!siblingQueues.contains(queueName)) {
+          throw new IOException("Queue " + queueToRemove + " not found");
+        }
+        siblingQueues.remove(queueName);
+        String parentQueuePath = queueToRemove.substring(0, queueToRemove
+            .lastIndexOf('.'));
+        proposedConf.setQueues(parentQueuePath, siblingQueues.toArray(
+            new String[0]));
+        String queuesConfig = CapacitySchedulerConfiguration.PREFIX
+            + parentQueuePath + CapacitySchedulerConfiguration.DOT
+            + CapacitySchedulerConfiguration.QUEUES;
+        if (siblingQueues.size() == 0) {
+          confUpdate.put(queuesConfig, null);
+        } else {
+          confUpdate.put(queuesConfig, Joiner.on(',').join(siblingQueues));
+        }
+        for (Map.Entry<String, String> confRemove : proposedConf.getValByRegex(
+            ".*" + queueToRemove.replaceAll("\\.", "\\.") + "\\..*")
+            .entrySet()) {
+          proposedConf.unset(confRemove.getKey());
+          confUpdate.put(confRemove.getKey(), null);
+        }
@@ -232,13 +224,13 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
     if (addInfo == null) {
     } else {
-      CapacityScheduler cs = (CapacityScheduler) rmContext.getScheduler();
       String queuePath = addInfo.getQueue();
       String queueName = queuePath.substring(queuePath.lastIndexOf('.') + 1);
-      if (cs.getQueue(queueName) != null) {
-        throw new IOException("Can't add existing queue " + queuePath);
-      } else if (queuePath.lastIndexOf('.') == -1) {
+      if (queuePath.lastIndexOf('.') == -1) {
         throw new IOException("Can't add invalid queue " + queuePath);
+      } else if (getSiblingQueues(queuePath, proposedConf).contains(
+          queueName)) {
+        throw new IOException("Can't add existing queue " + queuePath);
       String parentQueue = queuePath.substring(0, queuePath.lastIndexOf('.'));
       String[] siblings = proposedConf.getQueues(parentQueue);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/
index 065c877..1356535 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/
@@ -18,7 +18,12 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateVersionIncompatibleException;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
@@ -39,36 +44,26 @@ import java.util.Map;
  * {@code getPendingMutations}, and replay/confirm them via
  * {@code confirmMutation} as in the normal case.
-public interface YarnConfigurationStore {
+public abstract class YarnConfigurationStore {
+  public static final Log LOG =
+      LogFactory.getLog(YarnConfigurationStore.class);
    * LogMutation encapsulates the fields needed for configuration mutation
    * audit logging and recovery.
-  class LogMutation implements Serializable {
+  static class LogMutation implements Serializable {
     private Map<String, String> updates;
     private String user;
-    private long id;
-     * Create log mutation prior to logging.
+     * Create log mutation.
      * @param updates key-value configuration updates
      * @param user user who requested configuration change
-    public LogMutation(Map<String, String> updates, String user) {
-      this(updates, user, 0);
-    }
-    /**
-     * Create log mutation for recovery.
-     * @param updates key-value configuration updates
-     * @param user user who requested configuration change
-     * @param id transaction id of configuration change
-     */
-    LogMutation(Map<String, String> updates, String user, long id) {
+    LogMutation(Map<String, String> updates, String user) {
       this.updates = updates;
       this.user = user;
- = id;
@@ -86,75 +81,92 @@ public interface YarnConfigurationStore {
     public String getUser() {
       return user;
-    /**
-     * Get transaction id of this configuration change.
-     * @return transaction id
-     */
-    public long getId() {
-      return id;
-    }
-    /**
-     * Set transaction id of this configuration change.
-     * @param id transaction id
-     */
-    public void setId(long id) {
- = id;
-    }
-   * Initialize the configuration store.
+   * Initialize the configuration store, with schedConf as the initial
+   * scheduler configuration. If a persisted store already exists, use the
+   * scheduler configuration stored there, and ignore schedConf.
    * @param conf configuration to initialize store with
-   * @param schedConf Initial key-value configuration to persist
+   * @param schedConf Initial key-value scheduler configuration to persist.
+   * @param rmContext RMContext for this configuration store
    * @throws IOException if initialization fails
-  void initialize(Configuration conf, Configuration schedConf)
-      throws IOException;
+  public abstract void initialize(Configuration conf, Configuration schedConf,
+      RMContext rmContext) throws Exception;
-   * Logs the configuration change to backing store. Generates an id associated
-   * with this mutation, sets it in {@code logMutation}, and returns it.
+   * Logs the configuration change to backing store.
    * @param logMutation configuration change to be persisted in write ahead log
-   * @return id which configuration store associates with this mutation
    * @throws IOException if logging fails
-  long logMutation(LogMutation logMutation) throws IOException;
+  public abstract void logMutation(LogMutation logMutation) throws Exception;
    * Should be called after {@code logMutation}. Gets the pending mutation
-   * associated with {@code id} and marks the mutation as persisted (no longer
-   * pending). If isValid is true, merge the mutation with the persisted
+   * last logged by {@code logMutation} and marks the mutation as persisted (no
+   * longer pending). If isValid is true, merge the mutation with the persisted
    * configuration.
-   *
-   * If {@code confirmMutation} is called with ids in a different order than
-   * was returned by {@code logMutation}, the result is implementation
-   * dependent.
-   * @param id id of mutation to be confirmed
-   * @param isValid if true, update persisted configuration with mutation
-   *                associated with {@code id}.
-   * @return true on success
-   * @throws IOException if mutation confirmation fails
+   * @param isValid if true, update persisted configuration with pending
+   *                mutation.
+   * @throws Exception if mutation confirmation fails
-  boolean confirmMutation(long id, boolean isValid) throws IOException;
+  public abstract void confirmMutation(boolean isValid) throws Exception;
    * Retrieve the persisted configuration.
    * @return configuration as key-value
-  Configuration retrieve();
-  /**
-   * Get the list of pending mutations, in the order they were logged.
-   * @return list of mutations
-   */
-  List<LogMutation> getPendingMutations();
+  public abstract Configuration retrieve();
    * Get a list of confirmed configuration mutations starting from a given id.
    * @param fromId id from which to start getting mutations, inclusive
    * @return list of configuration mutations
-  List<LogMutation> getConfirmedConfHistory(long fromId);
+  public abstract List<LogMutation> getConfirmedConfHistory(long fromId);
+  /**
+   * Get schema version of persisted conf store, for detecting compatibility
+   * issues when changing conf store schema.
+   * @return Schema version currently used by the persisted configuration store.
+   * @throws Exception On version fetch failure
+   */
+  protected abstract Version getConfStoreVersion() throws Exception;
+  /**
+   * Persist the hard-coded schema version to the conf store.
+   * @throws Exception On storage failure
+   */
+  protected abstract void storeVersion() throws Exception;
+  /**
+   * Get the hard-coded schema version, for comparison against the schema
+   * version currently persisted.
+   * @return Current hard-coded schema version
+   */
+  protected abstract Version getCurrentVersion();
+  public void checkVersion() throws Exception {
+    // TODO this was taken from RMStateStore. Should probably refactor
+    Version loadedVersion = getConfStoreVersion();
+"Loaded configuration store version info " + loadedVersion);
+    if (loadedVersion != null && loadedVersion.equals(getCurrentVersion())) {
+      return;
+    }
+    // if there is no version info, treat it as CURRENT_VERSION_INFO;
+    if (loadedVersion == null) {
+      loadedVersion = getCurrentVersion();
+    }
+    if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
+"Storing configuration store version info "
+          + getCurrentVersion());
+      storeVersion();
+    } else {
+      throw new RMStateVersionIncompatibleException(
+          "Expecting configuration store version " + getCurrentVersion()
+              + ", but loading version " + loadedVersion);
+    }
+  }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/
new file mode 100644
index 0000000..a0bba8c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/
@@ -0,0 +1,235 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.curator.ZKCuratorManager;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.ACL;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+/**
+ * A Zookeeper-based implementation of {@link YarnConfigurationStore}.
+ * <p>
+ * Znodes kept under the configured parent path
+ * ({@link YarnConfiguration#RM_SCHEDCONF_STORE_ZK_PARENT_PATH}):
+ * <ul>
+ *   <li>{@code VERSION} - protobuf-serialized schema {@link Version}</li>
+ *   <li>{@code LOGS} - Java-serialized {@code LinkedList<LogMutation>} with
+ *       at most {@code maxLogs} of the most recently logged mutations,
+ *       oldest first</li>
+ *   <li>{@code CONF_STORE} - Java-serialized {@code HashMap<String, String>}
+ *       holding the current scheduler configuration</li>
+ *   <li>{@code FENCING} - fencing node passed to the {@code safe*} ZK
+ *       operations</li>
+ * </ul>
+ */
+public class ZKConfigurationStore extends YarnConfigurationStore {
+  public static final Log LOG =
+      LogFactory.getLog(ZKConfigurationStore.class);
+  @VisibleForTesting
+  protected static final Version CURRENT_VERSION_INFO = Version
+      .newInstance(0, 1);
+  private static final String ZK_VERSION_PATH = "VERSION";
+  private static final String LOGS_PATH = "LOGS";
+  private static final String CONF_STORE_PATH = "CONF_STORE";
+  private static final String FENCING_PATH = "FENCING";
+  /** Maximum number of mutations retained in the LOGS znode. */
+  private long maxLogs;
+  private Configuration conf;
+  /** Mutation recorded by logMutation, awaiting confirmMutation. */
+  private LogMutation pendingMutation;
+  private String znodeParentPath;
+  private String zkVersionPath;
+  private String logsPath;
+  private String confStorePath;
+  private String fencingNodePath;
+  @VisibleForTesting
+  protected ZKCuratorManager zkManager;
+  private List<ACL> zkAcl;
+  /**
+   * Obtain the RM's ZK manager and create this store's znodes if missing.
+   * <p>
+   * Existing {@code LOGS} / {@code CONF_STORE} znodes are left untouched, so
+   * {@code schedConf} only seeds the store the first time it is used.
+   * @param config RM configuration: supplies the parent path and max log
+   *     count, and is passed to the ZK manager and ACL lookup
+   * @param schedConf initial scheduler configuration
+   * @param rmContext used to obtain the RM's ZK manager
+   * @throws Exception on ZooKeeper or serialization failure
+   */
+  @Override
+  public void initialize(Configuration config, Configuration schedConf,
+      RMContext rmContext) throws Exception {
+    this.conf = config;
+    this.maxLogs = conf.getLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS,
+        YarnConfiguration.DEFAULT_RM_SCHEDCONF_ZK_MAX_LOGS);
+    this.znodeParentPath =
+        conf.get(YarnConfiguration.RM_SCHEDCONF_STORE_ZK_PARENT_PATH,
+            YarnConfiguration.DEFAULT_RM_SCHEDCONF_STORE_ZK_PARENT_PATH);
+    this.zkManager = rmContext.getResourceManager().getAndStartZKManager(conf);
+    this.zkAcl = ZKCuratorManager.getZKAcls(conf);
+    this.zkVersionPath = getNodePath(znodeParentPath, ZK_VERSION_PATH);
+    this.logsPath = getNodePath(znodeParentPath, LOGS_PATH);
+    this.confStorePath = getNodePath(znodeParentPath, CONF_STORE_PATH);
+    this.fencingNodePath = getNodePath(znodeParentPath, FENCING_PATH);
+    zkManager.createRootDirRecursively(znodeParentPath);
+    // Clear any existing fencing node (presumably left by a previous run)
+    // before this store starts issuing fenced writes.
+    zkManager.delete(fencingNodePath);
+    if (!zkManager.exists(logsPath)) {
+      zkManager.create(logsPath);
+      zkManager.setData(logsPath,
+          serializeObject(new LinkedList<LogMutation>()), -1);
+    }
+    if (!zkManager.exists(confStorePath)) {
+      zkManager.create(confStorePath);
+      HashMap<String, String> mapSchedConf = new HashMap<>();
+      for (Map.Entry<String, String> entry : schedConf) {
+        mapSchedConf.put(entry.getKey(), entry.getValue());
+      }
+      zkManager.setData(confStorePath, serializeObject(mapSchedConf), -1);
+    }
+  }
+  /**
+   * Read the mutation log currently persisted in the {@code LOGS} znode.
+   * @return logged mutations, oldest first
+   * @throws Exception on ZooKeeper or deserialization failure
+   */
+  @VisibleForTesting
+  @SuppressWarnings("unchecked") // LOGS only ever holds LinkedList<LogMutation>
+  protected LinkedList<LogMutation> getLogs() throws Exception {
+    return (LinkedList<LogMutation>)
+        deserializeObject(zkManager.getData(logsPath));
+  }
+  // TODO: following version-related code is taken from ZKRMStateStore
+  /**
+   * @return the hard-coded schema version, {@link #CURRENT_VERSION_INFO}
+   */
+  @Override
+  public Version getCurrentVersion() {
+    return CURRENT_VERSION_INFO;
+  }
+  /**
+   * Read the schema version persisted in the {@code VERSION} znode.
+   * @return the persisted version, or {@code null} if none has been stored
+   * @throws Exception on ZooKeeper failure or if the data does not parse as
+   *     a {@code VersionProto}
+   */
+  @Override
+  public Version getConfStoreVersion() throws Exception {
+    if (zkManager.exists(zkVersionPath)) {
+      byte[] data = zkManager.getData(zkVersionPath);
+      return new VersionPBImpl(YarnServerCommonProtos.VersionProto
+          .parseFrom(data));
+    }
+    return null;
+  }
+  /**
+   * Write {@link #CURRENT_VERSION_INFO} to the {@code VERSION} znode,
+   * creating it if needed. Both paths use fenced ZK operations.
+   * @throws Exception on ZooKeeper failure
+   */
+  @Override
+  public synchronized void storeVersion() throws Exception {
+    byte[] data =
+        ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
+    if (zkManager.exists(zkVersionPath)) {
+      zkManager.safeSetData(zkVersionPath, data, -1, zkAcl, fencingNodePath);
+    } else {
+      zkManager.safeCreate(zkVersionPath, data, zkAcl, CreateMode.PERSISTENT,
+          zkAcl, fencingNodePath);
+    }
+  }
+  /**
+   * Append a mutation to the persisted log and remember it as pending.
+   * <p>
+   * The oldest entries are dropped so the log never holds more than
+   * {@code maxLogs} mutations. The write is fenced.
+   * @param logMutation mutation to record
+   * @throws Exception on ZooKeeper or (de)serialization failure
+   */
+  @Override
+  public void logMutation(LogMutation logMutation) throws Exception {
+    byte[] storedLogs = zkManager.getData(logsPath);
+    LinkedList<LogMutation> logs = new LinkedList<>();
+    if (storedLogs != null) {
+      @SuppressWarnings("unchecked") // LOGS only ever holds this type
+      LinkedList<LogMutation> persisted =
+          (LinkedList<LogMutation>) deserializeObject(storedLogs);
+      logs = persisted;
+    }
+    logs.add(logMutation);
+    // Trim from the head (oldest). Loop rather than a single removal so the
+    // bound still holds if maxLogs was lowered since the log was written.
+    while (!logs.isEmpty() && logs.size() > maxLogs) {
+      logs.removeFirst();
+    }
+    zkManager.safeSetData(logsPath, serializeObject(logs), -1, zkAcl,
+        fencingNodePath);
+    pendingMutation = logMutation;
+  }
+  /**
+   * Resolve the mutation recorded by the last {@link #logMutation} call.
+   * <p>
+   * If {@code isValid}, its updates are merged into the stored configuration
+   * (a {@code null} or empty value removes the key) and the result is written
+   * back with a fenced write. Unless an exception is thrown, the pending
+   * mutation is cleared whether or not it was applied.
+   * NOTE(review): assumes logMutation was called first; a null pending
+   * mutation or a failed retrieve() will surface as a NullPointerException.
+   * @param isValid whether the pending mutation should be applied
+   * @throws Exception on ZooKeeper or serialization failure
+   */
+  @Override
+  public void confirmMutation(boolean isValid)
+      throws Exception {
+    if (isValid) {
+      Configuration storedConfigs = retrieve();
+      Map<String, String> mapConf = new HashMap<>();
+      for (Map.Entry<String, String> storedConf : storedConfigs) {
+        mapConf.put(storedConf.getKey(), storedConf.getValue());
+      }
+      for (Map.Entry<String, String> confChange :
+          pendingMutation.getUpdates().entrySet()) {
+        if (confChange.getValue() == null || confChange.getValue().isEmpty()) {
+          mapConf.remove(confChange.getKey());
+        } else {
+          mapConf.put(confChange.getKey(), confChange.getValue());
+        }
+      }
+      zkManager.safeSetData(confStorePath, serializeObject(mapConf), -1,
+          zkAcl, fencingNodePath);
+    }
+    pendingMutation = null;
+  }
+  /**
+   * Load the scheduler configuration from the {@code CONF_STORE} znode.
+   * <p>
+   * The result is built without default resources so it contains exactly
+   * the stored keys. {@link #confirmMutation} copies every entry of it back
+   * into ZooKeeper, so defaults must not leak in here.
+   * @return the stored configuration, or {@code null} if it could not be
+   *     read or deserialized (the failure is logged)
+   */
+  @Override
+  public synchronized Configuration retrieve() {
+    byte[] serializedSchedConf;
+    try {
+      serializedSchedConf = zkManager.getData(confStorePath);
+    } catch (Exception e) {
+      LOG.error("Failed to retrieve configuration from zookeeper store", e);
+      return null;
+    }
+    try {
+      @SuppressWarnings("unchecked") // CONF_STORE only ever holds this type
+      Map<String, String> map =
+          (HashMap<String, String>) deserializeObject(serializedSchedConf);
+      Configuration c = new Configuration(false);
+      for (Map.Entry<String, String> e : map.entrySet()) {
+        c.set(e.getKey(), e.getValue());
+      }
+      return c;
+    } catch (Exception e) {
+      LOG.error("Exception while deserializing scheduler configuration " +
+          "from store", e);
+    }
+    return null;
+  }
+  /**
+   * Not implemented for this store yet.
+   * @param fromId ignored
+   * @return always {@code null}
+   */
+  @Override
+  public List<LogMutation> getConfirmedConfHistory(long fromId) {
+    return null; // unimplemented
+  }
+  /** Delegates to {@link ZKCuratorManager#getNodePath(String, String)}. */
+  private static String getNodePath(String root, String nodeName) {
+    return ZKCuratorManager.getNodePath(root, nodeName);
+  }
+  /**
+   * Serialize {@code o} with Java object serialization.
+   * @param o object to serialize; must be {@link java.io.Serializable}
+   * @return the serialized bytes
+   * @throws Exception if serialization fails
+   */
+  private static byte[] serializeObject(Object o) throws Exception {
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+      oos.writeObject(o);
+      oos.flush();
+      return baos.toByteArray();
+    }
+  }
+  /**
+   * Deserialize bytes produced by {@link #serializeObject(Object)}.
+   * <p>
+   * NOTE(review): this runs Java native deserialization on data read from
+   * ZooKeeper. Anyone able to write these znodes can hand arbitrary
+   * serialized objects to the RM; consider a class allow-list or a
+   * non-Java serialization format.
+   * @param bytes serialized data
+   * @return the deserialized object
+   * @throws Exception if deserialization fails
+   */
+  private static Object deserializeObject(byte[] bytes) throws Exception {
+    try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+        ObjectInputStream ois = new ObjectInputStream(bais)) {
+      return ois.readObject();
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/
index 516b6293..45210a4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/
@@ -129,6 +129,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -2414,7 +2415,7 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
   @Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
       MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
   @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
-  public Response updateSchedulerConfiguration(SchedConfUpdateInfo
+  public synchronized Response updateSchedulerConfiguration(SchedConfUpdateInfo
       mutationInfo, @Context HttpServletRequest hsr)
       throws AuthorizationException, InterruptedException {
@@ -2429,17 +2430,32 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
     ResourceScheduler scheduler = rm.getResourceScheduler();
-    if (scheduler instanceof MutableConfScheduler) {
+    if (scheduler instanceof MutableConfScheduler && ((MutableConfScheduler)
+        scheduler).isConfigurationMutable()) {
       try {
         callerUGI.doAs(new PrivilegedExceptionAction<Void>() {
-          public Void run() throws IOException, YarnException {
-            ((MutableConfScheduler) scheduler).updateConfiguration(callerUGI,
-                mutationInfo);
+          public Void run() throws Exception {
+            MutableConfigurationProvider provider = ((MutableConfScheduler)
+                scheduler).getMutableConfProvider();
+            if (!provider.getAclMutationPolicy().isMutationAllowed(callerUGI,
+                mutationInfo)) {
+              throw new"User"
+                  + " is not admin of all modified queues.");
+            }
+            provider.logAndApplyMutation(callerUGI, mutationInfo);
+            try {
+              rm.getRMContext().getRMAdminService().refreshQueues();
+            } catch (IOException | YarnException e) {
+              provider.confirmPendingMutation(false);
+              throw e;
+            }
+            provider.confirmPendingMutation(true);
             return null;
       } catch (IOException e) {
+        LOG.error("Exception thrown when modifying configuration.", e);
         return Response.status(Status.BAD_REQUEST).entity(e.getMessage())
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/
new file mode 100644
index 0000000..bbe9570
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/
@@ -0,0 +1,90 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ *
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+/**
+ * Base class for tests of {@link YarnConfigurationStore} implementations.
+ * Subclasses supply the store under test via {@link #createConfStore()}.
+ */
+public abstract class ConfigurationStoreBaseTest {
+  protected static final String TEST_USER = "testUser";
+  protected YarnConfigurationStore confStore;
+  protected Configuration conf;
+  protected Configuration schedConf;
+  protected RMContext rmContext;
+  /**
+   * @return a new, uninitialized store instance for the current test
+   */
+  protected abstract YarnConfigurationStore createConfStore();
+  @Before
+  public void setUp() throws Exception {
+    this.conf = new Configuration();
+    this.schedConf = new Configuration(false);
+    // Created here rather than in a field initializer so the overridable
+    // factory is not invoked while the subclass is still being constructed.
+    this.confStore = createConfStore();
+  }
+  /**
+   * A confirmed mutation becomes visible through retrieve(); a rejected one
+   * does not.
+   */
+  @Test
+  public void testConfigurationUpdate() throws Exception {
+    schedConf.set("key1", "val1");
+    confStore.initialize(conf, schedConf, rmContext);
+    assertEquals("val1", confStore.retrieve().get("key1"));
+    Map<String, String> update1 = new HashMap<>();
+    update1.put("keyUpdate1", "valUpdate1");
+    YarnConfigurationStore.LogMutation mutation1 =
+        new YarnConfigurationStore.LogMutation(update1, TEST_USER);
+    confStore.logMutation(mutation1);
+    confStore.confirmMutation(true);
+    assertEquals("valUpdate1", confStore.retrieve().get("keyUpdate1"));
+    Map<String, String> update2 = new HashMap<>();
+    update2.put("keyUpdate2", "valUpdate2");
+    YarnConfigurationStore.LogMutation mutation2 =
+        new YarnConfigurationStore.LogMutation(update2, TEST_USER);
+    confStore.logMutation(mutation2);
+    confStore.confirmMutation(false);
+    assertNull("Configuration should not be updated",
+        confStore.retrieve().get("keyUpdate2"));
+  }
+  /**
+   * A confirmed update with a null value removes the key.
+   */
+  @Test
+  public void testNullConfigurationUpdate() throws Exception {
+    schedConf.set("key", "val");
+    confStore.initialize(conf, schedConf, rmContext);
+    assertEquals("val", confStore.retrieve().get("key"));
+    Map<String, String> update = new HashMap<>();
+    update.put("key", null);
+    YarnConfigurationStore.LogMutation mutation =
+        new YarnConfigurationStore.LogMutation(update, TEST_USER);
+    confStore.logMutation(mutation);
+    confStore.confirmMutation(true);
+    assertNull(confStore.retrieve().get("key"));
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/
new file mode 100644
index 0000000..c40d16a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/
@@ -0,0 +1,30 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+/**
+ * Tests {@link InMemoryConfigurationStore} against the shared
+ * {@link ConfigurationStoreBaseTest} cases.
+ */
+public class TestInMemoryConfigurationStore extends ConfigurationStoreBaseTest {
+  /**
+   * @return a new in-memory store for the current test
+   */
+  @Override
+  protected YarnConfigurationStore createConfStore() {
+    return new InMemoryConfigurationStore();
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/
index 635a184..9b080cd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
@@ -30,14 +29,11 @@ import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
 import org.junit.Before;
 import org.junit.Test;
 import java.util.HashMap;
 import java.util.Map;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -82,25 +78,21 @@ public class TestMutableCSConfigurationProvider {
-  public void testInMemoryBackedProvider() throws IOException, YarnException {
+  public void testInMemoryBackedProvider() throws Exception {
     Configuration conf = new Configuration();
-    doNothing().when(adminService).refreshQueues();
-    confProvider.mutateConfiguration(TEST_USER, goodUpdate);
+    confProvider.logAndApplyMutation(TEST_USER, goodUpdate);
+    confProvider.confirmPendingMutation(true);
     assertEquals("goodVal", confProvider.loadConfiguration(conf)
-    doThrow(new IOException()).when(adminService).refreshQueues();
-    try {
-      confProvider.mutateConfiguration(TEST_USER, badUpdate);
-    } catch (IOException e) {
-      // Expected exception.
-    }
+    confProvider.logAndApplyMutation(TEST_USER, badUpdate);
+    confProvider.confirmPendingMutation(false);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/
deleted file mode 100644
index 631ce65..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/
+++ /dev/null
@@ -1,71 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
-import org.junit.Before;
-import org.junit.Test;
-import java.util.HashMap;
-import java.util.Map;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-public class TestYarnConfigurationStore {
-  private YarnConfigurationStore confStore;
-  private Configuration schedConf;
-  private static final String testUser = "testUser";
-  @Before
-  public void setUp() {
-    schedConf = new Configuration(false);
-    schedConf.set("key1", "val1");
-  }
-  @Test
-  public void testInMemoryConfigurationStore() throws IOException {
-    confStore = new InMemoryConfigurationStore();
-    confStore.initialize(new Configuration(), schedConf);
-    assertEquals("val1", confStore.retrieve().get("key1"));
-    Map<String, String> update1 = new HashMap<>();
-    update1.put("keyUpdate1", "valUpdate1");
-    LogMutation mutation1 = new LogMutation(update1, testUser);
-    long id = confStore.logMutation(mutation1);
-    assertEquals(1, confStore.getPendingMutations().size());
-    confStore.confirmMutation(id, true);
-    assertEquals("valUpdate1", confStore.retrieve().get("keyUpdate1"));
-    assertEquals(0, confStore.getPendingMutations().size());
-    Map<String, String> update2 = new HashMap<>();
-    update2.put("keyUpdate2", "valUpdate2");
-    LogMutation mutation2 = new LogMutation(update2, testUser);
-    id = confStore.logMutation(mutation2);
-    assertEquals(1, confStore.getPendingMutations().size());
-    confStore.confirmMutation(id, false);
-    assertNull("Configuration should not be updated",
-        confStore.retrieve().get("keyUpdate2"));
-    assertEquals(0, confStore.getPendingMutations().size());
-  }

To unsubscribe, e-mail:
For additional commands, e-mail: