You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jh...@apache.org on 2017/09/22 18:27:28 UTC
[29/30] hadoop git commit: YARN-7046. Add closing logic to
configuration store
YARN-7046. Add closing logic to configuration store
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/166be597
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/166be597
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/166be597
Branch: refs/heads/YARN-5734
Commit: 166be597baa5ac6b1e02bed9208a71e64be6fa04
Parents: bcd39a0
Author: Jonathan Hung <jh...@linkedin.com>
Authored: Fri Sep 22 10:18:27 2017 -0700
Committer: Jonathan Hung <jh...@linkedin.com>
Committed: Fri Sep 22 11:26:30 2017 -0700
----------------------------------------------------------------------
.../scheduler/MutableConfigurationProvider.java | 8 +
.../scheduler/capacity/CapacityScheduler.java | 3 +
.../conf/LeveldbConfigurationStore.java | 44 ++--
.../conf/MutableCSConfigurationProvider.java | 6 +
.../capacity/conf/YarnConfigurationStore.java | 6 +
.../conf/ConfigurationStoreBaseTest.java | 2 +
.../conf/TestLeveldbConfigurationStore.java | 213 +++++++++++++++++++
7 files changed, 270 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/166be597/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java
index f8e8814..2b9b25a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
+import java.io.IOException;
+
/**
* Interface for allowing changing scheduler configurations.
*/
@@ -55,4 +57,10 @@ public interface MutableConfigurationProvider {
* @throws Exception if confirming mutation fails
*/
void confirmPendingMutation(boolean isValid) throws Exception;
+
+ /**
+ * Closes the configuration provider, releasing any resources it holds.
+ * @throws IOException on failure to close
+ */
+ void close() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/166be597/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 16b27c1..de95179 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -411,6 +411,9 @@ public class CapacityScheduler extends
writeLock.unlock();
}
+ if (isConfigurationMutable()) {
+ ((MutableConfigurationProvider) csConfProvider).close();
+ }
super.serviceStop();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/166be597/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
index 1b0eb9f..21de7a2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -72,7 +73,8 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
private long maxLogs;
private Configuration conf;
private LogMutation pendingMutation;
- private static final Version CURRENT_VERSION_INFO = Version
+ @VisibleForTesting
+ protected static final Version CURRENT_VERSION_INFO = Version
.newInstance(0, 1);
private Timer compactionTimer;
private long compactionIntervalMsec;
@@ -82,7 +84,7 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
RMContext rmContext) throws IOException {
this.conf = config;
try {
- this.db = initDatabase(schedConf);
+ initDatabase(schedConf);
this.maxLogs = config.getLong(
YarnConfiguration.RM_SCHEDCONF_MAX_LOGS,
YarnConfiguration.DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS);
@@ -96,7 +98,7 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
}
}
- private DB initDatabase(Configuration config) throws Exception {
+ private void initDatabase(Configuration config) throws Exception {
Path storeRoot = createStorageDir();
Options options = new Options();
options.createIfMissing(false);
@@ -108,13 +110,13 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
if (key1Str.equals(key2Str)) {
return 0;
} else if (key1Str.equals(VERSION_KEY)) {
- return -1;
- } else if (key2Str.equals(VERSION_KEY)) {
return 1;
- } else if (key1Str.equals(LOG_KEY)) {
+ } else if (key2Str.equals(VERSION_KEY)) {
return -1;
- } else if (key2Str.equals(LOG_KEY)) {
+ } else if (key1Str.equals(LOG_KEY)) {
return 1;
+ } else if (key2Str.equals(LOG_KEY)) {
+ return -1;
}
return key1Str.compareTo(key2Str);
}
@@ -156,7 +158,6 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
throw e;
}
}
- return db;
}
private Path createStorageDir() throws IOException {
@@ -176,6 +177,13 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
}
@Override
+ public void close() throws IOException {
+ if (db != null) {
+ db.close();
+ }
+ }
+
+ @Override
public void logMutation(LogMutation logMutation) throws IOException {
LinkedList<LogMutation> logs = deserLogMutations(db.get(bytes(LOG_KEY)));
logs.add(logMutation);
@@ -212,8 +220,12 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
return baos.toByteArray();
}
}
+
private LinkedList<LogMutation> deserLogMutations(byte[] mutations) throws
IOException {
+ if (mutations == null) {
+ return new LinkedList<>();
+ }
try (ObjectInput input = new ObjectInputStream(
new ByteArrayInputStream(mutations))) {
return (LinkedList<LogMutation>) input.readObject();
@@ -225,13 +237,16 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
@Override
public synchronized Configuration retrieve() {
DBIterator itr = db.iterator();
- itr.seek(bytes(LOG_KEY));
+ itr.seekToFirst();
Configuration config = new Configuration(false);
- itr.next();
while (itr.hasNext()) {
Map.Entry<byte[], byte[]> entry = itr.next();
- config.set(new String(entry.getKey(), StandardCharsets.UTF_8),
- new String(entry.getValue(), StandardCharsets.UTF_8));
+ String key = new String(entry.getKey(), StandardCharsets.UTF_8);
+ String value = new String(entry.getValue(), StandardCharsets.UTF_8);
+ if (key.equals(LOG_KEY) || key.equals(VERSION_KEY)) {
+ break;
+ }
+ config.set(key, value);
}
return config;
}
@@ -268,6 +283,11 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
return version;
}
+ @VisibleForTesting
+ protected LinkedList<LogMutation> getLogs() throws Exception {
+ return deserLogMutations(db.get(bytes(LOG_KEY)));
+ }
+
@Override
public void storeVersion() throws Exception {
String key = VERSION_KEY;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/166be597/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
index 70d1840..ccadf76 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
@@ -92,6 +92,7 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
}
try {
confStore.initialize(config, schedConf, rmContext);
+ confStore.checkVersion();
} catch (Exception e) {
throw new IOException(e);
}
@@ -103,6 +104,11 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
aclMutationPolicy.init(config, rmContext);
}
+ @Override
+ public void close() throws IOException {
+ confStore.close();
+ }
+
@VisibleForTesting
public YarnConfigurationStore getConfStore() {
return confStore;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/166be597/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
index 1356535..7fb52fc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
@@ -96,6 +96,12 @@ public abstract class YarnConfigurationStore {
RMContext rmContext) throws Exception;
/**
+ * Closes the configuration store, releasing any resources it holds.
+ * @throws IOException on failure to close
+ */
+ public void close() throws IOException {}
+
+ /**
* Logs the configuration change to backing store.
* @param logMutation configuration change to be persisted in write ahead log
* @throws IOException if logging fails
http://git-wip-us.apache.org/repos/asf/hadoop/blob/166be597/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java
index bbe9570..8f3bc71 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java
@@ -71,6 +71,7 @@ public abstract class ConfigurationStoreBaseTest {
confStore.confirmMutation(false);
assertNull("Configuration should not be updated",
confStore.retrieve().get("keyUpdate2"));
+ confStore.close();
}
@Test
@@ -86,5 +87,6 @@ public abstract class ConfigurationStoreBaseTest {
confStore.logMutation(mutation);
confStore.confirmMutation(true);
assertNull(confStore.retrieve().get("key"));
+ confStore.close();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/166be597/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java
new file mode 100644
index 0000000..779208a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Tests {@link LeveldbConfigurationStore}.
+ */
+public class TestLeveldbConfigurationStore extends ConfigurationStoreBaseTest {
+
+ public static final Log LOG =
+ LogFactory.getLog(TestLeveldbConfigurationStore.class);
+ private static final File TEST_DIR = new File(
+ System.getProperty("test.build.data",
+ System.getProperty("java.io.tmpdir")),
+ TestLeveldbConfigurationStore.class.getName());
+
+ private ResourceManager rm;
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ FileUtil.fullyDelete(TEST_DIR);
+ conf.set(CapacitySchedulerConfiguration.CS_CONF_PROVIDER,
+ CapacitySchedulerConfiguration.STORE_CS_CONF_PROVIDER);
+ conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
+ YarnConfiguration.LEVELDB_CONFIGURATION_STORE);
+ conf.set(YarnConfiguration.RM_SCHEDCONF_STORE_PATH, TEST_DIR.toString());
+ }
+
+ @Test
+ public void testVersioning() throws Exception {
+ confStore.initialize(conf, schedConf, rmContext);
+ assertNull(confStore.getConfStoreVersion());
+ confStore.checkVersion();
+ assertEquals(LeveldbConfigurationStore.CURRENT_VERSION_INFO,
+ confStore.getConfStoreVersion());
+ confStore.close();
+ }
+
+ @Test
+ public void testPersistConfiguration() throws Exception {
+ schedConf.set("key", "val");
+ confStore.initialize(conf, schedConf, rmContext);
+ assertEquals("val", confStore.retrieve().get("key"));
+ confStore.close();
+
+ // Create a new configuration store, and check for old configuration
+ confStore = createConfStore();
+ schedConf.set("key", "badVal");
+ // Should ignore passed-in scheduler configuration.
+ confStore.initialize(conf, schedConf, rmContext);
+ assertEquals("val", confStore.retrieve().get("key"));
+ confStore.close();
+ }
+
+ @Test
+ public void testPersistUpdatedConfiguration() throws Exception {
+ confStore.initialize(conf, schedConf, rmContext);
+ assertNull(confStore.retrieve().get("key"));
+
+ Map<String, String> update = new HashMap<>();
+ update.put("key", "val");
+ YarnConfigurationStore.LogMutation mutation =
+ new YarnConfigurationStore.LogMutation(update, TEST_USER);
+ confStore.logMutation(mutation);
+ confStore.confirmMutation(true);
+ assertEquals("val", confStore.retrieve().get("key"));
+ confStore.close();
+
+ // Create a new configuration store, and check for updated configuration
+ confStore = createConfStore();
+ schedConf.set("key", "badVal");
+ // Should ignore passed-in scheduler configuration.
+ confStore.initialize(conf, schedConf, rmContext);
+ assertEquals("val", confStore.retrieve().get("key"));
+ confStore.close();
+ }
+
+ @Test
+ public void testMaxLogs() throws Exception {
+ conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 2);
+ confStore.initialize(conf, schedConf, rmContext);
+ LinkedList<YarnConfigurationStore.LogMutation> logs =
+ ((LeveldbConfigurationStore) confStore).getLogs();
+ assertEquals(0, logs.size());
+
+ Map<String, String> update1 = new HashMap<>();
+ update1.put("key1", "val1");
+ YarnConfigurationStore.LogMutation mutation =
+ new YarnConfigurationStore.LogMutation(update1, TEST_USER);
+ confStore.logMutation(mutation);
+ logs = ((LeveldbConfigurationStore) confStore).getLogs();
+ assertEquals(1, logs.size());
+ assertEquals("val1", logs.get(0).getUpdates().get("key1"));
+ confStore.confirmMutation(true);
+ assertEquals(1, logs.size());
+ assertEquals("val1", logs.get(0).getUpdates().get("key1"));
+
+ Map<String, String> update2 = new HashMap<>();
+ update2.put("key2", "val2");
+ mutation = new YarnConfigurationStore.LogMutation(update2, TEST_USER);
+ confStore.logMutation(mutation);
+ logs = ((LeveldbConfigurationStore) confStore).getLogs();
+ assertEquals(2, logs.size());
+ assertEquals("val1", logs.get(0).getUpdates().get("key1"));
+ assertEquals("val2", logs.get(1).getUpdates().get("key2"));
+ confStore.confirmMutation(true);
+ assertEquals(2, logs.size());
+ assertEquals("val1", logs.get(0).getUpdates().get("key1"));
+ assertEquals("val2", logs.get(1).getUpdates().get("key2"));
+
+ // Next update should purge first update from logs.
+ Map<String, String> update3 = new HashMap<>();
+ update3.put("key3", "val3");
+ mutation = new YarnConfigurationStore.LogMutation(update3, TEST_USER);
+ confStore.logMutation(mutation);
+ logs = ((LeveldbConfigurationStore) confStore).getLogs();
+ assertEquals(2, logs.size());
+ assertEquals("val2", logs.get(0).getUpdates().get("key2"));
+ assertEquals("val3", logs.get(1).getUpdates().get("key3"));
+ confStore.confirmMutation(true);
+ assertEquals(2, logs.size());
+ assertEquals("val2", logs.get(0).getUpdates().get("key2"));
+ assertEquals("val3", logs.get(1).getUpdates().get("key3"));
+ confStore.close();
+ }
+
+ /**
+ * When restarting, RM should read from current state of store, including
+ * any updates from the previous RM instance.
+ * @throws Exception on any failure while running the test
+ */
+ @Test
+ public void testRestartReadsFromUpdatedStore() throws Exception {
+ ResourceManager rm1 = new MockRM(conf);
+ rm1.start();
+ assertNull(((MutableConfScheduler) rm1.getResourceScheduler())
+ .getConfiguration().get("key"));
+
+ // Update configuration on RM
+ SchedConfUpdateInfo schedConfUpdateInfo = new SchedConfUpdateInfo();
+ schedConfUpdateInfo.getGlobalParams().put("key", "val");
+ MutableConfigurationProvider confProvider = ((MutableConfScheduler)
+ rm1.getResourceScheduler()).getMutableConfProvider();
+ UserGroupInformation user = UserGroupInformation
+ .createUserForTesting(TEST_USER, new String[0]);
+ confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+ rm1.getResourceScheduler().reinitialize(conf, rm1.getRMContext());
+ assertEquals("val", ((MutableConfScheduler) rm1.getResourceScheduler())
+ .getConfiguration().get("key"));
+ confProvider.confirmPendingMutation(true);
+ assertEquals("val", ((MutableCSConfigurationProvider) confProvider)
+ .getConfStore().retrieve().get("key"));
+ // Next update is not persisted, so it should not be recovered
+ schedConfUpdateInfo.getGlobalParams().put("key", "badVal");
+ confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+ rm1.close();
+
+ // Start RM2 and verify that it starts with the updated configuration
+ ResourceManager rm2 = new MockRM(conf);
+ rm2.start();
+ assertEquals("val", ((MutableCSConfigurationProvider) (
+ (CapacityScheduler) rm2.getResourceScheduler())
+ .getMutableConfProvider()).getConfStore().retrieve().get("key"));
+ assertEquals("val", ((MutableConfScheduler) rm2.getResourceScheduler())
+ .getConfiguration().get("key"));
+ rm2.close();
+ }
+
+ @Override
+ public YarnConfigurationStore createConfStore() {
+ return new LeveldbConfigurationStore();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org