You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sn...@apache.org on 2020/04/24 09:16:04 UTC

[hadoop] branch branch-3.2 updated: YARN-9998. Code cleanup in LeveldbConfigurationStore. Contributed by Benjamin Teke

This is an automated email from the ASF dual-hosted git repository.

snemeth pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 3b67dc2  YARN-9998. Code cleanup in LeveldbConfigurationStore. Contributed by Benjamin Teke
3b67dc2 is described below

commit 3b67dc24aaad6bcad3aa63f6081b97b623126768
Author: Szilard Nemeth <sn...@apache.org>
AuthorDate: Fri Apr 24 11:15:53 2020 +0200

    YARN-9998. Code cleanup in LeveldbConfigurationStore. Contributed by Benjamin Teke
---
 .../capacity/conf/LeveldbConfigurationStore.java   | 105 +++++++++++----------
 1 file changed, 55 insertions(+), 50 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
index f921a9a..2605e0e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
@@ -54,6 +54,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.function.Consumer;
 
 import static org.fusesource.leveldbjni.JniDBFactory.bytes;
 
@@ -72,22 +73,23 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
   private static final String CONF_VERSION_KEY = "conf-version";
 
   private DB db;
-  private DB versiondb;
+  private DB versionDb;
   private long maxLogs;
   private Configuration conf;
   private LogMutation pendingMutation;
+  private Configuration initSchedConf;
   @VisibleForTesting
   protected static final Version CURRENT_VERSION_INFO = Version
       .newInstance(0, 1);
-  private Timer compactionTimer;
   private long compactionIntervalMsec;
 
   @Override
   public void initialize(Configuration config, Configuration schedConf,
       RMContext rmContext) throws IOException {
     this.conf = config;
+    this.initSchedConf = schedConf;
     try {
-      initDatabase(schedConf);
+      initDatabase();
       this.maxLogs = config.getLong(
           YarnConfiguration.RM_SCHEDCONF_MAX_LOGS,
           YarnConfiguration.DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS);
@@ -108,7 +110,15 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
     fs.delete(getStorageDir(DB_NAME), true);
   }
 
-  private void initDatabase(Configuration config) throws Exception {
+  private void initDatabase() throws Exception {
+    Path confVersion = createStorageDir(CONF_VERSION_NAME);
+    Options confOptions = new Options();
+    confOptions.createIfMissing(false);
+    File confVersionFile = new File(confVersion.toString());
+
+    versionDb = initDatabaseHelper(confVersionFile, confOptions,
+        this::initVersionDb);
+
     Path storeRoot = createStorageDir(DB_NAME);
     Options options = new Options();
     options.createIfMissing(false);
@@ -144,49 +154,37 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
         return key;
       }
     });
+    LOG.info("Using conf database at {}", storeRoot);
+    File dbFile = new File(storeRoot.toString());
+    db = initDatabaseHelper(dbFile, options, this::initDb);
+  }
 
-    Path confVersion = createStorageDir(CONF_VERSION_NAME);
-    Options confOptions = new Options();
-    confOptions.createIfMissing(false);
-    LOG.info("Using conf version at " + confVersion);
-    File confVersionFile = new File(confVersion.toString());
-    try {
-      versiondb = JniDBFactory.factory.open(confVersionFile, confOptions);
-    } catch (NativeDB.DBException e) {
-      if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
-        LOG.info("Creating conf version at " + confVersionFile);
-        confOptions.createIfMissing(true);
-        try {
-          versiondb = JniDBFactory.factory.open(confVersionFile, confOptions);
-          versiondb.put(bytes(CONF_VERSION_KEY), bytes(String.valueOf(0)));
-        } catch (DBException dbErr) {
-          throw new IOException(dbErr.getMessage(), dbErr);
-        }
-      } else {
-        throw e;
-      }
-    }
+  private void initVersionDb(DB database) {
+    database.put(bytes(CONF_VERSION_KEY), bytes(String.valueOf(0)));
+  }
 
+  private void initDb(DB database) {
+    WriteBatch initBatch = database.createWriteBatch();
+    for (Map.Entry<String, String> kv : initSchedConf) {
+      initBatch.put(bytes(kv.getKey()), bytes(kv.getValue()));
+    }
+    database.write(initBatch);
+    increaseConfigVersion();
+  }
 
