You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jh...@apache.org on 2017/09/28 02:55:38 UTC

[47/50] [abbrv] hadoop git commit: YARN-7046. Add closing logic to configuration store

YARN-7046. Add closing logic to configuration store


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c7948fe5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c7948fe5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c7948fe5

Branch: refs/heads/YARN-5734
Commit: c7948fe56c46526fa1cf934444d7d193cfe7398f
Parents: 76746cb
Author: Jonathan Hung <jh...@linkedin.com>
Authored: Fri Sep 22 10:18:27 2017 -0700
Committer: Jonathan Hung <jh...@linkedin.com>
Committed: Wed Sep 27 19:52:59 2017 -0700

----------------------------------------------------------------------
 .../scheduler/MutableConfigurationProvider.java |   8 +
 .../scheduler/capacity/CapacityScheduler.java   |   3 +
 .../conf/LeveldbConfigurationStore.java         |  44 ++--
 .../conf/MutableCSConfigurationProvider.java    |   6 +
 .../capacity/conf/YarnConfigurationStore.java   |   6 +
 .../conf/ConfigurationStoreBaseTest.java        |   2 +
 .../conf/TestLeveldbConfigurationStore.java     | 213 +++++++++++++++++++
 7 files changed, 270 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7948fe5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java
index f8e8814..2b9b25a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
 
+import java.io.IOException;
+
 /**
  * Interface for allowing changing scheduler configurations.
  */
@@ -55,4 +57,10 @@ public interface MutableConfigurationProvider {
    * @throws Exception if confirming mutation fails
    */
   void confirmPendingMutation(boolean isValid) throws Exception;
+
+  /**
+   * Closes the configuration provider, releasing any required resources.
+   * @throws IOException on failure to close
+   */
+  void close() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7948fe5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 16b27c1..de95179 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -411,6 +411,9 @@ public class CapacityScheduler extends
       writeLock.unlock();
     }
 
+    if (isConfigurationMutable()) {
+      ((MutableConfigurationProvider) csConfProvider).close();
+    }
     super.serviceStop();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7948fe5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
index 1b0eb9f..21de7a2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -72,7 +73,8 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
   private long maxLogs;
   private Configuration conf;
   private LogMutation pendingMutation;
-  private static final Version CURRENT_VERSION_INFO = Version
+  @VisibleForTesting
+  protected static final Version CURRENT_VERSION_INFO = Version
       .newInstance(0, 1);
   private Timer compactionTimer;
   private long compactionIntervalMsec;
@@ -82,7 +84,7 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
       RMContext rmContext) throws IOException {
     this.conf = config;
     try {
-      this.db = initDatabase(schedConf);
+      initDatabase(schedConf);
       this.maxLogs = config.getLong(
           YarnConfiguration.RM_SCHEDCONF_MAX_LOGS,
           YarnConfiguration.DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS);
@@ -96,7 +98,7 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
     }
   }
 
-  private DB initDatabase(Configuration config) throws Exception {
+  private void initDatabase(Configuration config) throws Exception {
     Path storeRoot = createStorageDir();
     Options options = new Options();
     options.createIfMissing(false);
@@ -108,13 +110,13 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
         if (key1Str.equals(key2Str)) {
           return 0;
         } else if (key1Str.equals(VERSION_KEY)) {
-          return -1;
-        } else if (key2Str.equals(VERSION_KEY)) {
           return 1;
-        } else if (key1Str.equals(LOG_KEY)) {
+        } else if (key2Str.equals(VERSION_KEY)) {
           return -1;
-        } else if (key2Str.equals(LOG_KEY)) {
+        } else if (key1Str.equals(LOG_KEY)) {
           return 1;
+        } else if (key2Str.equals(LOG_KEY)) {
+          return -1;
         }
         return key1Str.compareTo(key2Str);
       }
@@ -156,7 +158,6 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
         throw e;
       }
     }
-    return db;
   }
 
   private Path createStorageDir() throws IOException {
@@ -176,6 +177,13 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
   }
 
   @Override
+  public void close() throws IOException {
+    if (db != null) {
+      db.close();
+    }
+  }
+
+  @Override
   public void logMutation(LogMutation logMutation) throws IOException {
     LinkedList<LogMutation> logs = deserLogMutations(db.get(bytes(LOG_KEY)));
     logs.add(logMutation);
@@ -212,8 +220,12 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
       return baos.toByteArray();
     }
   }
