You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sn...@apache.org on 2020/04/24 09:16:04 UTC
[hadoop] branch branch-3.2 updated: YARN-9998. Code cleanup in LeveldbConfigurationStore. Contributed by Benjamin Teke
This is an automated email from the ASF dual-hosted git repository.
snemeth pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 3b67dc2 YARN-9998. Code cleanup in LeveldbConfigurationStore. Contributed by Benjamin Teke
3b67dc2 is described below
commit 3b67dc24aaad6bcad3aa63f6081b97b623126768
Author: Szilard Nemeth <sn...@apache.org>
AuthorDate: Fri Apr 24 11:15:53 2020 +0200
YARN-9998. Code cleanup in LeveldbConfigurationStore. Contributed by Benjamin Teke
---
.../capacity/conf/LeveldbConfigurationStore.java | 105 +++++++++++----------
1 file changed, 55 insertions(+), 50 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
index f921a9a..2605e0e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
@@ -54,6 +54,7 @@ import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.function.Consumer;
import static org.fusesource.leveldbjni.JniDBFactory.bytes;
@@ -72,22 +73,23 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
private static final String CONF_VERSION_KEY = "conf-version";
private DB db;
- private DB versiondb;
+ private DB versionDb;
private long maxLogs;
private Configuration conf;
private LogMutation pendingMutation;
+ private Configuration initSchedConf;
@VisibleForTesting
protected static final Version CURRENT_VERSION_INFO = Version
.newInstance(0, 1);
- private Timer compactionTimer;
private long compactionIntervalMsec;
@Override
public void initialize(Configuration config, Configuration schedConf,
RMContext rmContext) throws IOException {
this.conf = config;
+ this.initSchedConf = schedConf;
try {
- initDatabase(schedConf);
+ initDatabase();
this.maxLogs = config.getLong(
YarnConfiguration.RM_SCHEDCONF_MAX_LOGS,
YarnConfiguration.DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS);
@@ -108,7 +110,15 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
fs.delete(getStorageDir(DB_NAME), true);
}
- private void initDatabase(Configuration config) throws Exception {
+ private void initDatabase() throws Exception {
+ Path confVersion = createStorageDir(CONF_VERSION_NAME);
+ Options confOptions = new Options();
+ confOptions.createIfMissing(false);
+ File confVersionFile = new File(confVersion.toString());
+
+ versionDb = initDatabaseHelper(confVersionFile, confOptions,
+ this::initVersionDb);
+
Path storeRoot = createStorageDir(DB_NAME);
Options options = new Options();
options.createIfMissing(false);
@@ -144,49 +154,37 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
return key;
}
});
+ LOG.info("Using conf database at {}", storeRoot);
+ File dbFile = new File(storeRoot.toString());
+ db = initDatabaseHelper(dbFile, options, this::initDb);
+ }
- Path confVersion = createStorageDir(CONF_VERSION_NAME);
- Options confOptions = new Options();
- confOptions.createIfMissing(false);
- LOG.info("Using conf version at " + confVersion);
- File confVersionFile = new File(confVersion.toString());
- try {
- versiondb = JniDBFactory.factory.open(confVersionFile, confOptions);
- } catch (NativeDB.DBException e) {
- if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
- LOG.info("Creating conf version at " + confVersionFile);
- confOptions.createIfMissing(true);
- try {
- versiondb = JniDBFactory.factory.open(confVersionFile, confOptions);
- versiondb.put(bytes(CONF_VERSION_KEY), bytes(String.valueOf(0)));
- } catch (DBException dbErr) {
- throw new IOException(dbErr.getMessage(), dbErr);
- }
- } else {
- throw e;
- }
- }
+ private void initVersionDb(DB database) {
+ database.put(bytes(CONF_VERSION_KEY), bytes(String.valueOf(0)));
+ }
+ private void initDb(DB database) {
+ WriteBatch initBatch = database.createWriteBatch();
+ for (Map.Entry<String, String> kv : initSchedConf) {
+ initBatch.put(bytes(kv.getKey()), bytes(kv.getValue()));
+ }
+ database.write(initBatch);
+ increaseConfigVersion();
+ }
- LOG.info("Using conf database at " + storeRoot);
- File dbfile = new File(storeRoot.toString());
+ private DB initDatabaseHelper(File configurationFile, Options options,
+ Consumer<DB> initMethod) throws Exception {
+ DB database;
try {
- db = JniDBFactory.factory.open(dbfile, options);
+ database = JniDBFactory.factory.open(configurationFile, options);
} catch (NativeDB.DBException e) {
if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
- LOG.info("Creating conf database at " + dbfile);
+ LOG.info("Creating configuration version/database at {}",
+ configurationFile);
options.createIfMissing(true);
try {
- db = JniDBFactory.factory.open(dbfile, options);
- // Write the initial scheduler configuration
- WriteBatch initBatch = db.createWriteBatch();
- for (Map.Entry<String, String> kv : config) {
- initBatch.put(bytes(kv.getKey()), bytes(kv.getValue()));
- }
- db.write(initBatch);
- long configVersion = getConfigVersion() + 1L;
- versiondb.put(bytes(CONF_VERSION_KEY),
- bytes(String.valueOf(configVersion)));
+ database = JniDBFactory.factory.open(configurationFile, options);
+ initMethod.accept(database);
} catch (DBException dbErr) {
throw new IOException(dbErr.getMessage(), dbErr);
}
@@ -194,6 +192,8 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
throw e;
}
}
+
+ return database;
}
private Path createStorageDir(String storageName) throws IOException {
@@ -217,8 +217,8 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
if (db != null) {
db.close();
}
- if (versiondb != null) {
- versiondb.close();
+ if (versionDb != null) {
+ versionDb.close();
}
}
@@ -245,9 +245,7 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
updateBatch.put(bytes(changes.getKey()), bytes(changes.getValue()));
}
}
- long configVersion = getConfigVersion() + 1L;
- versiondb.put(bytes(CONF_VERSION_KEY),
- bytes(String.valueOf(configVersion)));
+ increaseConfigVersion();
}
db.write(updateBatch);
pendingMutation = null;
@@ -263,6 +261,10 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
}
}
+ // Because of type erasure casting to LinkedList<LogMutation> will be
+ // unchecked. A way around that would be to iterate over the logMutations
+ // which is overkill in this case.
+ @SuppressWarnings("unchecked")
private LinkedList<LogMutation> deserLogMutations(byte[] mutations) throws
IOException {
if (mutations == null) {
@@ -293,9 +295,15 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
return config;
}
+ private void increaseConfigVersion() {
+ long configVersion = getConfigVersion() + 1L;
+ versionDb.put(bytes(CONF_VERSION_KEY),
+ bytes(String.valueOf(configVersion)));
+ }
+
@Override
public long getConfigVersion() {
- String version = new String(versiondb.get(bytes(CONF_VERSION_KEY)),
+ String version = new String(versionDb.get(bytes(CONF_VERSION_KEY)),
StandardCharsets.UTF_8);
return Long.parseLong(version);
}
@@ -305,18 +313,15 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
return null; // unimplemented
}
- // TODO below was taken from LeveldbRMStateStore, it can probably be
- // refactored
private void startCompactionTimer() {
if (compactionIntervalMsec > 0) {
- compactionTimer = new Timer(
+ Timer compactionTimer = new Timer(
this.getClass().getSimpleName() + " compaction timer", true);
compactionTimer.schedule(new CompactionTimerTask(),
compactionIntervalMsec, compactionIntervalMsec);
}
}
- // TODO: following is taken from LeveldbRMStateStore
@Override
public Version getConfStoreVersion() throws Exception {
Version version = null;
@@ -370,7 +375,7 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
LOG.error("Error compacting database", e);
}
long duration = Time.monotonicNow() - start;
- LOG.info("Full compaction cycle completed in " + duration + " msec");
+ LOG.info("Full compaction cycle completed in {} msec", duration);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org