-    LOG.info("Using conf database at " + storeRoot);
-    File dbfile = new File(storeRoot.toString());
+  private DB initDatabaseHelper(File configurationFile, Options options,
+      Consumer<DB> initMethod) throws Exception {
+    DB database;
     try {
-      db = JniDBFactory.factory.open(dbfile, options);
+      database = JniDBFactory.factory.open(configurationFile, options);
     } catch (NativeDB.DBException e) {
       if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
-        LOG.info("Creating conf database at " + dbfile);
+        LOG.info("Creating configuration version/database at {}",
+            configurationFile);
         options.createIfMissing(true);
         try {
-          db = JniDBFactory.factory.open(dbfile, options);
-          // Write the initial scheduler configuration
-          WriteBatch initBatch = db.createWriteBatch();
-          for (Map.Entry<String, String> kv : config) {
-            initBatch.put(bytes(kv.getKey()), bytes(kv.getValue()));
-          }
-          db.write(initBatch);
-          long configVersion = getConfigVersion() + 1L;
-          versiondb.put(bytes(CONF_VERSION_KEY),
-              bytes(String.valueOf(configVersion)));
+          database = JniDBFactory.factory.open(configurationFile, options);
+          initMethod.accept(database);
         } catch (DBException dbErr) {
           throw new IOException(dbErr.getMessage(), dbErr);
         }
@@ -194,6 +192,8 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
         throw e;
       }
     }
+
+    return database;
   }
 
   private Path createStorageDir(String storageName) throws IOException {
@@ -217,8 +217,8 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
     if (db != null) {
       db.close();
     }
-    if (versiondb != null) {
-      versiondb.close();
+    if (versionDb != null) {
+      versionDb.close();
     }
   }
 
@@ -245,9 +245,7 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
           updateBatch.put(bytes(changes.getKey()), bytes(changes.getValue()));
         }
       }
-      long configVersion = getConfigVersion() + 1L;
-      versiondb.put(bytes(CONF_VERSION_KEY),
-          bytes(String.valueOf(configVersion)));
+      increaseConfigVersion();
     }
     db.write(updateBatch);
     pendingMutation = null;
@@ -263,6 +261,10 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
     }
   }
 
+  // Because of type erasure casting to LinkedList<LogMutation> will be
+  // unchecked. A way around that would be to iterate over the logMutations
+  // which is overkill in this case.
+  @SuppressWarnings("unchecked")
   private LinkedList<LogMutation> deserLogMutations(byte[] mutations) throws
       IOException {
     if (mutations == null) {
@@ -293,9 +295,15 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
     return config;
   }
 
+  private void increaseConfigVersion() {
+    long configVersion = getConfigVersion() + 1L;
+    versionDb.put(bytes(CONF_VERSION_KEY),
+        bytes(String.valueOf(configVersion)));
+  }
+
   @Override
   public long getConfigVersion() {
-    String version = new String(versiondb.get(bytes(CONF_VERSION_KEY)),
+    String version = new String(versionDb.get(bytes(CONF_VERSION_KEY)),
         StandardCharsets.UTF_8);
     return Long.parseLong(version);
   }
@@ -305,18 +313,15 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
     return null; // unimplemented
   }
 
-  // TODO below was taken from LeveldbRMStateStore, it can probably be
-  // refactored
   private void startCompactionTimer() {
     if (compactionIntervalMsec > 0) {
-      compactionTimer = new Timer(
+      Timer compactionTimer = new Timer(
           this.getClass().getSimpleName() + " compaction timer", true);
       compactionTimer.schedule(new CompactionTimerTask(),
           compactionIntervalMsec, compactionIntervalMsec);
     }
   }
 
-  // TODO: following is taken from LeveldbRMStateStore
   @Override
   public Version getConfStoreVersion() throws Exception {
     Version version = null;
@@ -370,7 +375,7 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
         LOG.error("Error compacting database", e);
       }
       long duration = Time.monotonicNow() - start;
-      LOG.info("Full compaction cycle completed in " + duration + " msec");
+      LOG.info("Full compaction cycle completed in {} msec", duration);
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org