+
   private LinkedList<LogMutation> deserLogMutations(byte[] mutations) throws
       IOException {
+    if (mutations == null) {
+      return new LinkedList<>();
+    }
     try (ObjectInput input = new ObjectInputStream(
         new ByteArrayInputStream(mutations))) {
       return (LinkedList<LogMutation>) input.readObject();
@@ -225,13 +237,16 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
   @Override
   public synchronized Configuration retrieve() {
     DBIterator itr = db.iterator();
-    itr.seek(bytes(LOG_KEY));
+    itr.seekToFirst();
     Configuration config = new Configuration(false);
-    itr.next();
     while (itr.hasNext()) {
       Map.Entry<byte[], byte[]> entry = itr.next();
-      config.set(new String(entry.getKey(), StandardCharsets.UTF_8),
-          new String(entry.getValue(), StandardCharsets.UTF_8));
+      String key = new String(entry.getKey(), StandardCharsets.UTF_8);
+      String value = new String(entry.getValue(), StandardCharsets.UTF_8);
+      if (key.equals(LOG_KEY) || key.equals(VERSION_KEY)) {
+        break;
+      }
+      config.set(key, value);
     }
     return config;
   }
@@ -268,6 +283,11 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
     return version;
   }
 
+  @VisibleForTesting
+  protected LinkedList<LogMutation> getLogs() throws Exception {
+    return deserLogMutations(db.get(bytes(LOG_KEY)));
+  }
+
   @Override
   public void storeVersion() throws Exception {
     String key = VERSION_KEY;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7948fe5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
index 70d1840..ccadf76 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
@@ -92,6 +92,7 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
     }
     try {
       confStore.initialize(config, schedConf, rmContext);
+      confStore.checkVersion();
     } catch (Exception e) {
       throw new IOException(e);
     }
@@ -103,6 +104,11 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
     aclMutationPolicy.init(config, rmContext);
   }
 
