You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sn...@apache.org on 2020/01/14 11:26:35 UTC
[hadoop] branch trunk updated: YARN-9788. Queue Management API does not support parallel updates. Contributed by Prabhu Joseph
This is an automated email from the ASF dual-hosted git repository.
snemeth pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1c51f36 YARN-9788. Queue Management API does not support parallel updates. Contributed by Prabhu Joseph
1c51f36 is described below
commit 1c51f36be79924489ca4a2e5ca7e96ac75a6ec18
Author: Szilard Nemeth <sn...@apache.org>
AuthorDate: Tue Jan 14 12:26:03 2020 +0100
YARN-9788. Queue Management API does not support parallel updates. Contributed by Prabhu Joseph
---
.../hadoop/yarn/client/cli/TestSchedConfCLI.java | 7 +--
.../scheduler/MutableConfigurationProvider.java | 10 ++--
.../conf/FSSchedulerConfigurationStore.java | 8 +--
.../capacity/conf/InMemoryConfigurationStore.java | 10 ++--
.../capacity/conf/LeveldbConfigurationStore.java | 6 +--
.../conf/MutableCSConfigurationProvider.java | 8 +--
.../capacity/conf/YarnConfigurationStore.java | 6 ++-
.../capacity/conf/ZKConfigurationStore.java | 7 +--
.../resourcemanager/webapp/RMWebServices.java | 8 +--
.../capacity/conf/ConfigurationStoreBaseTest.java | 6 +--
.../conf/TestFSSchedulerConfigurationStore.java | 8 +--
.../conf/TestLeveldbConfigurationStore.java | 16 +++---
.../conf/TestMutableCSConfigurationProvider.java | 57 +++++++++++++++++-----
.../capacity/conf/TestZKConfigurationStore.java | 30 ++++++------
14 files changed, 116 insertions(+), 71 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestSchedConfCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestSchedConfCLI.java
index 4233b4c..3b961df 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestSchedConfCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestSchedConfCLI.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.JAXBContextResolver;
@@ -247,10 +248,10 @@ public class TestSchedConfCLI extends JerseyTestBase {
globalUpdates.put("schedKey1", "schedVal1");
schedUpdateInfo.setGlobalParams(globalUpdates);
- provider.logAndApplyMutation(UserGroupInformation.getCurrentUser(),
- schedUpdateInfo);
+ LogMutation log = provider.logAndApplyMutation(
+ UserGroupInformation.getCurrentUser(), schedUpdateInfo);
rm.getRMContext().getRMAdminService().refreshQueues();
- provider.confirmPendingMutation(true);
+ provider.confirmPendingMutation(log, true);
Configuration schedulerConf = provider.getConfiguration();
assertEquals("schedVal1", schedulerConf.get("schedKey1"));
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java
index 03902e3..751c9a3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
import java.io.IOException;
@@ -46,18 +47,21 @@ public interface MutableConfigurationProvider {
* Log user's requested configuration mutation, and applies it in-memory.
* @param user User who requested the change
* @param confUpdate User's requested configuration change
+ * @return LogMutation with update info from given SchedConfUpdateInfo
* @throws Exception if logging the mutation fails
*/
- void logAndApplyMutation(UserGroupInformation user, SchedConfUpdateInfo
- confUpdate) throws Exception;
+ LogMutation logAndApplyMutation(UserGroupInformation user,
+ SchedConfUpdateInfo confUpdate) throws Exception;
/**
* Confirm last logged mutation.
+ * @param pendingMutation the log mutation to apply
* @param isValid if the last logged mutation is applied to scheduler
* properly.
* @throws Exception if confirming mutation fails
*/
- void confirmPendingMutation(boolean isValid) throws Exception;
+ void confirmPendingMutation(LogMutation pendingMutation,
+ boolean isValid) throws Exception;
/**
* Returns scheduler configuration cached in this provider.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java
index 855939e..eeb38d3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java
@@ -58,7 +58,6 @@ public class FSSchedulerConfigurationStore extends YarnConfigurationStore {
private int maxVersion;
private Path schedulerConfDir;
private FileSystem fileSystem;
- private LogMutation pendingMutation;
private PathFilter configFilePathFilter;
private volatile Configuration schedConf;
private volatile Configuration oldConf;
@@ -134,10 +133,9 @@ public class FSSchedulerConfigurationStore extends YarnConfigurationStore {
*/
@Override
public void logMutation(LogMutation logMutation) throws IOException {
- pendingMutation = logMutation;
LOG.info(new GsonBuilder().serializeNulls().create().toJson(logMutation));
oldConf = new Configuration(schedConf);
- Map<String, String> mutations = pendingMutation.getUpdates();
+ Map<String, String> mutations = logMutation.getUpdates();
for (Map.Entry<String, String> kv : mutations.entrySet()) {
if (kv.getValue() == null) {
this.schedConf.unset(kv.getKey());
@@ -149,12 +147,14 @@ public class FSSchedulerConfigurationStore extends YarnConfigurationStore {
}
/**
+ * @param pendingMutation the log mutation to apply
* @param isValid if true, finalize temp configuration file
* if false, remove temp configuration file and rollback
* @throws Exception throw IOE when write temp configuration file fail
*/
@Override
- public void confirmMutation(boolean isValid) throws Exception {
+ public void confirmMutation(LogMutation pendingMutation,
+ boolean isValid) throws Exception {
if (pendingMutation == null || tempConfigPath == null) {
LOG.warn("pendingMutation or tempConfigPath is null, do nothing");
return;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java
index 47dd6bd..d031ea9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java
@@ -32,7 +32,6 @@ import java.util.Map;
public class InMemoryConfigurationStore extends YarnConfigurationStore {
private Configuration schedConf;
- private LogMutation pendingMutation;
private long configVersion;
@Override
@@ -42,13 +41,17 @@ public class InMemoryConfigurationStore extends YarnConfigurationStore {
this.configVersion = 1L;
}
+ /**
+ * This method does not log as it does not support backing store.
+ * The mutation to be applied on top of schedConf will be directly passed
+ * in confirmMutation.
+ */
@Override
public void logMutation(LogMutation logMutation) {
- pendingMutation = logMutation;
}
@Override
- public void confirmMutation(boolean isValid) {
+ public void confirmMutation(LogMutation pendingMutation, boolean isValid) {
if (isValid) {
for (Map.Entry<String, String> kv : pendingMutation.getUpdates()
.entrySet()) {
@@ -60,7 +63,6 @@ public class InMemoryConfigurationStore extends YarnConfigurationStore {
}
this.configVersion = this.configVersion + 1L;
}
- pendingMutation = null;
}
@Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
index 39cd8ff..bcdfb59 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
@@ -75,7 +75,6 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
private DB versiondb;
private long maxLogs;
private Configuration conf;
- private LogMutation pendingMutation;
@VisibleForTesting
protected static final Version CURRENT_VERSION_INFO = Version
.newInstance(0, 1);
@@ -232,11 +231,11 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
}
db.put(bytes(LOG_KEY), serLogMutations(logs));
}
- pendingMutation = logMutation;
}
@Override
- public void confirmMutation(boolean isValid) throws IOException {
+ public void confirmMutation(LogMutation pendingMutation,
+ boolean isValid) throws IOException {
WriteBatch updateBatch = db.createWriteBatch();
if (isValid) {
for (Map.Entry<String, String> changes :
@@ -252,7 +251,6 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
bytes(String.valueOf(configVersion)));
}
db.write(updateBatch);
- pendingMutation = null;
}
private byte[] serLogMutations(LinkedList<LogMutation> mutations) throws
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
index c8d0a0c..0914640 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
@@ -128,7 +128,7 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
}
@Override
- public void logAndApplyMutation(UserGroupInformation user,
+ public LogMutation logAndApplyMutation(UserGroupInformation user,
SchedConfUpdateInfo confUpdate) throws Exception {
oldConf = new Configuration(schedConf);
Map<String, String> kvUpdate = constructKeyValueConfUpdate(confUpdate);
@@ -141,6 +141,7 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
schedConf.set(kv.getKey(), kv.getValue());
}
}
+ return log;
}
@Override
@@ -184,10 +185,11 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
}
@Override
- public void confirmPendingMutation(boolean isValid) throws Exception {
+ public void confirmPendingMutation(LogMutation pendingMutation,
+ boolean isValid) throws Exception {
formatLock.readLock().lock();
try {
- confStore.confirmMutation(isValid);
+ confStore.confirmMutation(pendingMutation, isValid);
if (!isValid) {
schedConf = oldConf;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
index 6af11a3..34aa174 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
@@ -52,7 +52,7 @@ public abstract class YarnConfigurationStore {
* LogMutation encapsulates the fields needed for configuration mutation
* audit logging and recovery.
*/
- static class LogMutation implements Serializable {
+ public static class LogMutation implements Serializable {
private Map<String, String> updates;
private String user;
@@ -113,11 +113,13 @@ public abstract class YarnConfigurationStore {
* last logged by {@code logMutation} and marks the mutation as persisted (no
* longer pending). If isValid is true, merge the mutation with the persisted
* configuration.
+ * @param pendingMutation the log mutation to apply
* @param isValid if true, update persisted configuration with pending
* mutation.
* @throws Exception if mutation confirmation fails
*/
- public abstract void confirmMutation(boolean isValid) throws Exception;
+ public abstract void confirmMutation(LogMutation pendingMutation,
+ boolean isValid) throws Exception;
/**
* Retrieve the persisted configuration.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java
index 75ae727..0fa48b4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java
@@ -54,7 +54,6 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
protected static final Version CURRENT_VERSION_INFO = Version
.newInstance(0, 1);
private Configuration conf;
- private LogMutation pendingMutation;
private String znodeParentPath;
@@ -175,12 +174,11 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
zkManager.safeSetData(logsPath, serializeObject(logs), -1, zkAcl,
fencingNodePath);
}
- pendingMutation = logMutation;
}
@Override
- public void confirmMutation(boolean isValid)
- throws Exception {
+ public void confirmMutation(LogMutation pendingMutation,
+ boolean isValid) throws Exception {
if (isValid) {
Configuration storedConfigs = retrieve();
Map<String, String> mapConf = new HashMap<>();
@@ -201,7 +199,6 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
zkManager.setData(confVersionPath, String.valueOf(configVersion), -1);
}
- pendingMutation = null;
}
@Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
index e63dc06..bdd8e64 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
@@ -148,6 +148,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
@@ -2643,14 +2644,15 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
throw new org.apache.hadoop.security.AccessControlException("User"
+ " is not admin of all modified queues.");
}
- provider.logAndApplyMutation(callerUGI, mutationInfo);
+ LogMutation logMutation = provider.logAndApplyMutation(callerUGI,
+ mutationInfo);
try {
rm.getRMContext().getRMAdminService().refreshQueues();
} catch (IOException | YarnException e) {
- provider.confirmPendingMutation(false);
+ provider.confirmPendingMutation(logMutation, false);
throw e;
}
- provider.confirmPendingMutation(true);
+ provider.confirmPendingMutation(logMutation, true);
return null;
}
});
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java
index 0f50b53..4b3153a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java
@@ -64,7 +64,7 @@ public abstract class ConfigurationStoreBaseTest {
YarnConfigurationStore.LogMutation mutation1 =
new YarnConfigurationStore.LogMutation(update1, TEST_USER);
confStore.logMutation(mutation1);
- confStore.confirmMutation(true);
+ confStore.confirmMutation(mutation1, true);
assertEquals("valUpdate1", confStore.retrieve().get("keyUpdate1"));
Map<String, String> update2 = new HashMap<>();
@@ -72,7 +72,7 @@ public abstract class ConfigurationStoreBaseTest {
YarnConfigurationStore.LogMutation mutation2 =
new YarnConfigurationStore.LogMutation(update2, TEST_USER);
confStore.logMutation(mutation2);
- confStore.confirmMutation(false);
+ confStore.confirmMutation(mutation2, false);
assertNull("Configuration should not be updated",
confStore.retrieve().get("keyUpdate2"));
confStore.close();
@@ -89,7 +89,7 @@ public abstract class ConfigurationStoreBaseTest {
YarnConfigurationStore.LogMutation mutation =
new YarnConfigurationStore.LogMutation(update, TEST_USER);
confStore.logMutation(mutation);
- confStore.confirmMutation(true);
+ confStore.confirmMutation(mutation, true);
assertNull(confStore.retrieve().get("key"));
confStore.close();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java
index 7968372..e4ca3d3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java
@@ -100,7 +100,7 @@ public class TestFSSchedulerConfigurationStore {
LogMutation logMutation = new LogMutation(updates, "test");
configurationStore.logMutation(logMutation);
- configurationStore.confirmMutation(true);
+ configurationStore.confirmMutation(logMutation, true);
storeConf = configurationStore.retrieve();
assertEquals(null, storeConf.get("a"));
assertEquals("bb", storeConf.get("b"));
@@ -110,7 +110,7 @@ public class TestFSSchedulerConfigurationStore {
updates.put("b", "bbb");
configurationStore.logMutation(logMutation);
- configurationStore.confirmMutation(true);
+ configurationStore.confirmMutation(logMutation, true);
storeConf = configurationStore.retrieve();
assertEquals(null, storeConf.get("a"));
assertEquals("bbb", storeConf.get("b"));
@@ -133,7 +133,7 @@ public class TestFSSchedulerConfigurationStore {
LogMutation logMutation = new LogMutation(updates, "test");
configurationStore.logMutation(logMutation);
- configurationStore.confirmMutation(false);
+ configurationStore.confirmMutation(logMutation, false);
storeConf = configurationStore.retrieve();
compareConfig(conf, storeConf);
@@ -168,7 +168,7 @@ public class TestFSSchedulerConfigurationStore {
updates.put("testkey", "testvalue");
LogMutation logMutation = new LogMutation(updates, "test");
configStore.logMutation(logMutation);
- configStore.confirmMutation(true);
+ configStore.confirmMutation(logMutation, true);
} catch (IOException e) {
if (e.getMessage().contains("Filesystem closed")) {
fail("FSSchedulerConfigurationStore failed to handle " +
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java
index 5f78aa2..0ae7624 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
import org.junit.Before;
import org.junit.Test;
@@ -103,7 +104,7 @@ public class TestLeveldbConfigurationStore extends ConfigurationStoreBaseTest {
YarnConfigurationStore.LogMutation mutation =
new YarnConfigurationStore.LogMutation(update, TEST_USER);
confStore.logMutation(mutation);
- confStore.confirmMutation(true);
+ confStore.confirmMutation(mutation, true);
assertEquals("val", confStore.retrieve().get("key"));
confStore.close();
@@ -159,7 +160,7 @@ public class TestLeveldbConfigurationStore extends ConfigurationStoreBaseTest {
logs = ((LeveldbConfigurationStore) confStore).getLogs();
assertEquals(1, logs.size());
assertEquals("val1", logs.get(0).getUpdates().get("key1"));
- confStore.confirmMutation(true);
+ confStore.confirmMutation(mutation, true);
assertEquals(1, logs.size());
assertEquals("val1", logs.get(0).getUpdates().get("key1"));
@@ -171,7 +172,7 @@ public class TestLeveldbConfigurationStore extends ConfigurationStoreBaseTest {
assertEquals(2, logs.size());
assertEquals("val1", logs.get(0).getUpdates().get("key1"));
assertEquals("val2", logs.get(1).getUpdates().get("key2"));
- confStore.confirmMutation(true);
+ confStore.confirmMutation(mutation, true);
assertEquals(2, logs.size());
assertEquals("val1", logs.get(0).getUpdates().get("key1"));
assertEquals("val2", logs.get(1).getUpdates().get("key2"));
@@ -185,7 +186,7 @@ public class TestLeveldbConfigurationStore extends ConfigurationStoreBaseTest {
assertEquals(2, logs.size());
assertEquals("val2", logs.get(0).getUpdates().get("key2"));
assertEquals("val3", logs.get(1).getUpdates().get("key3"));
- confStore.confirmMutation(true);
+ confStore.confirmMutation(mutation, true);
assertEquals(2, logs.size());
assertEquals("val2", logs.get(0).getUpdates().get("key2"));
assertEquals("val3", logs.get(1).getUpdates().get("key3"));
@@ -211,16 +212,17 @@ public class TestLeveldbConfigurationStore extends ConfigurationStoreBaseTest {
rm1.getResourceScheduler()).getMutableConfProvider();
UserGroupInformation user = UserGroupInformation
.createUserForTesting(TEST_USER, new String[0]);
- confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+ LogMutation log = confProvider.logAndApplyMutation(user,
+ schedConfUpdateInfo);
rm1.getResourceScheduler().reinitialize(conf, rm1.getRMContext());
assertEquals("val", ((MutableConfScheduler) rm1.getResourceScheduler())
.getConfiguration().get("key"));
- confProvider.confirmPendingMutation(true);
+ confProvider.confirmPendingMutation(log, true);
assertEquals("val", ((MutableCSConfigurationProvider) confProvider)
.getConfStore().retrieve().get("key"));
// Next update is not persisted, it should not be recovered
schedConfUpdateInfo.getGlobalParams().put("key", "badVal");
- confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+ log = confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
rm1.close();
// Start RM2 and verifies it starts with updated configuration
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java
index cb416e2..0c9a312 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
import org.junit.Before;
@@ -93,15 +94,15 @@ public class TestMutableCSConfigurationProvider {
assertNull(confProvider.loadConfiguration(conf)
.get("yarn.scheduler.capacity.root.a.goodKey"));
- confProvider.logAndApplyMutation(TEST_USER, goodUpdate);
- confProvider.confirmPendingMutation(true);
+ LogMutation log = confProvider.logAndApplyMutation(TEST_USER, goodUpdate);
+ confProvider.confirmPendingMutation(log, true);
assertEquals("goodVal", confProvider.loadConfiguration(conf)
.get("yarn.scheduler.capacity.root.a.goodKey"));
assertNull(confProvider.loadConfiguration(conf).get(
"yarn.scheduler.capacity.root.a.badKey"));
- confProvider.logAndApplyMutation(TEST_USER, badUpdate);
- confProvider.confirmPendingMutation(false);
+ log = confProvider.logAndApplyMutation(TEST_USER, badUpdate);
+ confProvider.confirmPendingMutation(log, false);
assertNull(confProvider.loadConfiguration(conf).get(
"yarn.scheduler.capacity.root.a.badKey"));
@@ -125,8 +126,8 @@ public class TestMutableCSConfigurationProvider {
QueueConfigInfo("root.a", updateMap);
updateInfo.getUpdateQueueInfo().add(queueConfigInfo);
- confProvider.logAndApplyMutation(TEST_USER, updateInfo);
- confProvider.confirmPendingMutation(true);
+ LogMutation log = confProvider.logAndApplyMutation(TEST_USER, updateInfo);
+ confProvider.confirmPendingMutation(log, true);
assertEquals("testval1", confProvider.loadConfiguration(conf)
.get("yarn.scheduler.capacity.root.a.testkey1"));
assertEquals("testval2", confProvider.loadConfiguration(conf)
@@ -138,8 +139,8 @@ public class TestMutableCSConfigurationProvider {
queueConfigInfo = new QueueConfigInfo("root.a", updateMap);
updateInfo.getUpdateQueueInfo().add(queueConfigInfo);
- confProvider.logAndApplyMutation(TEST_USER, updateInfo);
- confProvider.confirmPendingMutation(true);
+ log = confProvider.logAndApplyMutation(TEST_USER, updateInfo);
+ confProvider.confirmPendingMutation(log, true);
assertNull("Failed to remove config",
confProvider.loadConfiguration(conf)
.get("yarn.scheduler.capacity.root.a.testkey1"));
@@ -148,6 +149,38 @@ public class TestMutableCSConfigurationProvider {
}
@Test
+ public void testMultipleUpdatesNotLost() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
+ YarnConfiguration.MEMORY_CONFIGURATION_STORE);
+ confProvider.init(conf);
+
+ SchedConfUpdateInfo updateInfo1 = new SchedConfUpdateInfo();
+ Map<String, String> updateMap1 = new HashMap<>();
+ updateMap1.put("key1", "val1");
+ QueueConfigInfo queueConfigInfo1 = new
+ QueueConfigInfo("root.a", updateMap1);
+ updateInfo1.getUpdateQueueInfo().add(queueConfigInfo1);
+ LogMutation log1 = confProvider.logAndApplyMutation(TEST_USER, updateInfo1);
+
+ SchedConfUpdateInfo updateInfo2 = new SchedConfUpdateInfo();
+ Map<String, String> updateMap2 = new HashMap<>();
+ updateMap2.put("key2", "val2");
+ QueueConfigInfo queueConfigInfo2 = new
+ QueueConfigInfo("root.a", updateMap2);
+ updateInfo2.getUpdateQueueInfo().add(queueConfigInfo2);
+ LogMutation log2 = confProvider.logAndApplyMutation(TEST_USER, updateInfo2);
+
+ confProvider.confirmPendingMutation(log1, true);
+ confProvider.confirmPendingMutation(log2, true);
+
+ assertEquals("val1", confProvider.loadConfiguration(conf)
+ .get("yarn.scheduler.capacity.root.a.key1"));
+ assertEquals("val2", confProvider.loadConfiguration(conf)
+ .get("yarn.scheduler.capacity.root.a.key2"));
+ }
+
+ @Test
public void testHDFSBackedProvider() throws Exception {
File testSchedulerConfigurationDir = new File(
TestMutableCSConfigurationProvider.class.getResource("").getPath()
@@ -166,15 +199,15 @@ public class TestMutableCSConfigurationProvider {
assertNull(confProvider.loadConfiguration(conf)
.get("yarn.scheduler.capacity.root.a.goodKey"));
- confProvider.logAndApplyMutation(TEST_USER, goodUpdate);
- confProvider.confirmPendingMutation(true);
+ LogMutation log = confProvider.logAndApplyMutation(TEST_USER, goodUpdate);
+ confProvider.confirmPendingMutation(log, true);
assertEquals("goodVal", confProvider.loadConfiguration(conf)
.get("yarn.scheduler.capacity.root.a.goodKey"));
assertNull(confProvider.loadConfiguration(conf).get(
"yarn.scheduler.capacity.root.a.badKey"));
- confProvider.logAndApplyMutation(TEST_USER, badUpdate);
- confProvider.confirmPendingMutation(false);
+ log = confProvider.logAndApplyMutation(TEST_USER, badUpdate);
+ confProvider.confirmPendingMutation(log, false);
assertNull(confProvider.loadConfiguration(conf).get(
"yarn.scheduler.capacity.root.a.badKey"));
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java
index 57ccd75..e67e382 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
import org.junit.After;
@@ -148,7 +149,7 @@ public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
YarnConfigurationStore.LogMutation mutation =
new YarnConfigurationStore.LogMutation(update, TEST_USER);
confStore.logMutation(mutation);
- confStore.confirmMutation(true);
+ confStore.confirmMutation(mutation, true);
long v2 = confStore.getConfigVersion();
assertEquals(2, v2);
}
@@ -160,10 +161,9 @@ public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
Map<String, String> update = new HashMap<>();
update.put("key", "val");
- YarnConfigurationStore.LogMutation mutation =
- new YarnConfigurationStore.LogMutation(update, TEST_USER);
+ LogMutation mutation = new LogMutation(update, TEST_USER);
confStore.logMutation(mutation);
- confStore.confirmMutation(true);
+ confStore.confirmMutation(mutation, true);
assertEquals("val", confStore.retrieve().get("key"));
// Create a new configuration store, and check for updated configuration
@@ -190,7 +190,7 @@ public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
logs = ((ZKConfigurationStore) confStore).getLogs();
assertEquals(1, logs.size());
assertEquals("val1", logs.get(0).getUpdates().get("key1"));
- confStore.confirmMutation(true);
+ confStore.confirmMutation(mutation, true);
assertEquals(1, logs.size());
assertEquals("val1", logs.get(0).getUpdates().get("key1"));
@@ -202,7 +202,7 @@ public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
assertEquals(2, logs.size());
assertEquals("val1", logs.get(0).getUpdates().get("key1"));
assertEquals("val2", logs.get(1).getUpdates().get("key2"));
- confStore.confirmMutation(true);
+ confStore.confirmMutation(mutation, true);
assertEquals(2, logs.size());
assertEquals("val1", logs.get(0).getUpdates().get("key1"));
assertEquals("val2", logs.get(1).getUpdates().get("key2"));
@@ -216,7 +216,7 @@ public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
assertEquals(2, logs.size());
assertEquals("val2", logs.get(0).getUpdates().get("key2"));
assertEquals("val3", logs.get(1).getUpdates().get("key3"));
- confStore.confirmMutation(true);
+ confStore.confirmMutation(mutation, true);
assertEquals(2, logs.size());
assertEquals("val2", logs.get(0).getUpdates().get("key2"));
assertEquals("val3", logs.get(1).getUpdates().get("key3"));
@@ -308,16 +308,17 @@ public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
rm1.getResourceScheduler()).getMutableConfProvider();
UserGroupInformation user = UserGroupInformation
.createUserForTesting(TEST_USER, new String[0]);
- confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+ LogMutation log = confProvider.logAndApplyMutation(user,
+ schedConfUpdateInfo);
rm1.getResourceScheduler().reinitialize(conf1, rm1.getRMContext());
assertEquals("val", ((MutableConfScheduler) rm1.getResourceScheduler())
.getConfiguration().get("key"));
- confProvider.confirmPendingMutation(true);
+ confProvider.confirmPendingMutation(log, true);
assertEquals("val", ((MutableCSConfigurationProvider) confProvider)
.getConfStore().retrieve().get("key"));
// Next update is not persisted, it should not be recovered
schedConfUpdateInfo.getGlobalParams().put("key", "badVal");
- confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+ log = confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
// Start RM2 and verifies it starts with updated configuration
rm2.getRMContext().getRMAdminService().transitionToActive(req);
@@ -400,9 +401,10 @@ public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
stopParams.put("capacity", "0");
QueueConfigInfo stopInfo = new QueueConfigInfo("root.default", stopParams);
schedConfUpdateInfo.getUpdateQueueInfo().add(stopInfo);
- confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+ LogMutation log = confProvider.logAndApplyMutation(user,
+ schedConfUpdateInfo);
rm1.getResourceScheduler().reinitialize(conf1, rm1.getRMContext());
- confProvider.confirmPendingMutation(true);
+ confProvider.confirmPendingMutation(log, true);
assertTrue(Arrays.asList(((MutableConfScheduler) rm1.getResourceScheduler())
.getConfiguration().get("yarn.scheduler.capacity.root.queues").split
(",")).contains("a"));
@@ -411,9 +413,9 @@ public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
schedConfUpdateInfo.getUpdateQueueInfo().clear();
schedConfUpdateInfo.getAddQueueInfo().clear();
schedConfUpdateInfo.getRemoveQueueInfo().add("root.default");
- confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+ log = confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
rm1.getResourceScheduler().reinitialize(conf1, rm1.getRMContext());
- confProvider.confirmPendingMutation(true);
+ confProvider.confirmPendingMutation(log, true);
assertEquals("a", ((MutableConfScheduler) rm1.getResourceScheduler())
.getConfiguration().get("yarn.scheduler.capacity.root.queues"));
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org