You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sn...@apache.org on 2020/04/24 15:09:44 UTC

[hadoop] branch branch-3.3 updated: YARN-10189. Code cleanup in LeveldbRMStateStore. Contributed by Benjamin Teke

This is an automated email from the ASF dual-hosted git repository.

snemeth pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 7a3f190  YARN-10189. Code cleanup in LeveldbRMStateStore. Contributed by Benjamin Teke
7a3f190 is described below

commit 7a3f190d89b1af0169eb25b4a58c4bf0eb4d75c9
Author: Szilard Nemeth <sn...@apache.org>
AuthorDate: Fri Apr 24 17:09:35 2020 +0200

    YARN-10189. Code cleanup in LeveldbRMStateStore. Contributed by Benjamin Teke
---
 .../yarn/server/resourcemanager/DBManager.java     | 131 +++++++++++++++
 .../recovery/LeveldbRMStateStore.java              | 176 ++++-----------------
 .../capacity/conf/LeveldbConfigurationStore.java   | 105 +++---------
 .../recovery/TestLeveldbRMStateStore.java          |  21 ++-
 4 files changed, 197 insertions(+), 236 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DBManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DBManager.java
new file mode 100644
index 0000000..13529ba
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DBManager.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
+import org.fusesource.leveldbjni.JniDBFactory;
+import org.fusesource.leveldbjni.internal.NativeDB;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.DBException;
+import org.iq80.leveldb.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.function.Consumer;
+
+import static org.fusesource.leveldbjni.JniDBFactory.bytes;
+
+/**
+ * Owns a single LevelDB handle together with the optional timer that
+ * periodically compacts it. It bundles the open-or-create, version
+ * bookkeeping, compaction and shutdown logic of a LevelDB-backed store.
+ */
+public class DBManager implements Closeable {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(DBManager.class);
+  // volatile: the compaction timer thread reads what close()/setDb() write.
+  private volatile DB db;
+  private Timer compactionTimer;
+
+  /**
+   * Opens the database stored at {@code configurationFile}. When it does
+   * not exist yet, it is created and {@code initMethod} is run once against
+   * the freshly created handle.
+   *
+   * @param configurationFile location of the database
+   * @param options options used for opening; {@code createIfMissing} is
+   *                switched on if the database has to be created
+   * @param initMethod callback that populates a newly created database
+   * @return the opened database, which this manager keeps a reference to
+   * @throws IOException if creating or initializing a new database fails
+   * @throws Exception if opening fails for any reason other than the
+   *                   database being absent
+   */
+  public DB initDatabase(File configurationFile, Options options,
+                         Consumer<DB> initMethod) throws Exception {
+    try {
+      db = JniDBFactory.factory.open(configurationFile, options);
+    } catch (NativeDB.DBException e) {
+      if (!e.isNotFound() && !e.getMessage().contains(" does not exist ")) {
+        // Anything other than a missing database is a genuine failure.
+        throw e;
+      }
+      LOG.info("Creating configuration version/database at {}",
+          configurationFile);
+      options.createIfMissing(true);
+      try {
+        db = JniDBFactory.factory.open(configurationFile, options);
+        initMethod.accept(db);
+      } catch (DBException dbErr) {
+        throw new IOException(dbErr.getMessage(), dbErr);
+      }
+    }
+    return db;
+  }
+
+  /**
+   * Cancels the compaction timer, if one was started, and closes the
+   * database, if one is open. Calling it again afterwards is a no-op.
+   *
+   * @throws IOException if closing the database fails
+   */
+  @Override
+  public void close() throws IOException {
+    if (compactionTimer != null) {
+      compactionTimer.cancel();
+      compactionTimer = null;
+    }
+    // Drop the shared reference before closing so that a compaction task
+    // racing with shutdown does not start working on a closed handle.
+    DB current = db;
+    db = null;
+    if (current != null) {
+      current.close();
+    }
+  }
+
+  /**
+   * Serializes {@code versionValue} as a protobuf and stores it under
+   * {@code versionKey}.
+   *
+   * @param versionKey key to store the version under
+   * @param versionValue version to store; must be a {@link VersionPBImpl}
+   */
+  public void storeVersion(String versionKey, Version versionValue) {
+    byte[] data = ((VersionPBImpl) versionValue).getProto().toByteArray();
+    db.put(bytes(versionKey), data);
+  }
+
+  /**
+   * Loads the version stored under {@code versionKey}.
+   *
+   * @param versionKey key the version was stored under
+   * @return the stored version, or {@code null} if the key has no value
+   * @throws IOException if reading from the database fails
+   */
+  public Version loadVersion(String versionKey) throws Exception {
+    try {
+      byte[] data = db.get(bytes(versionKey));
+      if (data == null) {
+        return null;
+      }
+      return new VersionPBImpl(
+          YarnServerCommonProtos.VersionProto.parseFrom(data));
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /** Replaces the managed database handle; intended for tests. */
+  @VisibleForTesting
+  public void setDb(DB db) {
+    this.db = db;
+  }
+
+  /**
+   * Starts a daemon timer that runs a full compaction of the database every
+   * {@code compactionIntervalMsec} milliseconds. Does nothing when the
+   * interval is not positive.
+   *
+   * @param compactionIntervalMsec delay before the first run and period
+   *                               between runs, in milliseconds
+   * @param className prefix of the timer thread's name
+   */
+  public void startCompactionTimer(long compactionIntervalMsec,
+                                    String className) {
+    if (compactionIntervalMsec > 0) {
+      compactionTimer = new Timer(
+          className + " compaction timer", true);
+      compactionTimer.schedule(new CompactionTimerTask(),
+          compactionIntervalMsec, compactionIntervalMsec);
+    }
+  }
+
+  /** Compacts the whole key range of the database and logs the duration. */
+  private class CompactionTimerTask extends TimerTask {
+    @Override
+    public void run() {
+      // Work on a snapshot: close() may clear the field concurrently, and an
+      // uncaught NullPointerException would terminate the timer thread.
+      DB current = db;
+      if (current == null) {
+        return;
+      }
+      long start = Time.monotonicNow();
+      LOG.info("Starting full compaction cycle");
+      try {
+        current.compactRange(null, null);
+      } catch (DBException e) {
+        LOG.error("Error compacting database", e);
+      }
+      long duration = Time.monotonicNow() - start;
+      LOG.info("Full compaction cycle completed in {} msec", duration);
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
index 2420735..2f4d9be 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
@@ -31,9 +31,8 @@ import java.security.PrivateKey;
 import java.security.cert.X509Certificate;
 import java.util.HashMap;
 import java.util.Map.Entry;
-import java.util.Timer;
-import java.util.TimerTask;
 
+import org.apache.hadoop.yarn.server.resourcemanager.DBManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -42,13 +41,11 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
-import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.EpochProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto;
@@ -56,7 +53,6 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.Appl
 import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.records.Version;
-import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
@@ -66,8 +62,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AM
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
 import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
-import org.fusesource.leveldbjni.JniDBFactory;
-import org.fusesource.leveldbjni.internal.NativeDB;
 import org.iq80.leveldb.DB;
 import org.iq80.leveldb.DBException;
 import org.iq80.leveldb.Options;
@@ -100,7 +94,7 @@ public class LeveldbRMStateStore extends RMStateStore {
       .newInstance(1, 1);
 
   private DB db;
-  private Timer compactionTimer;
+  private DBManager dbManager = new DBManager();
   private long compactionIntervalMsec;
 
   private String getApplicationNodeKey(ApplicationId appId) {
@@ -140,7 +134,7 @@ public class LeveldbRMStateStore extends RMStateStore {
   }
 
   @Override
-  protected void initInternal(Configuration conf) throws Exception {
+  protected void initInternal(Configuration conf) {
     compactionIntervalMsec = conf.getLong(
         YarnConfiguration.RM_LEVELDB_COMPACTION_INTERVAL_SECS,
         YarnConfiguration.DEFAULT_RM_LEVELDB_COMPACTION_INTERVAL_SECS) * 1000;
@@ -165,55 +159,20 @@ public class LeveldbRMStateStore extends RMStateStore {
 
   @Override
   protected void startInternal() throws Exception {
-    db = openDatabase();
-    startCompactionTimer();
-  }
-
-  protected DB openDatabase() throws Exception {
     Path storeRoot = createStorageDir();
     Options options = new Options();
     options.createIfMissing(false);
     LOG.info("Using state database at " + storeRoot + " for recovery");
     File dbfile = new File(storeRoot.toString());
-    try {
-      db = JniDBFactory.factory.open(dbfile, options);
-    } catch (NativeDB.DBException e) {
-      if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
-        LOG.info("Creating state database at " + dbfile);
-        options.createIfMissing(true);
-        try {
-          db = JniDBFactory.factory.open(dbfile, options);
-          // store version
-          storeVersion();
-        } catch (DBException dbErr) {
-          throw new IOException(dbErr.getMessage(), dbErr);
-        }
-      } else {
-        throw e;
-      }
-    }
-    return db;
-  }
-
-  private void startCompactionTimer() {
-    if (compactionIntervalMsec > 0) {
-      compactionTimer = new Timer(
-          this.getClass().getSimpleName() + " compaction timer", true);
-      compactionTimer.schedule(new CompactionTimerTask(),
-          compactionIntervalMsec, compactionIntervalMsec);
-    }
+    db = dbManager.initDatabase(dbfile, options, (database) ->
+        storeVersion(CURRENT_VERSION_INFO));
+    dbManager.startCompactionTimer(compactionIntervalMsec,
+        this.getClass().getSimpleName());
   }
 
   @Override
   protected void closeInternal() throws Exception {
-    if (compactionTimer != null) {
-      compactionTimer.cancel();
-      compactionTimer = null;
-    }
-    if (db != null) {
-      db.close();
-      db = null;
-    }
+    dbManager.close();
   }
 
   @VisibleForTesting
@@ -228,33 +187,22 @@ public class LeveldbRMStateStore extends RMStateStore {
 
   @Override
   protected Version loadVersion() throws Exception {
-    Version version = null;
-    try {
-      byte[] data = db.get(bytes(VERSION_NODE));
-      if (data != null) {
-        version = new VersionPBImpl(VersionProto.parseFrom(data));
-      }
-    } catch (DBException e) {
-      throw new IOException(e);
-    }
-    return version;
+    return dbManager.loadVersion(VERSION_NODE);
   }
 
   @Override
   protected void storeVersion() throws Exception {
-    dbStoreVersion(CURRENT_VERSION_INFO);
-  }
-
-  void dbStoreVersion(Version state) throws IOException {
-    String key = VERSION_NODE;
-    byte[] data = ((VersionPBImpl) state).getProto().toByteArray();
     try {
-      db.put(bytes(key), data);
+      storeVersion(CURRENT_VERSION_INFO);
     } catch (DBException e) {
       throw new IOException(e);
     }
   }
 
+  protected void storeVersion(Version version) {
+    dbManager.storeVersion(VERSION_NODE, version);
+  }
+
   @Override
   protected Version getCurrentVersion() {
     return CURRENT_VERSION_INFO;
@@ -290,9 +238,7 @@ public class LeveldbRMStateStore extends RMStateStore {
 
   private void loadReservationState(RMState rmState) throws IOException {
     int numReservations = 0;
-    LeveldbIterator iter = null;
-    try {
-      iter = new LeveldbIterator(db);
+    try (LeveldbIterator iter = new LeveldbIterator(db)) {
       iter.seek(bytes(RM_RESERVATION_KEY_PREFIX));
       while (iter.hasNext()) {
         Entry<byte[],byte[]> entry = iter.next();
@@ -324,10 +270,6 @@ public class LeveldbRMStateStore extends RMStateStore {
       }
     } catch (DBException e) {
       throw new IOException(e);
-    } finally {
-      if (iter != null) {
-        iter.close();
-      }
     }
     LOG.info("Recovered " + numReservations + " reservations");
   }
@@ -342,9 +284,7 @@ public class LeveldbRMStateStore extends RMStateStore {
 
   private int loadRMDTSecretManagerKeys(RMState state) throws IOException {
     int numKeys = 0;
-    LeveldbIterator iter = null;
-    try {
-      iter = new LeveldbIterator(db);
+    try (LeveldbIterator iter = new LeveldbIterator(db)) {
       iter.seek(bytes(RM_DT_MASTER_KEY_KEY_PREFIX));
       while (iter.hasNext()) {
         Entry<byte[],byte[]> entry = iter.next();
@@ -361,10 +301,6 @@ public class LeveldbRMStateStore extends RMStateStore {
       }
     } catch (DBException e) {
       throw new IOException(e);
-    } finally {
-      if (iter != null) {
-        iter.close();
-      }
     }
     return numKeys;
   }
@@ -382,9 +318,7 @@ public class LeveldbRMStateStore extends RMStateStore {
 
   private int loadRMDTSecretManagerTokens(RMState state) throws IOException {
     int numTokens = 0;
-    LeveldbIterator iter = null;
-    try {
-      iter = new LeveldbIterator(db);
+    try (LeveldbIterator iter = new LeveldbIterator(db)) {
       iter.seek(bytes(RM_DT_TOKEN_KEY_PREFIX));
       while (iter.hasNext()) {
         Entry<byte[],byte[]> entry = iter.next();
@@ -404,17 +338,13 @@ public class LeveldbRMStateStore extends RMStateStore {
       }
     } catch (DBException e) {
       throw new IOException(e);
-    } finally {
-      if (iter != null) {
-        iter.close();
-      }
     }
     return numTokens;
   }
 
   private RMDelegationTokenIdentifierData loadDelegationToken(byte[] data)
       throws IOException {
-    RMDelegationTokenIdentifierData tokenData = null;
+    RMDelegationTokenIdentifierData tokenData;
     DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
     try {
       tokenData = RMStateStoreUtils.readRMDelegationTokenIdentifierData(in);
@@ -426,7 +356,7 @@ public class LeveldbRMStateStore extends RMStateStore {
 
   private void loadRMDTSecretManagerTokenSequenceNumber(RMState state)
       throws IOException {
-    byte[] data = null;
+    byte[] data;
     try {
       data = db.get(bytes(RM_DT_SEQUENCE_NUMBER_KEY));
     } catch (DBException e) {
@@ -445,9 +375,7 @@ public class LeveldbRMStateStore extends RMStateStore {
   private void loadRMApps(RMState state) throws IOException {
     int numApps = 0;
     int numAppAttempts = 0;
-    LeveldbIterator iter = null;
-    try {
-      iter = new LeveldbIterator(db);
+    try (LeveldbIterator iter = new LeveldbIterator(db)) {
       iter.seek(bytes(RM_APP_KEY_PREFIX));
       while (iter.hasNext()) {
         Entry<byte[],byte[]> entry = iter.next();
@@ -467,10 +395,6 @@ public class LeveldbRMStateStore extends RMStateStore {
       }
     } catch (DBException e) {
       throw new IOException(e);
-    } finally {
-      if (iter != null) {
-        iter.close();
-      }
     }
     LOG.info("Recovered " + numApps + " applications and " + numAppAttempts
         + " application attempts");
@@ -523,7 +447,7 @@ public class LeveldbRMStateStore extends RMStateStore {
   @VisibleForTesting
   ApplicationStateData loadRMAppState(ApplicationId appId) throws IOException {
     String appKey = getApplicationNodeKey(appId);
-    byte[] data = null;
+    byte[] data;
     try {
       data = db.get(bytes(appKey));
     } catch (DBException e) {
@@ -539,7 +463,7 @@ public class LeveldbRMStateStore extends RMStateStore {
   ApplicationAttemptStateData loadRMAppAttemptState(
       ApplicationAttemptId attemptId) throws IOException {
     String attemptKey = getApplicationAttemptNodeKey(attemptId);
-    byte[] data = null;
+    byte[] data;
     try {
       data = db.get(bytes(attemptKey));
     } catch (DBException e) {
@@ -668,8 +592,7 @@ public class LeveldbRMStateStore extends RMStateStore {
         appState.getApplicationSubmissionContext().getApplicationId();
     String appKey = getApplicationNodeKey(appId);
     try {
-      WriteBatch batch = db.createWriteBatch();
-      try {
+      try (WriteBatch batch = db.createWriteBatch()) {
         batch.delete(bytes(appKey));
         for (ApplicationAttemptId attemptId : appState.attempts.keySet()) {
           String attemptKey = getApplicationAttemptNodeKey(appKey, attemptId);
@@ -680,8 +603,6 @@ public class LeveldbRMStateStore extends RMStateStore {
               + appState.attempts.size() + " attempts" + " at " + appKey);
         }
         db.write(batch);
-      } finally {
-        batch.close();
       }
     } catch (DBException e) {
       throw new IOException(e);
@@ -693,16 +614,13 @@ public class LeveldbRMStateStore extends RMStateStore {
       ReservationAllocationStateProto reservationAllocation, String planName,
       String reservationIdName) throws Exception {
     try {
-      WriteBatch batch = db.createWriteBatch();
-      try {
+      try (WriteBatch batch = db.createWriteBatch()) {
         String key = getReservationNodeKey(planName, reservationIdName);
         LOG.debug("Storing state for reservation {} plan {} at {}",
             reservationIdName, planName, key);
 
         batch.put(bytes(key), reservationAllocation.toByteArray());
         db.write(batch);
-      } finally {
-        batch.close();
       }
     } catch (DBException e) {
       throw new IOException(e);
@@ -713,16 +631,13 @@ public class LeveldbRMStateStore extends RMStateStore {
   protected void removeReservationState(String planName,
       String reservationIdName) throws Exception {
     try {
-      WriteBatch batch = db.createWriteBatch();
-      try {
+      try (WriteBatch batch = db.createWriteBatch()) {
         String reservationKey =
             getReservationNodeKey(planName, reservationIdName);
         batch.delete(bytes(reservationKey));
         LOG.debug("Removing state for reservation {} plan {} at {}",
             reservationIdName, planName, reservationKey);
         db.write(batch);
-      } finally {
-        batch.close();
       }
     } catch (DBException e) {
       throw new IOException(e);
@@ -736,10 +651,9 @@ public class LeveldbRMStateStore extends RMStateStore {
         new RMDelegationTokenIdentifierData(tokenId, renewDate);
     LOG.debug("Storing token to {}", tokenKey);
     try {
-      WriteBatch batch = db.createWriteBatch();
-      try {
+      try (WriteBatch batch = db.createWriteBatch()) {
         batch.put(bytes(tokenKey), tokenData.toByteArray());
-        if(!isUpdate) {
+        if (!isUpdate) {
           ByteArrayOutputStream bs = new ByteArrayOutputStream();
           try (DataOutputStream ds = new DataOutputStream(bs)) {
             ds.writeInt(tokenId.getSequenceNumber());
@@ -749,8 +663,6 @@ public class LeveldbRMStateStore extends RMStateStore {
           batch.put(bytes(RM_DT_SEQUENCE_NUMBER_KEY), bs.toByteArray());
         }
         db.write(batch);
-      } finally {
-        batch.close();
       }
     } catch (DBException e) {
       throw new IOException(e);
@@ -789,11 +701,8 @@ public class LeveldbRMStateStore extends RMStateStore {
     String dbKey = getRMDTMasterKeyNodeKey(masterKey);
     LOG.debug("Storing token master key to {}", dbKey);
     ByteArrayOutputStream os = new ByteArrayOutputStream();
-    DataOutputStream out = new DataOutputStream(os);
-    try {
+    try (DataOutputStream out = new DataOutputStream(os)) {
       masterKey.write(out);
-    } finally {
-      out.close();
     }
     try {
       db.put(bytes(dbKey), os.toByteArray());
@@ -833,13 +742,10 @@ public class LeveldbRMStateStore extends RMStateStore {
     String caPrivateKeyKey = getProxyCAPrivateKeyNodeKey();
 
     try {
-      WriteBatch batch = db.createWriteBatch();
-      try {
+      try (WriteBatch batch = db.createWriteBatch()) {
         batch.put(bytes(caCertKey), caCertData);
         batch.put(bytes(caPrivateKeyKey), caPrivateKeyData);
         db.write(batch);
-      } finally {
-        batch.close();
       }
     } catch (DBException e) {
       throw new IOException(e);
@@ -871,9 +777,7 @@ public class LeveldbRMStateStore extends RMStateStore {
   @VisibleForTesting
   int getNumEntriesInDatabase() throws IOException {
     int numEntries = 0;
-    LeveldbIterator iter = null;
-    try {
-      iter = new LeveldbIterator(db);
+    try (LeveldbIterator iter = new LeveldbIterator(db)) {
       iter.seekToFirst();
       while (iter.hasNext()) {
         Entry<byte[], byte[]> entry = iter.next();
@@ -882,26 +786,12 @@ public class LeveldbRMStateStore extends RMStateStore {
       }
     } catch (DBException e) {
       throw new IOException(e);
-    } finally {
-      if (iter != null) {
-        iter.close();
-      }
     }
     return numEntries;
   }
 
-  private class CompactionTimerTask extends TimerTask {
-    @Override
-    public void run() {
-      long start = Time.monotonicNow();
-      LOG.info("Starting full compaction cycle");
-      try {
-        db.compactRange(null, null);
-      } catch (DBException e) {
-        LOG.error("Error compacting database", e);
-      }
-      long duration = Time.monotonicNow() - start;
-      LOG.info("Full compaction cycle completed in " + duration + " msec");
-    }
+  @VisibleForTesting
+  protected void setDbManager(DBManager dbManager) {
+    this.dbManager = dbManager;
   }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
index 8f5dc6a..3fbdb30 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
@@ -19,20 +19,16 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.server.resourcemanager.DBManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.proto.YarnServerCommonProtos;
 import org.apache.hadoop.yarn.server.records.Version;
-import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.fusesource.leveldbjni.JniDBFactory;
-import org.fusesource.leveldbjni.internal.NativeDB;
 import org.iq80.leveldb.DB;
 import org.iq80.leveldb.DBComparator;
 import org.iq80.leveldb.DBException;
@@ -52,9 +48,6 @@ import java.nio.charset.StandardCharsets;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.function.Consumer;
 
 import static org.fusesource.leveldbjni.JniDBFactory.bytes;
 
@@ -72,6 +65,8 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
   private static final String CONF_VERSION_NAME = "conf-version-store";
   private static final String CONF_VERSION_KEY = "conf-version";
   private DB db;
+  private DBManager dbManager;
+  private DBManager versionDbManager;
   private DB versionDb;
   private long maxLogs;
   private Configuration conf;
@@ -79,23 +74,25 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
   @VisibleForTesting
   protected static final Version CURRENT_VERSION_INFO = Version
       .newInstance(0, 1);
-  private long compactionIntervalMsec;
 
   @Override
   public void initialize(Configuration config, Configuration schedConf,
       RMContext rmContext) throws IOException {
     this.conf = config;
     this.initSchedConf = schedConf;
+    this.dbManager = new DBManager();
+    this.versionDbManager = new DBManager();
     try {
       initDatabase();
       this.maxLogs = config.getLong(
           YarnConfiguration.RM_SCHEDCONF_MAX_LOGS,
           YarnConfiguration.DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS);
-      this.compactionIntervalMsec = config.getLong(
+      long compactionIntervalMsec = config.getLong(
           YarnConfiguration.RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS,
           YarnConfiguration
               .DEFAULT_RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS) * 1000;
-      startCompactionTimer();
+      dbManager.startCompactionTimer(compactionIntervalMsec,
+          this.getClass().getSimpleName());
     } catch (Exception e) {
       throw new IOException(e);
     }
@@ -114,7 +111,7 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
     confOptions.createIfMissing(false);
     File confVersionFile = new File(confVersion.toString());
 
-    versionDb = initDatabaseHelper(confVersionFile, confOptions,
+    versionDb = versionDbManager.initDatabase(confVersionFile, confOptions,
         this::initVersionDb);
 
     Path storeRoot = createStorageDir(DB_NAME);
@@ -154,7 +151,7 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
     });
     LOG.info("Using conf database at {}", storeRoot);
     File dbFile = new File(storeRoot.toString());
-    db = initDatabaseHelper(dbFile, options, this::initDb);
+    db = dbManager.initDatabase(dbFile, options, this::initDb);
   }
 
   private void initVersionDb(DB database) {
@@ -170,30 +167,6 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
     increaseConfigVersion();
   }
 
-  private DB initDatabaseHelper(File configurationFile, Options options,
-      Consumer<DB> initMethod) throws Exception {
-    DB database;
-    try {
-      database = JniDBFactory.factory.open(configurationFile, options);
-    } catch (NativeDB.DBException e) {
-      if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
-        LOG.info("Creating configuration version/database at {}",
-            configurationFile);
-        options.createIfMissing(true);
-        try {
-          database = JniDBFactory.factory.open(configurationFile, options);
-          initMethod.accept(database);
-        } catch (DBException dbErr) {
-          throw new IOException(dbErr.getMessage(), dbErr);
-        }
-      } else {
-        throw e;
-      }
-    }
-
-    return database;
-  }
-
   private Path createStorageDir(String storageName) throws IOException {
     Path root = getStorageDir(storageName);
     FileSystem fs = FileSystem.getLocal(conf);
@@ -212,12 +185,8 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
 
   @Override
   public void close() throws IOException {
-    if (db != null) {
-      db.close();
-    }
-    if (versionDb != null) {
-      versionDb.close();
-    }
+    dbManager.close();
+    versionDbManager.close();
   }
 
   @Override
@@ -313,28 +282,9 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
     return null; // unimplemented
   }
 
-  private void startCompactionTimer() {
-    if (compactionIntervalMsec > 0) {
-      Timer compactionTimer = new Timer(
-          this.getClass().getSimpleName() + " compaction timer", true);
-      compactionTimer.schedule(new CompactionTimerTask(),
-          compactionIntervalMsec, compactionIntervalMsec);
-    }
-  }
-
   @Override
   public Version getConfStoreVersion() throws Exception {
-    Version version = null;
-    try {
-      byte[] data = db.get(bytes(VERSION_KEY));
-      if (data != null) {
-        version = new VersionPBImpl(YarnServerCommonProtos.VersionProto
-            .parseFrom(data));
-      }
-    } catch (DBException e) {
-      throw new IOException(e);
-    }
-    return version;
+    return dbManager.loadVersion(VERSION_KEY);
   }
 
   @VisibleForTesting
@@ -350,37 +300,20 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
 
   @Override
   public void storeVersion() throws Exception {
-    storeVersion(CURRENT_VERSION_INFO);
-  }
-
-  @VisibleForTesting
-  protected void storeVersion(Version version) throws Exception {
-    byte[] data = ((VersionPBImpl) version).getProto()
-        .toByteArray();
     try {
-      db.put(bytes(VERSION_KEY), data);
+      storeVersion(CURRENT_VERSION_INFO);
     } catch (DBException e) {
       throw new IOException(e);
     }
   }
 
+  @VisibleForTesting
+  protected void storeVersion(Version version) {
+    dbManager.storeVersion(VERSION_KEY, version);
+  }
+
   @Override
   public Version getCurrentVersion() {
     return CURRENT_VERSION_INFO;
   }
-
-  private class CompactionTimerTask extends TimerTask {
-    @Override
-    public void run() {
-      long start = Time.monotonicNow();
-      LOG.info("Starting full compaction cycle");
-      try {
-        db.compactRange(null, null);
-      } catch (DBException e) {
-        LOG.error("Error compacting database", e);
-      }
-      long duration = Time.monotonicNow() - start;
-      LOG.info("Full compaction cycle completed in {} msec", duration);
-    }
-  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java
index 7a4ead4..e93599d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java
@@ -25,14 +25,17 @@ import static org.mockito.Mockito.verify;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.function.Consumer;
 
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.resourcemanager.DBManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.fusesource.leveldbjni.JniDBFactory;
 import org.iq80.leveldb.DB;
+import org.iq80.leveldb.Options;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -131,19 +134,23 @@ public class TestLeveldbRMStateStore extends RMStateStoreTestBase {
   }
 
   @Test(timeout = 60000)
-  public void testCompactionCycle() throws Exception {
+  public void testCompactionCycle() {
     final DB mockdb = mock(DB.class);
     conf.setLong(YarnConfiguration.RM_LEVELDB_COMPACTION_INTERVAL_SECS, 1);
-    stateStore = new LeveldbRMStateStore() {
+    stateStore = new LeveldbRMStateStore();
+    DBManager dbManager = new DBManager() {
       @Override
-      protected DB openDatabase() throws Exception {
+      public DB initDatabase(File configurationFile, Options options,
+                             Consumer<DB> initMethod) {
         return mockdb;
       }
     };
+    dbManager.setDb(mockdb);
+    stateStore.setDbManager(dbManager);
     stateStore.init(conf);
     stateStore.start();
     verify(mockdb, timeout(10000)).compactRange(
-        (byte[]) isNull(), (byte[]) isNull());
+        isNull(), isNull());
   }
 
   @Test
@@ -181,12 +188,12 @@ public class TestLeveldbRMStateStore extends RMStateStoreTestBase {
     }
 
     @Override
-    public void writeVersion(Version version) throws Exception {
-      stateStore.dbStoreVersion(version);
+    public void writeVersion(Version version) {
+      stateStore.storeVersion(version);
     }
 
     @Override
-    public Version getCurrentVersion() throws Exception {
+    public Version getCurrentVersion() {
       return stateStore.getCurrentVersion();
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org