You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2017/09/18 16:59:22 UTC
[1/2] hadoop git commit: YARN-6840. Implement zookeeper based store
for scheduler configuration updates. (Jonathan Hung via wangda)
Repository: hadoop
Updated Branches:
refs/heads/YARN-5734 98c4a5299 -> 9726e1fc2
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9726e1fc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java
new file mode 100644
index 0000000..3cfa8da
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Tests {@link ZKConfigurationStore}.
+ */
+public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
+
+ public static final Log LOG =
+ LogFactory.getLog(TestZKConfigurationStore.class);
+
+ private static final int ZK_TIMEOUT_MS = 10000;
+ private TestingServer curatorTestingServer;
+ private CuratorFramework curatorFramework;
+ private ResourceManager rm;
+
+ public static TestingServer setupCuratorServer() throws Exception {
+ TestingServer curatorTestingServer = new TestingServer();
+ curatorTestingServer.start();
+ return curatorTestingServer;
+ }
+
+ public static CuratorFramework setupCuratorFramework(
+ TestingServer curatorTestingServer) throws Exception {
+ CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
+ .connectString(curatorTestingServer.getConnectString())
+ .retryPolicy(new RetryNTimes(100, 100))
+ .build();
+ curatorFramework.start();
+ return curatorFramework;
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ curatorTestingServer = setupCuratorServer();
+ curatorFramework = setupCuratorFramework(curatorTestingServer);
+
+ conf.set(CommonConfigurationKeys.ZK_ADDRESS,
+ curatorTestingServer.getConnectString());
+ rm = new MockRM(conf);
+ rm.start();
+ rmContext = rm.getRMContext();
+ }
+
+ @After
+ public void cleanup() throws IOException {
+ rm.stop();
+ curatorFramework.close();
+ curatorTestingServer.stop();
+ }
+
+ @Test
+ public void testVersioning() throws Exception {
+ confStore.initialize(conf, schedConf, rmContext);
+ assertNull(confStore.getConfStoreVersion());
+ confStore.checkVersion();
+ assertEquals(ZKConfigurationStore.CURRENT_VERSION_INFO,
+ confStore.getConfStoreVersion());
+ }
+
+ @Test
+ public void testPersistConfiguration() throws Exception {
+ schedConf.set("key", "val");
+ confStore.initialize(conf, schedConf, rmContext);
+ assertEquals("val", confStore.retrieve().get("key"));
+
+ // Create a new configuration store, and check for old configuration
+ confStore = createConfStore();
+ schedConf.set("key", "badVal");
+ // Should ignore passed-in scheduler configuration.
+ confStore.initialize(conf, schedConf, rmContext);
+ assertEquals("val", confStore.retrieve().get("key"));
+ }
+
+
+ @Test
+ public void testPersistUpdatedConfiguration() throws Exception {
+ confStore.initialize(conf, schedConf, rmContext);
+ assertNull(confStore.retrieve().get("key"));
+
+ Map<String, String> update = new HashMap<>();
+ update.put("key", "val");
+ YarnConfigurationStore.LogMutation mutation =
+ new YarnConfigurationStore.LogMutation(update, TEST_USER);
+ confStore.logMutation(mutation);
+ confStore.confirmMutation(true);
+ assertEquals("val", confStore.retrieve().get("key"));
+
+ // Create a new configuration store, and check for updated configuration
+ confStore = createConfStore();
+ schedConf.set("key", "badVal");
+ // Should ignore passed-in scheduler configuration.
+ confStore.initialize(conf, schedConf, rmContext);
+ assertEquals("val", confStore.retrieve().get("key"));
+ }
+
+ @Test
+ public void testMaxLogs() throws Exception {
+ conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 2);
+ confStore.initialize(conf, schedConf, rmContext);
+ LinkedList<YarnConfigurationStore.LogMutation> logs =
+ ((ZKConfigurationStore) confStore).getLogs();
+ assertEquals(0, logs.size());
+
+ Map<String, String> update1 = new HashMap<>();
+ update1.put("key1", "val1");
+ YarnConfigurationStore.LogMutation mutation =
+ new YarnConfigurationStore.LogMutation(update1, TEST_USER);
+ confStore.logMutation(mutation);
+ logs = ((ZKConfigurationStore) confStore).getLogs();
+ assertEquals(1, logs.size());
+ assertEquals("val1", logs.get(0).getUpdates().get("key1"));
+ confStore.confirmMutation(true);
+ assertEquals(1, logs.size());
+ assertEquals("val1", logs.get(0).getUpdates().get("key1"));
+
+ Map<String, String> update2 = new HashMap<>();
+ update2.put("key2", "val2");
+ mutation = new YarnConfigurationStore.LogMutation(update2, TEST_USER);
+ confStore.logMutation(mutation);
+ logs = ((ZKConfigurationStore) confStore).getLogs();
+ assertEquals(2, logs.size());
+ assertEquals("val1", logs.get(0).getUpdates().get("key1"));
+ assertEquals("val2", logs.get(1).getUpdates().get("key2"));
+ confStore.confirmMutation(true);
+ assertEquals(2, logs.size());
+ assertEquals("val1", logs.get(0).getUpdates().get("key1"));
+ assertEquals("val2", logs.get(1).getUpdates().get("key2"));
+
+ // Next update should purge first update from logs.
+ Map<String, String> update3 = new HashMap<>();
+ update3.put("key3", "val3");
+ mutation = new YarnConfigurationStore.LogMutation(update3, TEST_USER);
+ confStore.logMutation(mutation);
+ logs = ((ZKConfigurationStore) confStore).getLogs();
+ assertEquals(2, logs.size());
+ assertEquals("val2", logs.get(0).getUpdates().get("key2"));
+ assertEquals("val3", logs.get(1).getUpdates().get("key3"));
+ confStore.confirmMutation(true);
+ assertEquals(2, logs.size());
+ assertEquals("val2", logs.get(0).getUpdates().get("key2"));
+ assertEquals("val3", logs.get(1).getUpdates().get("key3"));
+ }
+
+ public Configuration createRMHAConf(String rmIds, String rmId,
+ int adminPort) {
+ Configuration conf = new YarnConfiguration();
+ conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
+ conf.set(YarnConfiguration.RM_HA_IDS, rmIds);
+ conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+ conf.set(CapacitySchedulerConfiguration.CS_CONF_PROVIDER,
+ CapacitySchedulerConfiguration.STORE_CS_CONF_PROVIDER);
+ conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
+ YarnConfiguration.ZK_CONFIGURATION_STORE);
+ conf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName());
+ conf.set(YarnConfiguration.RM_ZK_ADDRESS,
+ curatorTestingServer.getConnectString());
+ conf.set(YarnConfiguration.RM_HA_ID, rmId);
+ conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, "localhost:0");
+ conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
+ for (String rpcAddress :
+ YarnConfiguration.getServiceAddressConfKeys(conf)) {
+ for (String id : HAUtil.getRMHAIds(conf)) {
+ conf.set(HAUtil.addSuffix(rpcAddress, id), "localhost:0");
+ }
+ }
+ conf.set(HAUtil.addSuffix(YarnConfiguration.RM_ADMIN_ADDRESS, rmId),
+ "localhost:" + adminPort);
+ return conf;
+ }
+
+ /**
+ * When failing over, new active RM should read from current state of store,
+ * including any updates when the new active RM was in standby.
+ * @throws Exception
+ */
+ @Test
+ public void testFailoverReadsFromUpdatedStore() throws Exception {
+ HAServiceProtocol.StateChangeRequestInfo req =
+ new HAServiceProtocol.StateChangeRequestInfo(
+ HAServiceProtocol.RequestSource.REQUEST_BY_USER);
+
+ Configuration conf1 = createRMHAConf("rm1,rm2", "rm1", 1234);
+ ResourceManager rm1 = new MockRM(conf1);
+ rm1.start();
+ rm1.getRMContext().getRMAdminService().transitionToActive(req);
+ assertEquals("RM with ZKStore didn't start",
+ Service.STATE.STARTED, rm1.getServiceState());
+ assertEquals("RM should be Active",
+ HAServiceProtocol.HAServiceState.ACTIVE,
+ rm1.getRMContext().getRMAdminService().getServiceStatus().getState());
+ assertNull(((MutableConfScheduler) rm1.getResourceScheduler())
+ .getConfiguration().get("key"));
+
+ Configuration conf2 = createRMHAConf("rm1,rm2", "rm2", 5678);
+ ResourceManager rm2 = new MockRM(conf2);
+ rm2.start();
+ assertEquals("RM should be Standby",
+ HAServiceProtocol.HAServiceState.STANDBY,
+ rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
+
+ // Update configuration on RM1
+ SchedConfUpdateInfo schedConfUpdateInfo = new SchedConfUpdateInfo();
+ schedConfUpdateInfo.getGlobalParams().put("key", "val");
+ MutableConfigurationProvider confProvider = ((MutableConfScheduler)
+ rm1.getResourceScheduler()).getMutableConfProvider();
+ UserGroupInformation user = UserGroupInformation
+ .createUserForTesting(TEST_USER, new String[0]);
+ confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+ rm1.getResourceScheduler().reinitialize(conf1, rm1.getRMContext());
+ assertEquals("val", ((MutableConfScheduler) rm1.getResourceScheduler())
+ .getConfiguration().get("key"));
+ confProvider.confirmPendingMutation(true);
+ assertEquals("val", ((MutableCSConfigurationProvider) confProvider)
+ .getConfStore().retrieve().get("key"));
+ // Next update is not persisted, it should not be recovered
+ schedConfUpdateInfo.getGlobalParams().put("key", "badVal");
+ confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+
+ // Start RM2 and verifies it starts with updated configuration
+ rm2.getRMContext().getRMAdminService().transitionToActive(req);
+ assertEquals("RM with ZKStore didn't start",
+ Service.STATE.STARTED, rm2.getServiceState());
+ assertEquals("RM should be Active",
+ HAServiceProtocol.HAServiceState.ACTIVE,
+ rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
+
+ for (int i = 0; i < ZK_TIMEOUT_MS / 50; i++) {
+ if (HAServiceProtocol.HAServiceState.ACTIVE ==
+ rm1.getRMContext().getRMAdminService().getServiceStatus()
+ .getState()) {
+ Thread.sleep(100);
+ }
+ }
+ assertEquals("RM should have been fenced",
+ HAServiceProtocol.HAServiceState.STANDBY,
+ rm1.getRMContext().getRMAdminService().getServiceStatus().getState());
+ assertEquals("RM should be Active",
+ HAServiceProtocol.HAServiceState.ACTIVE,
+ rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
+
+ assertEquals("val", ((MutableCSConfigurationProvider) (
+ (CapacityScheduler) rm2.getResourceScheduler())
+ .getMutableConfProvider()).getConfStore().retrieve().get("key"));
+ assertEquals("val", ((MutableConfScheduler) rm2.getResourceScheduler())
+ .getConfiguration().get("key"));
+ // Transition to standby will set RM's HA status and then reinitialize in
+ // a separate thread. Despite asserting for STANDBY state, it's
+ // possible for reinitialization to be unfinished. Wait here for it to
+ // finish, otherwise closing rm1 will close zkManager and the unfinished
+ // reinitialization will throw an exception.
+ Thread.sleep(10000);
+ rm1.close();
+ rm2.close();
+ }
+
+ @Override
+ public YarnConfigurationStore createConfStore() {
+ return new ZKConfigurationStore();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[2/2] hadoop git commit: YARN-6840. Implement zookeeper based store
for scheduler configuration updates. (Jonathan Hung via wangda)
Posted by wa...@apache.org.
YARN-6840. Implement zookeeper based store for scheduler configuration updates. (Jonathan Hung via wangda)
Change-Id: I9debea674fe8c7e4109d4ca136965a1ea4c48bcc
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9726e1fc
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9726e1fc
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9726e1fc
Branch: refs/heads/YARN-5734
Commit: 9726e1fc2c4d5025bf20d087ee45c7ade46693f4
Parents: 98c4a52
Author: Wangda Tan <wa...@apache.org>
Authored: Mon Sep 18 09:53:42 2017 -0700
Committer: Wangda Tan <wa...@apache.org>
Committed: Mon Sep 18 09:53:42 2017 -0700
----------------------------------------------------------------------
.../hadoop/yarn/conf/YarnConfiguration.java | 14 +-
.../src/main/resources/yarn-default.xml | 15 +-
.../server/resourcemanager/AdminService.java | 18 +-
.../server/resourcemanager/ResourceManager.java | 24 +-
.../RMStateVersionIncompatibleException.java | 2 +-
.../recovery/ZKRMStateStore.java | 5 +-
.../scheduler/MutableConfScheduler.java | 22 +-
.../scheduler/MutableConfigurationProvider.java | 36 ++-
.../scheduler/capacity/CapacityScheduler.java | 22 +-
.../conf/InMemoryConfigurationStore.java | 71 +++--
.../conf/LeveldbConfigurationStore.java | 168 +++++-----
.../conf/MutableCSConfigurationProvider.java | 148 +++++----
.../capacity/conf/YarnConfigurationStore.java | 132 ++++----
.../capacity/conf/ZKConfigurationStore.java | 235 ++++++++++++++
.../resourcemanager/webapp/RMWebServices.java | 26 +-
.../conf/ConfigurationStoreBaseTest.java | 90 ++++++
.../conf/TestInMemoryConfigurationStore.java | 30 ++
.../TestMutableCSConfigurationProvider.java | 18 +-
.../conf/TestYarnConfigurationStore.java | 71 -----
.../capacity/conf/TestZKConfigurationStore.java | 312 +++++++++++++++++++
20 files changed, 1037 insertions(+), 422 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9726e1fc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 6ffe1f7..793a554 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -643,6 +643,7 @@ public class YarnConfiguration extends Configuration {
YARN_PREFIX + "scheduler.configuration.store.class";
public static final String MEMORY_CONFIGURATION_STORE = "memory";
public static final String LEVELDB_CONFIGURATION_STORE = "leveldb";
+ public static final String ZK_CONFIGURATION_STORE = "zk";
public static final String DEFAULT_CONFIGURATION_STORE =
MEMORY_CONFIGURATION_STORE;
public static final String RM_SCHEDCONF_STORE_PATH = YARN_PREFIX
@@ -654,9 +655,16 @@ public class YarnConfiguration extends Configuration {
public static final long
DEFAULT_RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS = 60 * 60 * 24L;
- public static final String RM_SCHEDCONF_LEVELDB_MAX_LOGS =
- YARN_PREFIX + "scheduler.configuration.leveldb-store.max-logs";
- public static final int DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS = 1000;
+ public static final String RM_SCHEDCONF_MAX_LOGS =
+ YARN_PREFIX + "scheduler.configuration.store.max-logs";
+ public static final long DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS = 1000;
+ public static final long DEFAULT_RM_SCHEDCONF_ZK_MAX_LOGS = 1000;
+
+ /** Parent znode path under which ZKConfigurationStore will create znodes. */
+ public static final String RM_SCHEDCONF_STORE_ZK_PARENT_PATH = YARN_PREFIX
+ + "scheduler.configuration.zk-store.parent-path";
+ public static final String DEFAULT_RM_SCHEDCONF_STORE_ZK_PARENT_PATH =
+ "/confstore";
public static final String RM_SCHEDULER_MUTATION_ACL_POLICY_CLASS =
YARN_PREFIX + "scheduler.configuration.mutation.acl-policy.class";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9726e1fc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 2d6574f..dd9e913 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -3271,11 +3271,20 @@
<property>
<description>
- The max number of configuration change log entries kept in LevelDB config
+ The max number of configuration change log entries kept in config
store, when yarn.scheduler.configuration.store.class is configured to be
- "leveldb". Default is 1000.
+ "leveldb" or "zk". Default is 1000 for either.
</description>
- <name>yarn.scheduler.configuration.leveldb-store.max-logs</name>
+ <name>yarn.scheduler.configuration.store.max-logs</name>
<value>1000</value>
</property>
+
+ <property>
+ <description>
+ ZK root node path for configuration store when using zookeeper-based
+ configuration store.
+ </description>
+ <name>yarn.scheduler.configuration.zk-store.parent-path</name>
+ <value>/confstore</value>
+ </property>
</configuration>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9726e1fc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
index fd9e849..6c0a854 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
@@ -387,9 +387,7 @@ public class AdminService extends CompositeService implements
RefreshQueuesResponse response =
recordFactory.newRecordInstance(RefreshQueuesResponse.class);
try {
- ResourceScheduler scheduler = rm.getRMContext().getScheduler();
- if (scheduler instanceof MutableConfScheduler
- && ((MutableConfScheduler) scheduler).isConfigurationMutable()) {
+ if (isSchedulerMutable()) {
throw new IOException("Scheduler configuration is mutable. " +
operation + " is not allowed in this scenario.");
}
@@ -413,6 +411,12 @@ public class AdminService extends CompositeService implements
}
}
+ private boolean isSchedulerMutable() {
+ ResourceScheduler scheduler = rm.getRMContext().getScheduler();
+ return (scheduler instanceof MutableConfScheduler
+ && ((MutableConfScheduler) scheduler).isConfigurationMutable());
+ }
+
@Override
public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
throws YarnException, StandbyException {
@@ -721,6 +725,14 @@ public class AdminService extends CompositeService implements
void refreshAll() throws ServiceFailedException {
try {
checkAcls("refreshAll");
+ if (isSchedulerMutable()) {
+ try {
+ ((MutableConfScheduler) rm.getRMContext().getScheduler())
+ .getMutableConfProvider().reloadConfigurationFromStore();
+ } catch (Exception e) {
+ throw new IOException("Failed to refresh configuration:", e);
+ }
+ }
refreshQueues();
refreshNodes();
refreshSuperUserGroupsConfiguration();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9726e1fc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 5333f25..0be0539 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -342,7 +342,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
if (curatorEnabled) {
- this.zkManager = createAndStartZKManager(conf);
+ this.zkManager = getAndStartZKManager(conf);
elector = new CuratorBasedElectorService(this);
} else {
elector = new ActiveStandbyElectorBasedElectorService(this);
@@ -351,13 +351,16 @@ public class ResourceManager extends CompositeService implements Recoverable {
}
/**
- * Create and ZooKeeper Curator manager.
+ * Get ZooKeeper Curator manager, creating and starting if not exists.
* @param config Configuration for the ZooKeeper curator.
- * @return New ZooKeeper Curator manager.
+ * @return ZooKeeper Curator manager.
* @throws IOException If it cannot create the manager.
*/
- public ZKCuratorManager createAndStartZKManager(Configuration config)
- throws IOException {
+ public synchronized ZKCuratorManager getAndStartZKManager(Configuration
+ config) throws IOException {
+ if (this.zkManager != null) {
+ return zkManager;
+ }
ZKCuratorManager manager = new ZKCuratorManager(config);
// Get authentication
@@ -377,15 +380,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
}
manager.start(authInfos);
- return manager;
- }
-
- /**
- * Get the ZooKeeper Curator manager.
- * @return ZooKeeper Curator manager.
- */
- public ZKCuratorManager getZKManager() {
- return this.zkManager;
+ this.zkManager = manager;
+ return zkManager;
}
public CuratorFramework getCurator() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9726e1fc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateVersionIncompatibleException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateVersionIncompatibleException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateVersionIncompatibleException.java
index 135868f..d5fce36 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateVersionIncompatibleException.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateVersionIncompatibleException.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
/**
* This exception is thrown by ResourceManager if it's loading an incompatible
- * version of state from state store on recovery.
+ * version of storage on recovery.
*/
public class RMStateVersionIncompatibleException extends YarnException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9726e1fc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index ac67dcd..5bff77f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -327,10 +327,7 @@ public class ZKRMStateStore extends RMStateStore {
amrmTokenSecretManagerRoot =
getNodePath(zkRootNodePath, AMRMTOKEN_SECRET_MANAGER_ROOT);
reservationRoot = getNodePath(zkRootNodePath, RESERVATION_SYSTEM_ROOT);
- zkManager = resourceManager.getZKManager();
- if (zkManager == null) {
- zkManager = resourceManager.createAndStartZKManager(conf);
- }
+ zkManager = resourceManager.getAndStartZKManager(conf);
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9726e1fc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfScheduler.java
index 313bf6a..6f677fb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfScheduler.java
@@ -18,11 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
-
-import java.io.IOException;
/**
* Interface for a scheduler that supports changing configuration at runtime.
@@ -31,16 +26,6 @@ import java.io.IOException;
public interface MutableConfScheduler extends ResourceScheduler {
/**
- * Update the scheduler's configuration.
- * @param user Caller of this update
- * @param confUpdate configuration update
- * @throws IOException if scheduler could not be reinitialized
- * @throws YarnException if reservation system could not be reinitialized
- */
- void updateConfiguration(UserGroupInformation user,
- SchedConfUpdateInfo confUpdate) throws IOException, YarnException;
-
- /**
* Get the scheduler configuration.
* @return the scheduler configuration
*/
@@ -58,4 +43,11 @@ public interface MutableConfScheduler extends ResourceScheduler {
* @return whether scheduler configuration is mutable or not.
*/
boolean isConfigurationMutable();
+
+ /**
+ * Get scheduler's configuration provider, so other classes can directly
+ * call mutation APIs on configuration provider.
+ * @return scheduler's configuration provider
+ */
+ MutableConfigurationProvider getMutableConfProvider();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9726e1fc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java
index 9baf1ad..f8e8814 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java
@@ -19,30 +19,40 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
-import java.io.IOException;
-
/**
* Interface for allowing changing scheduler configurations.
*/
public interface MutableConfigurationProvider {
/**
- * Apply transactions which were not committed.
- * @throws IOException if recovery fails
+ * Get the acl mutation policy for this configuration provider.
+ * @return The acl mutation policy.
+ */
+ ConfigurationMutationACLPolicy getAclMutationPolicy();
+
+ /**
+ * Called when a new ResourceManager is starting/becomes active. Ensures
+ * configuration is up-to-date.
+ * @throws Exception if configuration could not be refreshed from store
*/
- void recoverConf() throws IOException;
+ void reloadConfigurationFromStore() throws Exception;
/**
- * Update the scheduler configuration with the provided key value pairs.
- * @param user User issuing the request
- * @param confUpdate Key-value pairs for configurations to be updated.
- * @throws IOException if scheduler could not be reinitialized
- * @throws YarnException if reservation system could not be reinitialized
+ * Log user's requested configuration mutation, and applies it in-memory.
+ * @param user User who requested the change
+ * @param confUpdate User's requested configuration change
+ * @throws Exception if logging the mutation fails
*/
- void mutateConfiguration(UserGroupInformation user, SchedConfUpdateInfo
- confUpdate) throws IOException, YarnException;
+ void logAndApplyMutation(UserGroupInformation user, SchedConfUpdateInfo
+ confUpdate) throws Exception;
+ /**
+ * Confirm last logged mutation.
+ * @param isValid if the last logged mutation is applied to scheduler
+ * properly.
+ * @throws Exception if confirming mutation fails
+ */
+ void confirmPendingMutation(boolean isValid) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9726e1fc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index df77135..c6f9c97 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -141,7 +141,6 @@ import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
-import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -393,9 +392,6 @@ public class CapacityScheduler extends
@Override
public void serviceStart() throws Exception {
startSchedulerThreads();
- if (this.csConfProvider instanceof MutableConfigurationProvider) {
- ((MutableConfigurationProvider) csConfProvider).recoverConf();
- }
super.serviceStart();
}
@@ -2576,19 +2572,15 @@ public class CapacityScheduler extends
}
@Override
- public void updateConfiguration(UserGroupInformation user,
- SchedConfUpdateInfo confUpdate) throws IOException, YarnException {
- if (isConfigurationMutable()) {
- ((MutableConfigurationProvider) csConfProvider).mutateConfiguration(
- user, confUpdate);
- } else {
- throw new UnsupportedOperationException("Configured CS configuration " +
- "provider does not support updating configuration.");
- }
+ public boolean isConfigurationMutable() {
+ return csConfProvider instanceof MutableConfigurationProvider;
}
@Override
- public boolean isConfigurationMutable() {
- return csConfProvider instanceof MutableConfigurationProvider;
+ public MutableConfigurationProvider getMutableConfProvider() {
+ if (isConfigurationMutable()) {
+ return (MutableConfigurationProvider) csConfProvider;
+ }
+ return null;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9726e1fc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java
index c63734d..d69c236 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java
@@ -19,8 +19,9 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -28,48 +29,35 @@ import java.util.Map;
* A default implementation of {@link YarnConfigurationStore}. Doesn't offer
* persistent configuration storage, just stores the configuration in memory.
*/
-public class InMemoryConfigurationStore implements YarnConfigurationStore {
+public class InMemoryConfigurationStore extends YarnConfigurationStore {
private Configuration schedConf;
- private LinkedList<LogMutation> pendingMutations;
- private long pendingId;
+ private LogMutation pendingMutation;
@Override
- public void initialize(Configuration conf, Configuration schedConf) {
+ public void initialize(Configuration conf, Configuration schedConf,
+ RMContext rmContext) {
this.schedConf = schedConf;
- this.pendingMutations = new LinkedList<>();
- this.pendingId = 0;
}
@Override
- public synchronized long logMutation(LogMutation logMutation) {
- logMutation.setId(++pendingId);
- pendingMutations.add(logMutation);
- return pendingId;
+ public void logMutation(LogMutation logMutation) {
+ pendingMutation = logMutation;
}
@Override
- public synchronized boolean confirmMutation(long id, boolean isValid) {
- LogMutation mutation = pendingMutations.poll();
- // If confirmMutation is called out of order, discard mutations until id
- // is reached.
- while (mutation != null) {
- if (mutation.getId() == id) {
- if (isValid) {
- Map<String, String> mutations = mutation.getUpdates();
- for (Map.Entry<String, String> kv : mutations.entrySet()) {
- if (kv.getValue() == null) {
- schedConf.unset(kv.getKey());
- } else {
- schedConf.set(kv.getKey(), kv.getValue());
- }
- }
+ public void confirmMutation(boolean isValid) {
+ if (isValid) {
+ for (Map.Entry<String, String> kv : pendingMutation.getUpdates()
+ .entrySet()) {
+ if (kv.getValue() == null) {
+ schedConf.unset(kv.getKey());
+ } else {
+ schedConf.set(kv.getKey(), kv.getValue());
}
- return true;
}
- mutation = pendingMutations.poll();
}
- return false;
+ pendingMutation = null;
}
@Override
@@ -78,13 +66,30 @@ public class InMemoryConfigurationStore implements YarnConfigurationStore {
}
@Override
- public synchronized List<LogMutation> getPendingMutations() {
- return new LinkedList<>(pendingMutations);
+ public List<LogMutation> getConfirmedConfHistory(long fromId) {
+ // Unimplemented.
+ return null;
}
@Override
- public List<LogMutation> getConfirmedConfHistory(long fromId) {
- // Unimplemented.
+ public Version getConfStoreVersion() throws Exception {
+ // Does nothing.
return null;
}
+
+ @Override
+ public void storeVersion() throws Exception {
+ // Does nothing.
+ }
+
+ @Override
+ public Version getCurrentVersion() {
+ // Does nothing.
+ return null;
+ }
+
+ @Override
+ public void checkVersion() {
+ // Does nothing. (Version is always compatible since it's in memory)
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9726e1fc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
index 1280fab..1b0eb9f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
@@ -26,6 +26,10 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.fusesource.leveldbjni.JniDBFactory;
import org.fusesource.leveldbjni.internal.NativeDB;
import org.iq80.leveldb.DB;
@@ -55,58 +59,32 @@ import static org.fusesource.leveldbjni.JniDBFactory.bytes;
/**
* A LevelDB implementation of {@link YarnConfigurationStore}.
*/
-public class LeveldbConfigurationStore implements YarnConfigurationStore {
+public class LeveldbConfigurationStore extends YarnConfigurationStore {
public static final Log LOG =
LogFactory.getLog(LeveldbConfigurationStore.class);
private static final String DB_NAME = "yarn-conf-store";
- private static final String LOG_PREFIX = "log.";
- private static final String LOG_COMMITTED_TXN = "committedTxn";
+ private static final String LOG_KEY = "log";
+ private static final String VERSION_KEY = "version";
private DB db;
- // Txnid for the last transaction logged to the store.
- private long txnId = 0;
- private long minTxn = 0;
private long maxLogs;
private Configuration conf;
- private LinkedList<LogMutation> pendingMutations = new LinkedList<>();
+ private LogMutation pendingMutation;
+ private static final Version CURRENT_VERSION_INFO = Version
+ .newInstance(0, 1);
private Timer compactionTimer;
private long compactionIntervalMsec;
@Override
- public void initialize(Configuration config, Configuration schedConf)
- throws IOException {
+ public void initialize(Configuration config, Configuration schedConf,
+ RMContext rmContext) throws IOException {
this.conf = config;
try {
this.db = initDatabase(schedConf);
- this.txnId = Long.parseLong(new String(db.get(bytes(LOG_COMMITTED_TXN)),
- StandardCharsets.UTF_8));
- DBIterator itr = db.iterator();
- itr.seek(bytes(LOG_PREFIX + txnId));
- // Seek to first uncommitted log
- itr.next();
- while (itr.hasNext()) {
- Map.Entry<byte[], byte[]> entry = itr.next();
- if (!new String(entry.getKey(), StandardCharsets.UTF_8)
- .startsWith(LOG_PREFIX)) {
- break;
- }
- pendingMutations.add(deserLogMutation(entry.getValue()));
- txnId++;
- }
- // Get the earliest txnId stored in logs
- itr.seekToFirst();
- if (itr.hasNext()) {
- Map.Entry<byte[], byte[]> entry = itr.next();
- byte[] key = entry.getKey();
- String logId = new String(key, StandardCharsets.UTF_8);
- if (logId.startsWith(LOG_PREFIX)) {
- minTxn = Long.parseLong(logId.substring(logId.indexOf('.') + 1));
- }
- }
this.maxLogs = config.getLong(
- YarnConfiguration.RM_SCHEDCONF_LEVELDB_MAX_LOGS,
+ YarnConfiguration.RM_SCHEDCONF_MAX_LOGS,
YarnConfiguration.DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS);
this.compactionIntervalMsec = config.getLong(
YarnConfiguration.RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS,
@@ -127,33 +105,23 @@ public class LeveldbConfigurationStore implements YarnConfigurationStore {
public int compare(byte[] key1, byte[] key2) {
String key1Str = new String(key1, StandardCharsets.UTF_8);
String key2Str = new String(key2, StandardCharsets.UTF_8);
- int key1Txn = Integer.MAX_VALUE;
- int key2Txn = Integer.MAX_VALUE;
- if (key1Str.startsWith(LOG_PREFIX)) {
- key1Txn = Integer.parseInt(key1Str.substring(
- key1Str.indexOf('.') + 1));
- }
- if (key2Str.startsWith(LOG_PREFIX)) {
- key2Txn = Integer.parseInt(key2Str.substring(
- key2Str.indexOf('.') + 1));
- }
- // TODO txnId could overflow, in theory
- if (key1Txn == Integer.MAX_VALUE && key2Txn == Integer.MAX_VALUE) {
- if (key1Str.equals(key2Str) && key1Str.equals(LOG_COMMITTED_TXN)) {
- return 0;
- } else if (key1Str.equals(LOG_COMMITTED_TXN)) {
- return -1;
- } else if (key2Str.equals(LOG_COMMITTED_TXN)) {
- return 1;
- }
- return key1Str.compareTo(key2Str);
+ if (key1Str.equals(key2Str)) {
+ return 0;
+ } else if (key1Str.equals(VERSION_KEY)) {
+ return -1;
+ } else if (key2Str.equals(VERSION_KEY)) {
+ return 1;
+ } else if (key1Str.equals(LOG_KEY)) {
+ return -1;
+ } else if (key2Str.equals(LOG_KEY)) {
+ return 1;
}
- return key1Txn - key2Txn;
+ return key1Str.compareTo(key2Str);
}
@Override
public String name() {
- return "logComparator";
+ return "keyComparator";
}
public byte[] findShortestSeparator(byte[] start, byte[] limit) {
@@ -164,6 +132,7 @@ public class LeveldbConfigurationStore implements YarnConfigurationStore {
return key;
}
});
+
LOG.info("Using conf database at " + storeRoot);
File dbfile = new File(storeRoot.toString());
try {
@@ -179,7 +148,6 @@ public class LeveldbConfigurationStore implements YarnConfigurationStore {
for (Map.Entry<String, String> kv : config) {
initBatch.put(bytes(kv.getKey()), bytes(kv.getValue()));
}
- initBatch.put(bytes(LOG_COMMITTED_TXN), bytes("0"));
db.write(initBatch);
} catch (DBException dbErr) {
throw new IOException(dbErr.getMessage(), dbErr);
@@ -208,28 +176,22 @@ public class LeveldbConfigurationStore implements YarnConfigurationStore {
}
@Override
- public synchronized long logMutation(LogMutation logMutation)
- throws IOException {
- logMutation.setId(++txnId);
- WriteBatch logBatch = db.createWriteBatch();
- logBatch.put(bytes(LOG_PREFIX + txnId), serLogMutation(logMutation));
- if (txnId - minTxn >= maxLogs) {
- logBatch.delete(bytes(LOG_PREFIX + minTxn));
- minTxn++;
+ public void logMutation(LogMutation logMutation) throws IOException {
+ LinkedList<LogMutation> logs = deserLogMutations(db.get(bytes(LOG_KEY)));
+ logs.add(logMutation);
+ if (logs.size() > maxLogs) {
+ logs.removeFirst();
}
- db.write(logBatch);
- pendingMutations.add(logMutation);
- return txnId;
+ db.put(bytes(LOG_KEY), serLogMutations(logs));
+ pendingMutation = logMutation;
}
@Override
- public synchronized boolean confirmMutation(long id, boolean isValid)
- throws IOException {
+ public void confirmMutation(boolean isValid) throws IOException {
WriteBatch updateBatch = db.createWriteBatch();
if (isValid) {
- LogMutation mutation = deserLogMutation(db.get(bytes(LOG_PREFIX + id)));
for (Map.Entry<String, String> changes :
- mutation.getUpdates().entrySet()) {
+ pendingMutation.getUpdates().entrySet()) {
if (changes.getValue() == null || changes.getValue().isEmpty()) {
updateBatch.delete(bytes(changes.getKey()));
} else {
@@ -237,28 +199,24 @@ public class LeveldbConfigurationStore implements YarnConfigurationStore {
}
}
}
- updateBatch.put(bytes(LOG_COMMITTED_TXN), bytes(String.valueOf(id)));
db.write(updateBatch);
- // Assumes logMutation and confirmMutation are done in the same
- // synchronized method. For example,
- // {@link MutableCSConfigurationProvider#mutateConfiguration(
- // UserGroupInformation user, SchedConfUpdateInfo confUpdate)}
- pendingMutations.removeFirst();
- return true;
+ pendingMutation = null;
}
- private byte[] serLogMutation(LogMutation mutation) throws IOException {
+ private byte[] serLogMutations(LinkedList<LogMutation> mutations) throws
+ IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (ObjectOutput oos = new ObjectOutputStream(baos)) {
- oos.writeObject(mutation);
+ oos.writeObject(mutations);
oos.flush();
return baos.toByteArray();
}
}
- private LogMutation deserLogMutation(byte[] mutation) throws IOException {
+ private LinkedList<LogMutation> deserLogMutations(byte[] mutations) throws
+ IOException {
try (ObjectInput input = new ObjectInputStream(
- new ByteArrayInputStream(mutation))) {
- return (LogMutation) input.readObject();
+ new ByteArrayInputStream(mutations))) {
+ return (LinkedList<LogMutation>) input.readObject();
} catch (ClassNotFoundException e) {
throw new IOException(e);
}
@@ -267,7 +225,7 @@ public class LeveldbConfigurationStore implements YarnConfigurationStore {
@Override
public synchronized Configuration retrieve() {
DBIterator itr = db.iterator();
- itr.seek(bytes(LOG_COMMITTED_TXN));
+ itr.seek(bytes(LOG_KEY));
Configuration config = new Configuration(false);
itr.next();
while (itr.hasNext()) {
@@ -279,11 +237,6 @@ public class LeveldbConfigurationStore implements YarnConfigurationStore {
}
@Override
- public List<LogMutation> getPendingMutations() {
- return new LinkedList<>(pendingMutations);
- }
-
- @Override
public List<LogMutation> getConfirmedConfHistory(long fromId) {
return null; // unimplemented
}
@@ -299,6 +252,39 @@ public class LeveldbConfigurationStore implements YarnConfigurationStore {
}
}
+ // TODO: following is taken from LeveldbRMStateStore
+ @Override
+ public Version getConfStoreVersion() throws Exception {
+ Version version = null;
+ try {
+ byte[] data = db.get(bytes(VERSION_KEY));
+ if (data != null) {
+ version = new VersionPBImpl(YarnServerCommonProtos.VersionProto
+ .parseFrom(data));
+ }
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ return version;
+ }
+
+ @Override
+ public void storeVersion() throws Exception {
+ String key = VERSION_KEY;
+ byte[] data = ((VersionPBImpl) CURRENT_VERSION_INFO).getProto()
+ .toByteArray();
+ try {
+ db.put(bytes(key), data);
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public Version getCurrentVersion() {
+ return CURRENT_VERSION_INFO;
+ }
+
private class CompactionTimerTask extends TimerTask {
@Override
public void run() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9726e1fc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
index d03b2e2..70d1840 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
@@ -18,20 +18,17 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ConfigurationMutationACLPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ConfigurationMutationACLPolicyFactory;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
@@ -56,6 +53,7 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
LogFactory.getLog(MutableCSConfigurationProvider.class);
private Configuration schedConf;
+ private Configuration oldConf;
private YarnConfigurationStore confStore;
private ConfigurationMutationACLPolicy aclMutationPolicy;
private RMContext rmContext;
@@ -76,6 +74,9 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
case YarnConfiguration.LEVELDB_CONFIGURATION_STORE:
this.confStore = new LeveldbConfigurationStore();
break;
+ case YarnConfiguration.ZK_CONFIGURATION_STORE:
+ this.confStore = new ZKConfigurationStore();
+ break;
default:
this.confStore = YarnConfigurationStoreFactory.getStore(config);
break;
@@ -89,7 +90,11 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
for (Map.Entry<String, String> kv : initialSchedConf) {
schedConf.set(kv.getKey(), kv.getValue());
}
- confStore.initialize(config, schedConf);
+ try {
+ confStore.initialize(config, schedConf, rmContext);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
// After initializing confStore, the store may already have an existing
// configuration. Use this one.
schedConf = confStore.retrieve();
@@ -98,6 +103,11 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
aclMutationPolicy.init(config, rmContext);
}
+ @VisibleForTesting
+ public YarnConfigurationStore getConfStore() {
+ return confStore;
+ }
+
@Override
public CapacitySchedulerConfiguration loadConfiguration(Configuration
configuration) throws IOException {
@@ -107,16 +117,17 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
}
@Override
- public synchronized void mutateConfiguration(UserGroupInformation user,
- SchedConfUpdateInfo confUpdate) throws IOException, YarnException {
- if (!aclMutationPolicy.isMutationAllowed(user, confUpdate)) {
- throw new AccessControlException("User is not admin of all modified" +
- " queues.");
- }
- Configuration oldConf = new Configuration(schedConf);
+ public ConfigurationMutationACLPolicy getAclMutationPolicy() {
+ return aclMutationPolicy;
+ }
+
+ @Override
+ public void logAndApplyMutation(UserGroupInformation user,
+ SchedConfUpdateInfo confUpdate) throws Exception {
+ oldConf = new Configuration(schedConf);
Map<String, String> kvUpdate = constructKeyValueConfUpdate(confUpdate);
LogMutation log = new LogMutation(kvUpdate, user.getShortUserName());
- long id = confStore.logMutation(log);
+ confStore.logMutation(log);
for (Map.Entry<String, String> kv : kvUpdate.entrySet()) {
if (kv.getValue() == null) {
schedConf.unset(kv.getKey());
@@ -124,47 +135,33 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
schedConf.set(kv.getKey(), kv.getValue());
}
}
- try {
- rmContext.getRMAdminService().refreshQueues();
- } catch (IOException | YarnException e) {
+ }
+
+ @Override
+ public void confirmPendingMutation(boolean isValid) throws Exception {
+ confStore.confirmMutation(isValid);
+ if (!isValid) {
schedConf = oldConf;
- confStore.confirmMutation(id, false);
- throw e;
}
- confStore.confirmMutation(id, true);
}
@Override
- public void recoverConf() throws IOException {
- List<LogMutation> uncommittedLogs = confStore.getPendingMutations();
- Configuration oldConf = new Configuration(schedConf);
- for (LogMutation mutation : uncommittedLogs) {
- for (Map.Entry<String, String> kv : mutation.getUpdates().entrySet()) {
- if (kv.getValue() == null) {
- schedConf.unset(kv.getKey());
- } else {
- schedConf.set(kv.getKey(), kv.getValue());
- }
- }
- try {
- rmContext.getScheduler().reinitialize(schedConf, rmContext);
- } catch (IOException e) {
- schedConf = oldConf;
- confStore.confirmMutation(mutation.getId(), false);
- LOG.info("Configuration mutation " + mutation.getId()
- + " was rejected", e);
- continue;
- }
- confStore.confirmMutation(mutation.getId(), true);
- LOG.info("Configuration mutation " + mutation.getId()+ " was accepted");
- }
+ public void reloadConfigurationFromStore() throws Exception {
+ schedConf = confStore.retrieve();
+ }
+
+ private List<String> getSiblingQueues(String queuePath, Configuration conf) {
+ String parentQueue = queuePath.substring(0, queuePath.lastIndexOf('.'));
+ String childQueuesKey = CapacitySchedulerConfiguration.PREFIX +
+ parentQueue + CapacitySchedulerConfiguration.DOT +
+ CapacitySchedulerConfiguration.QUEUES;
+ return new ArrayList<>(conf.getStringCollection(childQueuesKey));
}
private Map<String, String> constructKeyValueConfUpdate(
SchedConfUpdateInfo mutationInfo) throws IOException {
- CapacityScheduler cs = (CapacityScheduler) rmContext.getScheduler();
CapacitySchedulerConfiguration proposedConf =
- new CapacitySchedulerConfiguration(cs.getConfiguration(), false);
+ new CapacitySchedulerConfiguration(schedConf, false);
Map<String, String> confUpdate = new HashMap<>();
for (String queueToRemove : mutationInfo.getRemoveQueueInfo()) {
removeQueue(queueToRemove, proposedConf, confUpdate);
@@ -188,40 +185,35 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
if (queueToRemove == null) {
return;
} else {
- CapacityScheduler cs = (CapacityScheduler) rmContext.getScheduler();
String queueName = queueToRemove.substring(
queueToRemove.lastIndexOf('.') + 1);
- CSQueue queue = cs.getQueue(queueName);
- if (queue == null ||
- !queue.getQueuePath().equals(queueToRemove)) {
- throw new IOException("Queue " + queueToRemove + " not found");
- } else if (queueToRemove.lastIndexOf('.') == -1) {
+ if (queueToRemove.lastIndexOf('.') == -1) {
throw new IOException("Can't remove queue " + queueToRemove);
- }
- String parentQueuePath = queueToRemove.substring(0, queueToRemove
- .lastIndexOf('.'));
- String[] siblingQueues = proposedConf.getQueues(parentQueuePath);
- List<String> newSiblingQueues = new ArrayList<>();
- for (String siblingQueue : siblingQueues) {
- if (!siblingQueue.equals(queueName)) {
- newSiblingQueues.add(siblingQueue);
- }
- }
- proposedConf.setQueues(parentQueuePath, newSiblingQueues
- .toArray(new String[0]));
- String queuesConfig = CapacitySchedulerConfiguration.PREFIX
- + parentQueuePath + CapacitySchedulerConfiguration.DOT
- + CapacitySchedulerConfiguration.QUEUES;
- if (newSiblingQueues.size() == 0) {
- confUpdate.put(queuesConfig, null);
} else {
- confUpdate.put(queuesConfig, Joiner.on(',').join(newSiblingQueues));
- }
- for (Map.Entry<String, String> confRemove : proposedConf.getValByRegex(
- ".*" + queueToRemove.replaceAll("\\.", "\\.") + "\\..*")
- .entrySet()) {
- proposedConf.unset(confRemove.getKey());
- confUpdate.put(confRemove.getKey(), null);
+ List<String> siblingQueues = getSiblingQueues(queueToRemove,
+ proposedConf);
+ if (!siblingQueues.contains(queueName)) {
+ throw new IOException("Queue " + queueToRemove + " not found");
+ }
+ siblingQueues.remove(queueName);
+ String parentQueuePath = queueToRemove.substring(0, queueToRemove
+ .lastIndexOf('.'));
+ proposedConf.setQueues(parentQueuePath, siblingQueues.toArray(
+ new String[0]));
+ String queuesConfig = CapacitySchedulerConfiguration.PREFIX
+ + parentQueuePath + CapacitySchedulerConfiguration.DOT
+ + CapacitySchedulerConfiguration.QUEUES;
+ if (siblingQueues.size() == 0) {
+ confUpdate.put(queuesConfig, null);
+ } else {
+ confUpdate.put(queuesConfig, Joiner.on(',').join(siblingQueues));
+ }
+ for (Map.Entry<String, String> confRemove : proposedConf.getValByRegex(
+ ".*" + queueToRemove.replaceAll("\\.", "\\.") + "\\..*")
+ .entrySet()) {
+ proposedConf.unset(confRemove.getKey());
+ confUpdate.put(confRemove.getKey(), null);
+ }
}
}
}
@@ -232,13 +224,13 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
if (addInfo == null) {
return;
} else {
- CapacityScheduler cs = (CapacityScheduler) rmContext.getScheduler();
String queuePath = addInfo.getQueue();
String queueName = queuePath.substring(queuePath.lastIndexOf('.') + 1);
- if (cs.getQueue(queueName) != null) {
- throw new IOException("Can't add existing queue " + queuePath);
- } else if (queuePath.lastIndexOf('.') == -1) {
+ if (queuePath.lastIndexOf('.') == -1) {
throw new IOException("Can't add invalid queue " + queuePath);
+ } else if (getSiblingQueues(queuePath, proposedConf).contains(
+ queueName)) {
+ throw new IOException("Can't add existing queue " + queuePath);
}
String parentQueue = queuePath.substring(0, queuePath.lastIndexOf('.'));
String[] siblings = proposedConf.getQueues(parentQueue);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9726e1fc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
index 065c877..1356535 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
@@ -18,7 +18,12 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateVersionIncompatibleException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import java.io.IOException;
@@ -39,36 +44,26 @@ import java.util.Map;
* {@code getPendingMutations}, and replay/confirm them via
* {@code confirmMutation} as in the normal case.
*/
-public interface YarnConfigurationStore {
+public abstract class YarnConfigurationStore {
+ public static final Log LOG =
+ LogFactory.getLog(YarnConfigurationStore.class);
/**
* LogMutation encapsulates the fields needed for configuration mutation
* audit logging and recovery.
*/
- class LogMutation implements Serializable {
+ static class LogMutation implements Serializable {
private Map<String, String> updates;
private String user;
- private long id;
/**
- * Create log mutation prior to logging.
+ * Create log mutation.
* @param updates key-value configuration updates
* @param user user who requested configuration change
*/
- public LogMutation(Map<String, String> updates, String user) {
- this(updates, user, 0);
- }
-
- /**
- * Create log mutation for recovery.
- * @param updates key-value configuration updates
- * @param user user who requested configuration change
- * @param id transaction id of configuration change
- */
- LogMutation(Map<String, String> updates, String user, long id) {
+ LogMutation(Map<String, String> updates, String user) {
this.updates = updates;
this.user = user;
- this.id = id;
}
/**
@@ -86,75 +81,92 @@ public interface YarnConfigurationStore {
public String getUser() {
return user;
}
-
- /**
- * Get transaction id of this configuration change.
- * @return transaction id
- */
- public long getId() {
- return id;
- }
-
- /**
- * Set transaction id of this configuration change.
- * @param id transaction id
- */
- public void setId(long id) {
- this.id = id;
- }
}
/**
- * Initialize the configuration store.
+ * Initialize the configuration store, with schedConf as the initial
+ * scheduler configuration. If a persisted store already exists, use the
+ * scheduler configuration stored there, and ignore schedConf.
* @param conf configuration to initialize store with
- * @param schedConf Initial key-value configuration to persist
+ * @param schedConf Initial key-value scheduler configuration to persist.
+ * @param rmContext RMContext for this configuration store
* @throws IOException if initialization fails
*/
- void initialize(Configuration conf, Configuration schedConf)
- throws IOException;
+ public abstract void initialize(Configuration conf, Configuration schedConf,
+ RMContext rmContext) throws Exception;
/**
- * Logs the configuration change to backing store. Generates an id associated
- * with this mutation, sets it in {@code logMutation}, and returns it.
+ * Logs the configuration change to backing store.
* @param logMutation configuration change to be persisted in write ahead log
- * @return id which configuration store associates with this mutation
* @throws IOException if logging fails
*/
- long logMutation(LogMutation logMutation) throws IOException;
+ public abstract void logMutation(LogMutation logMutation) throws Exception;
/**
* Should be called after {@code logMutation}. Gets the pending mutation
- * associated with {@code id} and marks the mutation as persisted (no longer
- * pending). If isValid is true, merge the mutation with the persisted
+ * last logged by {@code logMutation} and marks the mutation as persisted (no
+ * longer pending). If isValid is true, merge the mutation with the persisted
* configuration.
- *
- * If {@code confirmMutation} is called with ids in a different order than
- * was returned by {@code logMutation}, the result is implementation
- * dependent.
- * @param id id of mutation to be confirmed
- * @param isValid if true, update persisted configuration with mutation
- * associated with {@code id}.
- * @return true on success
- * @throws IOException if mutation confirmation fails
+ * @param isValid if true, update persisted configuration with pending
+ * mutation.
+ * @throws Exception if mutation confirmation fails
*/
- boolean confirmMutation(long id, boolean isValid) throws IOException;
+ public abstract void confirmMutation(boolean isValid) throws Exception;
/**
* Retrieve the persisted configuration.
* @return configuration as key-value
*/
- Configuration retrieve();
-
- /**
- * Get the list of pending mutations, in the order they were logged.
- * @return list of mutations
- */
- List<LogMutation> getPendingMutations();
+ public abstract Configuration retrieve();
/**
* Get a list of confirmed configuration mutations starting from a given id.
* @param fromId id from which to start getting mutations, inclusive
* @return list of configuration mutations
*/
- List<LogMutation> getConfirmedConfHistory(long fromId);
+ public abstract List<LogMutation> getConfirmedConfHistory(long fromId);
+
+ /**
+ * Get schema version of persisted conf store, for detecting compatibility
+ * issues when changing conf store schema.
+ * @return Schema version currently used by the persisted configuration store.
+ * @throws Exception On version fetch failure
+ */
+ protected abstract Version getConfStoreVersion() throws Exception;
+
+ /**
+ * Persist the hard-coded schema version to the conf store.
+ * @throws Exception On storage failure
+ */
+ protected abstract void storeVersion() throws Exception;
+
+ /**
+ * Get the hard-coded schema version, for comparison against the schema
+ * version currently persisted.
+ * @return Current hard-coded schema version
+ */
+ protected abstract Version getCurrentVersion();
+
+ public void checkVersion() throws Exception {
+ // TODO this was taken from RMStateStore. Should probably refactor
+ Version loadedVersion = getConfStoreVersion();
+ LOG.info("Loaded configuration store version info " + loadedVersion);
+ if (loadedVersion != null && loadedVersion.equals(getCurrentVersion())) {
+ return;
+ }
+ // if there is no version info, treat it as CURRENT_VERSION_INFO;
+ if (loadedVersion == null) {
+ loadedVersion = getCurrentVersion();
+ }
+ if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
+ LOG.info("Storing configuration store version info "
+ + getCurrentVersion());
+ storeVersion();
+ } else {
+ throw new RMStateVersionIncompatibleException(
+ "Expecting configuration store version " + getCurrentVersion()
+ + ", but loading version " + loadedVersion);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9726e1fc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java
new file mode 100644
index 0000000..a0bba8c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.curator.ZKCuratorManager;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.ACL;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A Zookeeper-based implementation of {@link YarnConfigurationStore}.
+ */
+public class ZKConfigurationStore extends YarnConfigurationStore {
+
+ public static final Log LOG =
+ LogFactory.getLog(ZKConfigurationStore.class);
+
+ private long maxLogs;
+
+ @VisibleForTesting
+ protected static final Version CURRENT_VERSION_INFO = Version
+ .newInstance(0, 1);
+ private Configuration conf;
+ private LogMutation pendingMutation;
+
+ private String znodeParentPath;
+
+ private static final String ZK_VERSION_PATH = "VERSION";
+ private static final String LOGS_PATH = "LOGS";
+ private static final String CONF_STORE_PATH = "CONF_STORE";
+ private static final String FENCING_PATH = "FENCING";
+
+ private String zkVersionPath;
+ private String logsPath;
+ private String confStorePath;
+ private String fencingNodePath;
+
+ @VisibleForTesting
+ protected ZKCuratorManager zkManager;
+ private List<ACL> zkAcl;
+
+ @Override
+ public void initialize(Configuration config, Configuration schedConf,
+ RMContext rmContext) throws Exception {
+ this.conf = config;
+ this.maxLogs = conf.getLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS,
+ YarnConfiguration.DEFAULT_RM_SCHEDCONF_ZK_MAX_LOGS);
+ this.znodeParentPath =
+ conf.get(YarnConfiguration.RM_SCHEDCONF_STORE_ZK_PARENT_PATH,
+ YarnConfiguration.DEFAULT_RM_SCHEDCONF_STORE_ZK_PARENT_PATH);
+ this.zkManager = rmContext.getResourceManager().getAndStartZKManager(conf);
+ this.zkAcl = ZKCuratorManager.getZKAcls(conf);
+
+ this.zkVersionPath = getNodePath(znodeParentPath, ZK_VERSION_PATH);
+ this.logsPath = getNodePath(znodeParentPath, LOGS_PATH);
+ this.confStorePath = getNodePath(znodeParentPath, CONF_STORE_PATH);
+ this.fencingNodePath = getNodePath(znodeParentPath, FENCING_PATH);
+
+ zkManager.createRootDirRecursively(znodeParentPath);
+ zkManager.delete(fencingNodePath);
+
+ if (!zkManager.exists(logsPath)) {
+ zkManager.create(logsPath);
+ zkManager.setData(logsPath,
+ serializeObject(new LinkedList<LogMutation>()), -1);
+ }
+
+ if (!zkManager.exists(confStorePath)) {
+ zkManager.create(confStorePath);
+ HashMap<String, String> mapSchedConf = new HashMap<>();
+ for (Map.Entry<String, String> entry : schedConf) {
+ mapSchedConf.put(entry.getKey(), entry.getValue());
+ }
+ zkManager.setData(confStorePath, serializeObject(mapSchedConf), -1);
+ }
+ }
+
+ @VisibleForTesting
+ protected LinkedList<LogMutation> getLogs() throws Exception {
+ return (LinkedList<LogMutation>)
+ deserializeObject(zkManager.getData(logsPath));
+ }
+
+ // TODO: following version-related code is taken from ZKRMStateStore
+ @Override
+ public Version getCurrentVersion() {
+ return CURRENT_VERSION_INFO;
+ }
+
+ @Override
+ public Version getConfStoreVersion() throws Exception {
+ if (zkManager.exists(zkVersionPath)) {
+ byte[] data = zkManager.getData(zkVersionPath);
+ return new VersionPBImpl(YarnServerCommonProtos.VersionProto
+ .parseFrom(data));
+ }
+
+ return null;
+ }
+
+ @Override
+ public synchronized void storeVersion() throws Exception {
+ byte[] data =
+ ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
+
+ if (zkManager.exists(zkVersionPath)) {
+ zkManager.safeSetData(zkVersionPath, data, -1, zkAcl, fencingNodePath);
+ } else {
+ zkManager.safeCreate(zkVersionPath, data, zkAcl, CreateMode.PERSISTENT,
+ zkAcl, fencingNodePath);
+ }
+ }
+
+ @Override
+ public void logMutation(LogMutation logMutation) throws Exception {
+ byte[] storedLogs = zkManager.getData(logsPath);
+ LinkedList<LogMutation> logs = new LinkedList<>();
+ if (storedLogs != null) {
+ logs = (LinkedList<LogMutation>) deserializeObject(storedLogs);
+ }
+ logs.add(logMutation);
+ if (logs.size() > maxLogs) {
+ logs.remove(logs.removeFirst());
+ }
+ zkManager.safeSetData(logsPath, serializeObject(logs), -1, zkAcl,
+ fencingNodePath);
+ pendingMutation = logMutation;
+ }
+
+ @Override
+ public void confirmMutation(boolean isValid)
+ throws Exception {
+ if (isValid) {
+ Configuration storedConfigs = retrieve();
+ Map<String, String> mapConf = new HashMap<>();
+ for (Map.Entry<String, String> storedConf : storedConfigs) {
+ mapConf.put(storedConf.getKey(), storedConf.getValue());
+ }
+ for (Map.Entry<String, String> confChange :
+ pendingMutation.getUpdates().entrySet()) {
+ if (confChange.getValue() == null || confChange.getValue().isEmpty()) {
+ mapConf.remove(confChange.getKey());
+ } else {
+ mapConf.put(confChange.getKey(), confChange.getValue());
+ }
+ }
+ zkManager.safeSetData(confStorePath, serializeObject(mapConf), -1,
+ zkAcl, fencingNodePath);
+ }
+ pendingMutation = null;
+ }
+
+ @Override
+ public synchronized Configuration retrieve() {
+ byte[] serializedSchedConf;
+ try {
+ serializedSchedConf = zkManager.getData(confStorePath);
+ } catch (Exception e) {
+ LOG.error("Failed to retrieve configuration from zookeeper store", e);
+ return null;
+ }
+ try {
+ Map<String, String> map =
+ (HashMap<String, String>) deserializeObject(serializedSchedConf);
+ Configuration c = new Configuration();
+ for (Map.Entry<String, String> e : map.entrySet()) {
+ c.set(e.getKey(), e.getValue());
+ }
+ return c;
+ } catch (Exception e) {
+ LOG.error("Exception while deserializing scheduler configuration " +
+ "from store", e);
+ }
+ return null;
+ }
+
+ @Override
+ public List<LogMutation> getConfirmedConfHistory(long fromId) {
+ return null; // unimplemented
+ }
+
+ private static String getNodePath(String root, String nodeName) {
+ return ZKCuratorManager.getNodePath(root, nodeName);
+ }
+
+ private static byte[] serializeObject(Object o) throws Exception {
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos);) {
+ oos.writeObject(o);
+ oos.flush();
+ baos.flush();
+ return baos.toByteArray();
+ }
+ }
+
+ private static Object deserializeObject(byte[] bytes) throws Exception {
+ try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+ ObjectInputStream ois = new ObjectInputStream(bais);) {
+ return ois.readObject();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9726e1fc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
index 516b6293..45210a4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
@@ -129,6 +129,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -2414,7 +2415,7 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
- public Response updateSchedulerConfiguration(SchedConfUpdateInfo
+ public synchronized Response updateSchedulerConfiguration(SchedConfUpdateInfo
mutationInfo, @Context HttpServletRequest hsr)
throws AuthorizationException, InterruptedException {
init();
@@ -2429,17 +2430,32 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
}
ResourceScheduler scheduler = rm.getResourceScheduler();
- if (scheduler instanceof MutableConfScheduler) {
+ if (scheduler instanceof MutableConfScheduler && ((MutableConfScheduler)
+ scheduler).isConfigurationMutable()) {
try {
callerUGI.doAs(new PrivilegedExceptionAction<Void>() {
@Override
- public Void run() throws IOException, YarnException {
- ((MutableConfScheduler) scheduler).updateConfiguration(callerUGI,
- mutationInfo);
+ public Void run() throws Exception {
+ MutableConfigurationProvider provider = ((MutableConfScheduler)
+ scheduler).getMutableConfProvider();
+ if (!provider.getAclMutationPolicy().isMutationAllowed(callerUGI,
+ mutationInfo)) {
+ throw new org.apache.hadoop.security.AccessControlException("User"
+ + " is not admin of all modified queues.");
+ }
+ provider.logAndApplyMutation(callerUGI, mutationInfo);
+ try {
+ rm.getRMContext().getRMAdminService().refreshQueues();
+ } catch (IOException | YarnException e) {
+ provider.confirmPendingMutation(false);
+ throw e;
+ }
+ provider.confirmPendingMutation(true);
return null;
}
});
} catch (IOException e) {
+ LOG.error("Exception thrown when modifying configuration.", e);
return Response.status(Status.BAD_REQUEST).entity(e.getMessage())
.build();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9726e1fc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java
new file mode 100644
index 0000000..bbe9570
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Base class for {@link YarnConfigurationStore} implementations.
+ */
+public abstract class ConfigurationStoreBaseTest {
+
+ protected YarnConfigurationStore confStore = createConfStore();
+
+ protected abstract YarnConfigurationStore createConfStore();
+
+ protected Configuration conf;
+ protected Configuration schedConf;
+ protected RMContext rmContext;
+
+ protected static final String TEST_USER = "testUser";
+
+ @Before
+ public void setUp() throws Exception {
+ this.conf = new Configuration();
+ this.schedConf = new Configuration(false);
+ }
+
+ @Test
+ public void testConfigurationUpdate() throws Exception {
+ schedConf.set("key1", "val1");
+ confStore.initialize(conf, schedConf, rmContext);
+ assertEquals("val1", confStore.retrieve().get("key1"));
+
+ Map<String, String> update1 = new HashMap<>();
+ update1.put("keyUpdate1", "valUpdate1");
+ YarnConfigurationStore.LogMutation mutation1 =
+ new YarnConfigurationStore.LogMutation(update1, TEST_USER);
+ confStore.logMutation(mutation1);
+ confStore.confirmMutation(true);
+ assertEquals("valUpdate1", confStore.retrieve().get("keyUpdate1"));
+
+ Map<String, String> update2 = new HashMap<>();
+ update2.put("keyUpdate2", "valUpdate2");
+ YarnConfigurationStore.LogMutation mutation2 =
+ new YarnConfigurationStore.LogMutation(update2, TEST_USER);
+ confStore.logMutation(mutation2);
+ confStore.confirmMutation(false);
+ assertNull("Configuration should not be updated",
+ confStore.retrieve().get("keyUpdate2"));
+ }
+
+ @Test
+ public void testNullConfigurationUpdate() throws Exception {
+ schedConf.set("key", "val");
+ confStore.initialize(conf, schedConf, rmContext);
+ assertEquals("val", confStore.retrieve().get("key"));
+
+ Map<String, String> update = new HashMap<>();
+ update.put("key", null);
+ YarnConfigurationStore.LogMutation mutation =
+ new YarnConfigurationStore.LogMutation(update, TEST_USER);
+ confStore.logMutation(mutation);
+ confStore.confirmMutation(true);
+ assertNull(confStore.retrieve().get("key"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9726e1fc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestInMemoryConfigurationStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestInMemoryConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestInMemoryConfigurationStore.java
new file mode 100644
index 0000000..c40d16a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestInMemoryConfigurationStore.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+
+/**
+ * Tests {@link InMemoryConfigurationStore}.
+ */
+public class TestInMemoryConfigurationStore extends ConfigurationStoreBaseTest {
+
+ @Override
+ protected YarnConfigurationStore createConfStore() {
+ return new InMemoryConfigurationStore();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9726e1fc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java
index 635a184..9b080cd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
@@ -30,14 +29,11 @@ import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
import org.junit.Before;
import org.junit.Test;
-import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -82,25 +78,21 @@ public class TestMutableCSConfigurationProvider {
}
@Test
- public void testInMemoryBackedProvider() throws IOException, YarnException {
+ public void testInMemoryBackedProvider() throws Exception {
Configuration conf = new Configuration();
confProvider.init(conf);
assertNull(confProvider.loadConfiguration(conf)
.get("yarn.scheduler.capacity.root.a.goodKey"));
- doNothing().when(adminService).refreshQueues();
- confProvider.mutateConfiguration(TEST_USER, goodUpdate);
+ confProvider.logAndApplyMutation(TEST_USER, goodUpdate);
+ confProvider.confirmPendingMutation(true);
assertEquals("goodVal", confProvider.loadConfiguration(conf)
.get("yarn.scheduler.capacity.root.a.goodKey"));
assertNull(confProvider.loadConfiguration(conf).get(
"yarn.scheduler.capacity.root.a.badKey"));
- doThrow(new IOException()).when(adminService).refreshQueues();
- try {
- confProvider.mutateConfiguration(TEST_USER, badUpdate);
- } catch (IOException e) {
- // Expected exception.
- }
+ confProvider.logAndApplyMutation(TEST_USER, badUpdate);
+ confProvider.confirmPendingMutation(false);
assertNull(confProvider.loadConfiguration(conf).get(
"yarn.scheduler.capacity.root.a.badKey"));
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9726e1fc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestYarnConfigurationStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestYarnConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestYarnConfigurationStore.java
deleted file mode 100644
index 631ce65..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestYarnConfigurationStore.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-public class TestYarnConfigurationStore {
-
- private YarnConfigurationStore confStore;
- private Configuration schedConf;
-
- private static final String testUser = "testUser";
-
- @Before
- public void setUp() {
- schedConf = new Configuration(false);
- schedConf.set("key1", "val1");
- }
-
- @Test
- public void testInMemoryConfigurationStore() throws IOException {
- confStore = new InMemoryConfigurationStore();
- confStore.initialize(new Configuration(), schedConf);
- assertEquals("val1", confStore.retrieve().get("key1"));
-
- Map<String, String> update1 = new HashMap<>();
- update1.put("keyUpdate1", "valUpdate1");
- LogMutation mutation1 = new LogMutation(update1, testUser);
- long id = confStore.logMutation(mutation1);
- assertEquals(1, confStore.getPendingMutations().size());
- confStore.confirmMutation(id, true);
- assertEquals("valUpdate1", confStore.retrieve().get("keyUpdate1"));
- assertEquals(0, confStore.getPendingMutations().size());
-
- Map<String, String> update2 = new HashMap<>();
- update2.put("keyUpdate2", "valUpdate2");
- LogMutation mutation2 = new LogMutation(update2, testUser);
- id = confStore.logMutation(mutation2);
- assertEquals(1, confStore.getPendingMutations().size());
- confStore.confirmMutation(id, false);
- assertNull("Configuration should not be updated",
- confStore.retrieve().get("keyUpdate2"));
- assertEquals(0, confStore.getPendingMutations().size());
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org