+  @Override
+  public void close() throws IOException {
+    confStore.close();
+  }
+
   @VisibleForTesting
   public YarnConfigurationStore getConfStore() {
     return confStore;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7948fe5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
index 1356535..7fb52fc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
@@ -96,6 +96,12 @@ public abstract class YarnConfigurationStore {
       RMContext rmContext) throws Exception;
 
   /**
+   * Closes the configuration store, releasing any required resources.
+   * @throws IOException on failure to close
+   */
+  public void close() throws IOException {}
+
+  /**
    * Logs the configuration change to backing store.
    * @param logMutation configuration change to be persisted in write ahead log
    * @throws IOException if logging fails

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7948fe5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java
index bbe9570..8f3bc71 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java
@@ -71,6 +71,7 @@ public abstract class ConfigurationStoreBaseTest {
     confStore.confirmMutation(false);
     assertNull("Configuration should not be updated",
         confStore.retrieve().get("keyUpdate2"));
+    confStore.close();
   }
 
   @Test
@@ -86,5 +87,6 @@ public abstract class ConfigurationStoreBaseTest {
     confStore.logMutation(mutation);
     confStore.confirmMutation(true);
     assertNull(confStore.retrieve().get("key"));
+    confStore.close();
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7948fe5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java
new file mode 100644
index 0000000..779208a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Tests {@link LeveldbConfigurationStore}.
+ */
+public class TestLeveldbConfigurationStore extends ConfigurationStoreBaseTest {
+
+  public static final Log LOG =
+      LogFactory.getLog(TestLeveldbConfigurationStore.class);
+  private static final File TEST_DIR = new File(
+      System.getProperty("test.build.data",
+          System.getProperty("java.io.tmpdir")),
+      TestLeveldbConfigurationStore.class.getName());
+
+  private ResourceManager rm;
+
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    FileUtil.fullyDelete(TEST_DIR);
+    conf.set(CapacitySchedulerConfiguration.CS_CONF_PROVIDER,
+        CapacitySchedulerConfiguration.STORE_CS_CONF_PROVIDER);
+    conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
+        YarnConfiguration.LEVELDB_CONFIGURATION_STORE);
+    conf.set(YarnConfiguration.RM_SCHEDCONF_STORE_PATH, TEST_DIR.toString());
+  }
+
+  @Test
+  public void testVersioning() throws Exception {
+    confStore.initialize(conf, schedConf, rmContext);
+    assertNull(confStore.getConfStoreVersion());
+    confStore.checkVersion();
+    assertEquals(LeveldbConfigurationStore.CURRENT_VERSION_INFO,
+        confStore.getConfStoreVersion());
+    confStore.close();
+  }
+
+  @Test
+  public void testPersistConfiguration() throws Exception {
+    schedConf.set("key", "val");
+    confStore.initialize(conf, schedConf, rmContext);
+    assertEquals("val", confStore.retrieve().get("key"));
+    confStore.close();
+
+    // Create a new configuration store, and check for old configuration
+    confStore = createConfStore();
+    schedConf.set("key", "badVal");
+    // Should ignore passed-in scheduler configuration.
+    confStore.initialize(conf, schedConf, rmContext);
+    assertEquals("val", confStore.retrieve().get("key"));
+    confStore.close();
+  }
+
+  @Test
+  public void testPersistUpdatedConfiguration() throws Exception {
+    confStore.initialize(conf, schedConf, rmContext);
+    assertNull(confStore.retrieve().get("key"));
+
+    Map<String, String> update = new HashMap<>();
+    update.put("key", "val");
+    YarnConfigurationStore.LogMutation mutation =
+        new YarnConfigurationStore.LogMutation(update, TEST_USER);
+    confStore.logMutation(mutation);
+    confStore.confirmMutation(true);
+    assertEquals("val", confStore.retrieve().get("key"));
+    confStore.close();
+
+    // Create a new configuration store, and check for updated configuration
+    confStore = createConfStore();
+    schedConf.set("key", "badVal");
+    // Should ignore passed-in scheduler configuration.
+    confStore.initialize(conf, schedConf, rmContext);
+    assertEquals("val", confStore.retrieve().get("key"));
+    confStore.close();
+  }
+
+  @Test
+  public void testMaxLogs() throws Exception {
+    conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 2);
+    confStore.initialize(conf, schedConf, rmContext);
+    LinkedList<YarnConfigurationStore.LogMutation> logs =
+        ((LeveldbConfigurationStore) confStore).getLogs();
+    assertEquals(0, logs.size());
+
+    Map<String, String> update1 = new HashMap<>();
+    update1.put("key1", "val1");
+    YarnConfigurationStore.LogMutation mutation =
+        new YarnConfigurationStore.LogMutation(update1, TEST_USER);
+    confStore.logMutation(mutation);
+    logs = ((LeveldbConfigurationStore) confStore).getLogs();
+    assertEquals(1, logs.size());
+    assertEquals("val1", logs.get(0).getUpdates().get("key1"));
+    confStore.confirmMutation(true);
+    assertEquals(1, logs.size());
+    assertEquals("val1", logs.get(0).getUpdates().get("key1"));
+
+    Map<String, String> update2 = new HashMap<>();
+    update2.put("key2", "val2");
+    mutation = new YarnConfigurationStore.LogMutation(update2, TEST_USER);
+    confStore.logMutation(mutation);
+    logs = ((LeveldbConfigurationStore) confStore).getLogs();
+    assertEquals(2, logs.size());
+    assertEquals("val1", logs.get(0).getUpdates().get("key1"));
+    assertEquals("val2", logs.get(1).getUpdates().get("key2"));
+    confStore.confirmMutation(true);
+    assertEquals(2, logs.size());
+    assertEquals("val1", logs.get(0).getUpdates().get("key1"));
+    assertEquals("val2", logs.get(1).getUpdates().get("key2"));
+
+    // Next update should purge first update from logs.
+    Map<String, String> update3 = new HashMap<>();
+    update3.put("key3", "val3");
+    mutation = new YarnConfigurationStore.LogMutation(update3, TEST_USER);
+    confStore.logMutation(mutation);
+    logs = ((LeveldbConfigurationStore) confStore).getLogs();
+    assertEquals(2, logs.size());
+    assertEquals("val2", logs.get(0).getUpdates().get("key2"));
+    assertEquals("val3", logs.get(1).getUpdates().get("key3"));
+    confStore.confirmMutation(true);
+    assertEquals(2, logs.size());
+    assertEquals("val2", logs.get(0).getUpdates().get("key2"));
+    assertEquals("val3", logs.get(1).getUpdates().get("key3"));
+    confStore.close();
+  }
+
+  /**
+   * When restarting, RM should read from current state of store, including
+   * any updates from the previous RM instance.
+   * @throws Exception
+   */
+  @Test
+  public void testRestartReadsFromUpdatedStore() throws Exception {
+    ResourceManager rm1 = new MockRM(conf);
+    rm1.start();
+    assertNull(((MutableConfScheduler) rm1.getResourceScheduler())
+        .getConfiguration().get("key"));
+
+    // Update configuration on RM
+    SchedConfUpdateInfo schedConfUpdateInfo = new SchedConfUpdateInfo();
+    schedConfUpdateInfo.getGlobalParams().put("key", "val");
+    MutableConfigurationProvider confProvider = ((MutableConfScheduler)
+        rm1.getResourceScheduler()).getMutableConfProvider();
+    UserGroupInformation user = UserGroupInformation
+        .createUserForTesting(TEST_USER, new String[0]);
+    confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+    rm1.getResourceScheduler().reinitialize(conf, rm1.getRMContext());
+    assertEquals("val", ((MutableConfScheduler) rm1.getResourceScheduler())
+        .getConfiguration().get("key"));
+    confProvider.confirmPendingMutation(true);
+    assertEquals("val", ((MutableCSConfigurationProvider) confProvider)
+        .getConfStore().retrieve().get("key"));
+    // Next update is not persisted, so it should not be recovered
+    schedConfUpdateInfo.getGlobalParams().put("key", "badVal");
+    confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+    rm1.close();
+
+    // Start RM2 and verify that it starts with the updated configuration
+    ResourceManager rm2 = new MockRM(conf);
+    rm2.start();
+    assertEquals("val", ((MutableCSConfigurationProvider) (
+        (CapacityScheduler) rm2.getResourceScheduler())
+        .getMutableConfProvider()).getConfStore().retrieve().get("key"));
+    assertEquals("val", ((MutableConfScheduler) rm2.getResourceScheduler())
+        .getConfiguration().get("key"));
+    rm2.close();
+  }
+
+  @Override
+  public YarnConfigurationStore createConfStore() {
+    return new LeveldbConfigurationStore();
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org