Posted to common-commits@hadoop.apache.org by ww...@apache.org on 2017/07/16 01:34:34 UTC

[2/2] hadoop git commit: HDFS-12069. Ozone: Create a general abstraction for metadata store. Contributed by Weiwei Yang.

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8f122a75
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8f122a75
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8f122a75

Branch: refs/heads/HDFS-7240
Commit: 8f122a75054163a2bb0bb8310c2b75016fdf1c3e
Parents: 90f1d58
Author: Weiwei Yang <ww...@apache.org>
Authored: Sun Jul 16 09:34:02 2017 +0800
Committer: Weiwei Yang <ww...@apache.org>
Committed: Sun Jul 16 09:34:02 2017 +0800

----------------------------------------------------------------------
 .../apache/hadoop/ozone/OzoneConfigKeys.java    |   9 +
 .../common/helpers/ContainerUtils.java          |  16 +-
 .../container/common/helpers/KeyUtils.java      |  20 +-
 .../common/impl/ContainerManagerImpl.java       |   6 +-
 .../container/common/impl/KeyManagerImpl.java   |  57 ++--
 .../container/common/interfaces/KeyManager.java |  13 +-
 .../container/common/utils/ContainerCache.java  |  14 +-
 .../hadoop/ozone/ksm/MetadataManager.java       |  27 +-
 .../hadoop/ozone/ksm/MetadataManagerImpl.java   | 146 ++++-----
 .../hadoop/ozone/ksm/VolumeManagerImpl.java     |  50 ++--
 .../ozone/scm/block/BlockManagerImpl.java       |  67 +++--
 .../org/apache/hadoop/ozone/scm/cli/SQLCLI.java | 112 +++----
 .../ozone/scm/container/ContainerMapping.java   |  19 +-
 .../ContainerReplicationManager.java            |  14 +-
 .../ozone/scm/exceptions/SCMException.java      |   3 +-
 .../hadoop/ozone/scm/node/NodePoolManager.java  |   5 +-
 .../hadoop/ozone/scm/node/SCMNodeManager.java   |  14 +-
 .../ozone/scm/node/SCMNodePoolManager.java      |  53 ++--
 .../web/localstorage/OzoneMetadataManager.java  | 133 ++++-----
 .../org/apache/hadoop/utils/BatchOperation.java |  90 ++++++
 .../org/apache/hadoop/utils/EntryConsumer.java  |  38 +++
 .../apache/hadoop/utils/LevelDBKeyFilters.java  |  65 ----
 .../org/apache/hadoop/utils/LevelDBStore.java   | 182 +++++++-----
 .../apache/hadoop/utils/MetadataKeyFilters.java |  65 ++++
 .../org/apache/hadoop/utils/MetadataStore.java  | 152 ++++++++++
 .../hadoop/utils/MetadataStoreBuilder.java      |  96 ++++++
 .../src/main/resources/ozone-default.xml        |  11 +
 .../apache/hadoop/ozone/TestLevelDBStore.java   | 165 -----------
 .../apache/hadoop/ozone/TestMetadataStore.java  | 296 +++++++++++++++++++
 .../common/impl/TestContainerPersistence.java   |  12 +-
 .../TestContainerReplicationManager.java        |   5 +-
 .../hadoop/ozone/ksm/TestKeySpaceManager.java   |   2 +
 32 files changed, 1247 insertions(+), 710 deletions(-)
----------------------------------------------------------------------
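
At a glance, this change replaces direct LevelDBStore usage throughout Ozone with a
new org.apache.hadoop.utils.MetadataStore interface, built via MetadataStoreBuilder,
which picks the backing implementation (LevelDB by default, optionally RocksDB) from
configuration. A minimal usage sketch, inferred from the call sites in this diff; the
class wrapper, file path, and key/value strings are illustrative only:

    import java.io.File;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSUtil;
    import org.apache.hadoop.utils.MetadataStore;
    import org.apache.hadoop.utils.MetadataStoreBuilder;

    public class MetadataStoreExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        MetadataStore store = MetadataStoreBuilder.newBuilder()
            .setConf(conf)                                   // impl chosen via ozone.metastore.impl
            .setDbFile(new File("/tmp/example-metadata.db")) // illustrative path
            .setCreateIfMissing(true)
            .build();
        try {
          store.put(DFSUtil.string2Bytes("key1"), DFSUtil.string2Bytes("value1"));
          byte[] value = store.get(DFSUtil.string2Bytes("key1"));
          store.delete(DFSUtil.string2Bytes("key1"));
        } finally {
          store.close();
        }
      }
    }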


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f122a75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index cebb1b0..1822a2a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -61,6 +61,15 @@ public final class OzoneConfigKeys {
   public static final String OZONE_CONTAINER_METADATA_DIRS =
       "ozone.container.metadata.dirs";
 
+  public static final String OZONE_METADATA_STORE_IMPL =
+      "ozone.metastore.impl";
+  public static final String OZONE_METADATA_STORE_IMPL_LEVELDB =
+      "LevelDB";
+  public static final String OZONE_METADATA_STORE_IMPL_ROCKSDB =
+      "RocksDB";
+  public static final String OZONE_METADATA_STORE_IMPL_DEFAULT =
+      OZONE_METADATA_STORE_IMPL_LEVELDB;
+
   public static final String OZONE_KEY_CACHE = "ozone.key.cache.size";
   public static final int OZONE_KEY_CACHE_DEFAULT = 1024;
 

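The new key makes the metadata store pluggable. A sketch of switching the default
LevelDB backend to RocksDB, assuming only the constants defined above:

    Configuration conf = new Configuration();
    conf.set(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL,     // "ozone.metastore.impl"
        OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_ROCKSDB); // "RocksDB"
    // Any store built with MetadataStoreBuilder.setConf(conf) would then use RocksDB.
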
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f122a75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
index 7d0e756..0a360e2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
@@ -27,8 +27,9 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl;
-import org.apache.hadoop.utils.LevelDBStore;
 import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.utils.MetadataStoreBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -233,7 +234,8 @@ public final class ContainerUtils {
    * @param containerPath - Container Path.
    * @throws IOException
    */
-  public static Path createMetadata(Path containerPath) throws IOException {
+  public static Path createMetadata(Path containerPath, Configuration conf)
+      throws IOException {
     Logger log = LoggerFactory.getLogger(ContainerManagerImpl.class);
     Preconditions.checkNotNull(containerPath);
     Path metadataPath = containerPath.resolve(OzoneConsts.CONTAINER_META_PATH);
@@ -243,9 +245,11 @@ public final class ContainerUtils {
       throw new IOException("Unable to create directory for metadata storage." +
           " Path: " + metadataPath);
     }
-    LevelDBStore store =
-        new LevelDBStore(metadataPath.resolve(OzoneConsts.CONTAINER_DB)
-            .toFile(), true);
+    MetadataStore store = MetadataStoreBuilder.newBuilder()
+        .setConf(conf)
+        .setCreateIfMissing(true)
+        .setDbFile(metadataPath.resolve(OzoneConsts.CONTAINER_DB).toFile())
+        .build();
 
     // we close since the SCM pre-creates containers.
     // we will open and put Db handle into a cache when keys are being created
@@ -347,7 +351,7 @@ public final class ContainerUtils {
     Preconditions.checkNotNull(containerData);
     Path dbPath = Paths.get(containerData.getDBPath());
 
-    LevelDBStore db = KeyUtils.getDB(containerData, conf);
+    MetadataStore db = KeyUtils.getDB(containerData, conf);
     // If the container is not empty and cannot be deleted forcibly,
     // then throw a SCE to stop deleting.
     if(!forceDelete && !db.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f122a75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java
index 4eb8999..1a36b71 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java
@@ -23,7 +23,8 @@ import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
 import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.container.common.impl.KeyManagerImpl;
 import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
-import org.apache.hadoop.utils.LevelDBStore;
+import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.utils.MetadataStoreBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,20 +58,21 @@ public final class KeyUtils {
    *
    * @param container container.
    * @param conf configuration.
-   * @return LevelDB handle.
+   * @return MetadataStore handle.
    * @throws StorageContainerException
    */
-  public static LevelDBStore getDB(ContainerData container,
+  public static MetadataStore getDB(ContainerData container,
       Configuration conf) throws StorageContainerException {
     Preconditions.checkNotNull(container);
     ContainerCache cache = ContainerCache.getInstance(conf);
     Preconditions.checkNotNull(cache);
     try {
-      LevelDBStore db = cache.getDB(container.getContainerName());
+      MetadataStore db = cache.getDB(container.getContainerName());
       if (db == null) {
-        db = new LevelDBStore(
-            new File(container.getDBPath()),
-            false);
+        db = MetadataStoreBuilder.newBuilder()
+            .setDbFile(new File(container.getDBPath()))
+            .setCreateIfMissing(false)
+            .build();
         cache.putDB(container.getContainerName(), db);
       }
       return db;
@@ -103,10 +105,10 @@ public final class KeyUtils {
   @SuppressWarnings("unchecked")
   public static void shutdownCache(ContainerCache cache)  {
     Logger log = LoggerFactory.getLogger(KeyManagerImpl.class);
-    LevelDBStore[] handles = new LevelDBStore[cache.values().size()];
+    MetadataStore[] handles = new MetadataStore[cache.values().size()];
     cache.values().toArray(handles);
     Preconditions.checkState(handles.length == cache.values().size());
-    for (LevelDBStore db : handles) {
+    for (MetadataStore db : handles) {
       try {
         db.close();
       } catch (IOException ex) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f122a75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
index f3e010d..b77ac55 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
@@ -41,7 +41,7 @@ import org.apache.hadoop.ozone.container.common.interfaces
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
 import org.apache.hadoop.ozone.container.common.interfaces.KeyManager;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.utils.LevelDBStore;
+import org.apache.hadoop.utils.MetadataStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -335,7 +335,7 @@ public class ContainerManagerImpl implements ContainerManager {
         ContainerUtils.verifyIsNewContainer(containerFile, metadataFile);
         metadataPath = this.locationManager.getDataPath(
             containerData.getContainerName());
-        metadataPath = ContainerUtils.createMetadata(metadataPath);
+        metadataPath = ContainerUtils.createMetadata(metadataPath, conf);
       }  else {
         metadataPath = ContainerUtils.getMetadataDirectory(containerData);
       }
@@ -502,7 +502,7 @@ public class ContainerManagerImpl implements ContainerManager {
     ContainerData containerData = readContainer(containerName);
     containerData.closeContainer();
     writeContainerInfo(containerData, true);
-    LevelDBStore db = KeyUtils.getDB(containerData, conf);
+    MetadataStore db = KeyUtils.getDB(containerData, conf);
 
     // It is ok if this operation takes a bit of time.
     // Close container is not expected to be instantaneous.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f122a75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java
index 2b64b71..7fff0a3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java
@@ -30,8 +30,9 @@ import org.apache.hadoop.ozone.container.common.interfaces.KeyManager;
 import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
-import org.apache.hadoop.utils.LevelDBKeyFilters.KeyPrefixFilter;
-import org.apache.hadoop.utils.LevelDBStore;
+import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
+import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
+import org.apache.hadoop.utils.MetadataStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,11 +42,8 @@ import java.util.List;
 import java.util.Map;
 
 import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
-    .Result.IO_EXCEPTION;
-import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
     .Result.NO_SUCH_KEY;
 
-
 /**
  * Key Manager impl.
  */
@@ -74,8 +72,7 @@ public class KeyManagerImpl implements KeyManager {
    * {@inheritDoc}
    */
   @Override
-  public void putKey(Pipeline pipeline, KeyData data)
-      throws StorageContainerException {
+  public void putKey(Pipeline pipeline, KeyData data) throws IOException {
     containerManager.readLock();
     try {
       // We are not locking the key manager since LevelDb serializes all actions
@@ -85,7 +82,7 @@ public class KeyManagerImpl implements KeyManager {
           "Container name cannot be null");
       ContainerData cData = containerManager.readContainer(
           pipeline.getContainerName());
-      LevelDBStore db = KeyUtils.getDB(cData, conf);
+      MetadataStore db = KeyUtils.getDB(cData, conf);
 
       // This is a post condition that acts as a hint to the user.
       // Should never fail.
@@ -102,7 +99,7 @@ public class KeyManagerImpl implements KeyManager {
    * {@inheritDoc}
    */
   @Override
-  public KeyData getKey(KeyData data) throws StorageContainerException {
+  public KeyData getKey(KeyData data) throws IOException {
     containerManager.readLock();
     try {
       Preconditions.checkNotNull(data, "Key data cannot be null");
@@ -110,7 +107,7 @@ public class KeyManagerImpl implements KeyManager {
           "Container name cannot be null");
       ContainerData cData = containerManager.readContainer(data
           .getContainerName());
-      LevelDBStore db = KeyUtils.getDB(cData, conf);
+      MetadataStore db = KeyUtils.getDB(cData, conf);
 
       // This is a post condition that acts as a hint to the user.
       // Should never fail.
@@ -124,8 +121,6 @@ public class KeyManagerImpl implements KeyManager {
       ContainerProtos.KeyData keyData =
           ContainerProtos.KeyData.parseFrom(kData);
       return KeyData.getFromProtoBuf(keyData);
-    } catch (IOException ex) {
-      throw new StorageContainerException(ex, IO_EXCEPTION);
     } finally {
       containerManager.readUnlock();
     }
@@ -136,7 +131,7 @@ public class KeyManagerImpl implements KeyManager {
    */
   @Override
   public void deleteKey(Pipeline pipeline, String keyName)
-      throws StorageContainerException {
+      throws IOException {
     containerManager.readLock();
     try {
       Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");
@@ -144,7 +139,7 @@ public class KeyManagerImpl implements KeyManager {
           "Container name cannot be null");
       ContainerData cData = containerManager.readContainer(pipeline
           .getContainerName());
-      LevelDBStore db = KeyUtils.getDB(cData, conf);
+      MetadataStore db = KeyUtils.getDB(cData, conf);
 
       // This is a post condition that acts as a hint to the user.
       // Should never fail.
@@ -171,32 +166,28 @@ public class KeyManagerImpl implements KeyManager {
   @Override
   public List<KeyData> listKey(
       Pipeline pipeline, String prefix, String startKey, int count)
-      throws StorageContainerException {
+      throws IOException {
     Preconditions.checkNotNull(pipeline,
         "Pipeline cannot be null.");
     Preconditions.checkArgument(count > 0,
         "Count must be a positive number.");
     ContainerData cData = containerManager.readContainer(pipeline
         .getContainerName());
-    LevelDBStore db = KeyUtils.getDB(cData, conf);
-    try {
-      List<KeyData> result = new ArrayList<KeyData>();
-      byte[] startKeyInBytes = startKey == null ? null :
-          DFSUtil.string2Bytes(startKey);
-      KeyPrefixFilter prefixFilter = new KeyPrefixFilter(prefix);
-      List<Map.Entry<byte[], byte[]>> range =
-          db.getRangeKVs(startKeyInBytes, count, prefixFilter);
-      for(Map.Entry<byte[], byte[]> entry : range) {
-        String keyName = KeyUtils.getKeyName(entry.getKey());
-        KeyData value = KeyUtils.getKeyData(entry.getValue());
-        KeyData data = new KeyData(value.getContainerName(), keyName);
-        result.add(data);
-      }
-      return result;
-    } catch (IOException e) {
-      throw new StorageContainerException(e,
-          ContainerProtos.Result.IO_EXCEPTION);
+    MetadataStore db = KeyUtils.getDB(cData, conf);
+
+    List<KeyData> result = new ArrayList<KeyData>();
+    byte[] startKeyInBytes = startKey == null ? null :
+        DFSUtil.string2Bytes(startKey);
+    MetadataKeyFilter prefixFilter = new KeyPrefixFilter(prefix);
+    List<Map.Entry<byte[], byte[]>> range =
+        db.getRangeKVs(startKeyInBytes, count, prefixFilter);
+    for (Map.Entry<byte[], byte[]> entry : range) {
+      String keyName = KeyUtils.getKeyName(entry.getKey());
+      KeyData value = KeyUtils.getKeyData(entry.getValue());
+      KeyData data = new KeyData(value.getContainerName(), keyName);
+      result.add(data);
     }
+    return result;
   }
 
   /**

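The rewritten listKey shows the range-scan pattern of the new API: pass an optional
start key (null to begin at the first entry), a count, and a key filter. A condensed
sketch reusing the db handle obtained above; the prefix and count are illustrative:

    byte[] startKey = null;  // null = scan from the first key
    MetadataKeyFilter filter = new KeyPrefixFilter("someKeyPrefix");  // illustrative prefix
    List<Map.Entry<byte[], byte[]>> range = db.getRangeKVs(startKey, 100, filter);
    for (Map.Entry<byte[], byte[]> entry : range) {
      // keys and values are raw byte[]; callers decode them with helpers such as
      // KeyUtils.getKeyName(entry.getKey()) and KeyUtils.getKeyData(entry.getValue())
    }
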
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f122a75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/KeyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/KeyManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/KeyManager.java
index a362d07..a613d2a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/KeyManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/KeyManager.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.container.common.helpers.KeyData;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 
+import java.io.IOException;
 import java.util.List;
 
 /**
@@ -32,18 +33,18 @@ public interface KeyManager {
    *
    * @param pipeline - Pipeline.
    * @param data     - Key Data.
-   * @throws StorageContainerException
+   * @throws IOException
    */
-  void putKey(Pipeline pipeline, KeyData data) throws StorageContainerException;
+  void putKey(Pipeline pipeline, KeyData data) throws IOException;
 
   /**
    * Gets an existing key.
    *
    * @param data - Key Data.
    * @return Key Data.
-   * @throws StorageContainerException
+   * @throws IOException
    */
-  KeyData getKey(KeyData data) throws StorageContainerException;
+  KeyData getKey(KeyData data) throws IOException;
 
   /**
    * Deletes an existing Key.
@@ -53,7 +54,7 @@ public interface KeyManager {
    * @throws StorageContainerException
    */
   void deleteKey(Pipeline pipeline, String keyName)
-      throws StorageContainerException;
+      throws IOException;
 
   /**
    * List keys in a container.
@@ -65,7 +66,7 @@ public interface KeyManager {
    * @return List of Keys that match the criteria.
    */
   List<KeyData> listKey(Pipeline pipeline, String prefix, String startKey,
-      int count) throws StorageContainerException;
+      int count) throws IOException;
 
   /**
    * Shutdown keyManager.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f122a75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java
index 81cdcf2..f4caad0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java
@@ -24,7 +24,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.utils.LevelDBStore;
+import org.apache.hadoop.utils.MetadataStore;
 
 import java.io.IOException;
 import java.util.concurrent.locks.Lock;
@@ -69,7 +69,7 @@ public final class ContainerCache extends LRUMap {
   protected boolean removeLRU(LinkEntry entry) {
     lock.lock();
     try {
-      LevelDBStore db = (LevelDBStore) entry.getValue();
+      MetadataStore db = (MetadataStore) entry.getValue();
       db.close();
     } catch (IOException e) {
       LOG.error("Error closing DB. Container: " + entry.getKey().toString(), e);
@@ -83,14 +83,14 @@ public final class ContainerCache extends LRUMap {
    * Returns a DB handle if available, null otherwise.
    *
    * @param containerName - Name of the container.
-   * @return OzoneLevelDBStore.
+   * @return MetadataStore.
    */
-  public LevelDBStore getDB(String containerName) {
+  public MetadataStore getDB(String containerName) {
     Preconditions.checkNotNull(containerName);
     Preconditions.checkState(!containerName.isEmpty());
     lock.lock();
     try {
-      return (LevelDBStore) this.get(containerName);
+      return (MetadataStore) this.get(containerName);
     } finally {
       lock.unlock();
     }
@@ -106,7 +106,7 @@ public final class ContainerCache extends LRUMap {
     Preconditions.checkState(!containerName.isEmpty());
     lock.lock();
     try {
-      LevelDBStore db = this.getDB(containerName);
+      MetadataStore db = this.getDB(containerName);
       if (db != null) {
         try {
           db.close();
@@ -126,7 +126,7 @@ public final class ContainerCache extends LRUMap {
    * @param containerName - Name of the container
    * @param db            - DB handle
    */
-  public void putDB(String containerName, LevelDBStore db) {
+  public void putDB(String containerName, MetadataStore db) {
     Preconditions.checkNotNull(containerName);
     Preconditions.checkState(!containerName.isEmpty());
     lock.lock();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f122a75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java
index 1e42307..36e5b3a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java
@@ -19,10 +19,10 @@ package org.apache.hadoop.ozone.ksm;
 import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
 import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
 import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
+import org.apache.hadoop.utils.BatchOperation;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.locks.Lock;
 
 /**
@@ -56,36 +56,27 @@ public interface MetadataManager {
    * @param key - key
    * @return value
    */
-  byte[] get(byte[] key);
+  byte[] get(byte[] key) throws IOException;
 
   /**
    * Puts a Key into Metadata DB.
    * @param key   - key
    * @param value - value
    */
-  void put(byte[] key, byte[] value);
+  void put(byte[] key, byte[] value) throws IOException;
 
   /**
    * Deletes a Key from Metadata DB.
    * @param key   - key
    */
-  void delete(byte[] key);
+  void delete(byte[] key) throws IOException;
 
   /**
-   * Performs batch Put and Delete to Metadata DB.
-   * Can be used to do multiple puts and deletes atomically.
-   * @param putList - list of Key/Value to put into DB
-   * @param delList - list of Key to delete from DB
-   */
-  void batchPutDelete(List<Map.Entry<byte[], byte[]>> putList,
-                      List<byte[]> delList) throws IOException;
-
-  /**
-   * Performs a batch Put to Metadata DB.
-   * Can be used to do multiple puts atomically.
-   * @param putList - list of Key/Value to put into DB
+   * Atomically writes a batch of operations.
+   * @param batch
+   * @throws IOException
    */
-  void batchPut(List<Map.Entry<byte[], byte[]>> putList) throws IOException;
+  void writeBatch(BatchOperation batch) throws IOException;
 
   /**
    * Given a volume return the corresponding DB key.
@@ -120,7 +111,7 @@ public interface MetadataManager {
    *
    * @param key - key name
    */
-  void deleteKey(byte[] key);
+  void deleteKey(byte[] key) throws IOException;
 
   /**
    * Given a volume, check if it is empty,

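The two list-based batch methods collapse into a single BatchOperation, which records
puts and deletes and is committed atomically by writeBatch. A sketch of the intended
usage, mirroring the VolumeManagerImpl changes further below (the key and value
variables are illustrative):

    BatchOperation batch = new BatchOperation();
    batch.put(dbVolumeKey, newVolumeInfo.toByteArray());  // queue a put
    batch.delete(dbUserKey);                              // queue a delete
    metadataManager.writeBatch(batch);  // both apply atomically, or neither does
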
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f122a75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java
index dd3d524..7e48eda 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java
@@ -18,7 +18,7 @@ package org.apache.hadoop.ozone.ksm;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
-import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
 import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
@@ -32,12 +32,11 @@ import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyI
 import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.VolumeInfo;
 import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.VolumeList;
 import org.apache.hadoop.ozone.web.utils.OzoneUtils;
-import org.apache.hadoop.utils.LevelDBKeyFilters.KeyPrefixFilter;
-import org.apache.hadoop.utils.LevelDBKeyFilters.LevelDBKeyFilter;
-import org.apache.hadoop.utils.LevelDBStore;
-import org.iq80.leveldb.DBIterator;
-import org.iq80.leveldb.Options;
-import org.iq80.leveldb.WriteBatch;
+import org.apache.hadoop.utils.BatchOperation;
+import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
+import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
+import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.utils.MetadataStoreBuilder;
 
 import java.io.File;
 import java.io.IOException;
@@ -57,9 +56,9 @@ import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
 /**
  * KSM metadata manager interface.
  */
-public class MetadataManagerImpl implements  MetadataManager {
+public class MetadataManagerImpl implements MetadataManager {
 
-  private final LevelDBStore store;
+  private final MetadataStore store;
   private final ReadWriteLock lock;
 
 
@@ -67,10 +66,12 @@ public class MetadataManagerImpl implements  MetadataManager {
     File metaDir = OzoneUtils.getScmMetadirPath(conf);
     final int cacheSize = conf.getInt(OZONE_KSM_DB_CACHE_SIZE_MB,
         OZONE_KSM_DB_CACHE_SIZE_DEFAULT);
-    Options options = new Options();
-    options.cacheSize(cacheSize * OzoneConsts.MB);
     File ksmDBFile = new File(metaDir.getPath(), KSM_DB_NAME);
-    this.store = new LevelDBStore(ksmDBFile, options);
+    this.store = MetadataStoreBuilder.newBuilder()
+        .setConf(conf)
+        .setDbFile(ksmDBFile)
+        .setCacheSize(cacheSize * OzoneConsts.MB)
+        .build();
     this.lock = new ReentrantReadWriteLock();
   }
 
@@ -153,7 +154,7 @@ public class MetadataManagerImpl implements  MetadataManager {
    * @param key - key name
    */
   @Override
-  public void deleteKey(byte[] key) {
+  public void deleteKey(byte[] key) throws IOException {
     store.delete(key);
   }
 
@@ -181,7 +182,7 @@ public class MetadataManagerImpl implements  MetadataManager {
    * @return value
    */
   @Override
-  public byte[] get(byte[] key) {
+  public byte[] get(byte[] key) throws IOException {
     return store.get(key);
   }
 
@@ -191,7 +192,7 @@ public class MetadataManagerImpl implements  MetadataManager {
    * @param value - value
    */
   @Override
-  public void put(byte[] key, byte[] value) {
+  public void put(byte[] key, byte[] value) throws IOException {
     store.put(key, value);
   }
 
@@ -199,45 +200,13 @@ public class MetadataManagerImpl implements  MetadataManager {
    * Deletes a Key from Metadata DB.
    * @param key   - key
    */
-  public void delete(byte[] key) {
+  public void delete(byte[] key) throws IOException {
     store.delete(key);
   }
 
-  /**
-   * Performs a batch Put and Delete from Metadata DB.
-   * Can be used to do multiple puts and deletes atomically.
-   * @param putList - list of key and value pairs to put to Metadata DB.
-   * @param delList - list of keys to delete from Metadata DB.
-   */
   @Override
-  public void batchPutDelete(List<Map.Entry<byte[], byte[]>> putList,
-                             List<byte[]> delList)
-      throws IOException {
-    WriteBatch batch = store.createWriteBatch();
-    putList.forEach(entry -> batch.put(entry.getKey(), entry.getValue()));
-    delList.forEach(entry -> batch.delete(entry));
-    try {
-      store.commitWriteBatch(batch);
-    } finally {
-      store.closeWriteBatch(batch);
-    }
-  }
-
-  /**
-   * Performs a batch Put to Metadata DB.
-   * Can be used to do multiple puts atomically.
-   * @param list - list of Map.Entry
-   */
-  @Override
-  public void batchPut(List<Map.Entry<byte[], byte[]>> list)
-      throws IOException {
-    WriteBatch batch = store.createWriteBatch();
-    list.forEach(entry -> batch.put(entry.getKey(), entry.getValue()));
-    try {
-      store.commitWriteBatch(batch);
-    } finally {
-      store.closeWriteBatch(batch);
-    }
+  public void writeBatch(BatchOperation batch) throws IOException {
+    this.store.writeBatch(batch);
   }
 
   /**
@@ -246,21 +215,17 @@ public class MetadataManagerImpl implements  MetadataManager {
    * @return true if the volume is empty
    */
   public boolean isVolumeEmpty(String volume) throws IOException {
-    try (DBIterator iterator = store.getIterator()) {
-      String dbVolumeRootName = OzoneConsts.KSM_VOLUME_PREFIX + volume
-          + OzoneConsts.KSM_BUCKET_PREFIX;
-      byte[] dbVolumeRootKey = DFSUtil.string2Bytes(dbVolumeRootName);
-      // Seek to the root of the volume and look for the next key
-      iterator.seek(dbVolumeRootKey);
-      if (iterator.hasNext()) {
-        String firstBucketKey = DFSUtil.bytes2String(iterator.next().getKey());
-        // if the key starts with /<volume name>/
-        // then there is at least one bucket
-        return !firstBucketKey.startsWith(dbVolumeRootName);
-      } else {
-        return true;
-      }
+    String dbVolumeRootName = OzoneConsts.KSM_VOLUME_PREFIX + volume;
+    byte[] dbVolumeRootKey = DFSUtil.string2Bytes(dbVolumeRootName);
+    // Seek to the root of the volume and look for the next key
+    ImmutablePair<byte[], byte[]> volumeRoot =
+        store.peekAround(1, dbVolumeRootKey);
+    if (volumeRoot != null) {
+      String firstBucketKey = DFSUtil.bytes2String(volumeRoot.getKey());
+      return !firstBucketKey.startsWith(dbVolumeRootName
+          + OzoneConsts.KSM_BUCKET_PREFIX);
     }
+    return true;
   }
 
   /**
@@ -272,18 +237,15 @@ public class MetadataManagerImpl implements  MetadataManager {
    */
   public boolean isBucketEmpty(String volume, String bucket)
       throws IOException {
-    try (DBIterator iterator = store.getIterator()) {
-      String keyRootName = OzoneConsts.KSM_VOLUME_PREFIX + volume
-          + OzoneConsts.KSM_BUCKET_PREFIX + bucket
-          + OzoneConsts.KSM_KEY_PREFIX;
-      byte[] keyRoot = DFSUtil.string2Bytes(keyRootName);
-      iterator.seek(keyRoot);
-      if(iterator.hasNext()) {
-        return !DFSUtil.bytes2String(iterator.next().getKey())
-            .startsWith(keyRootName);
-      }
-      return true;
+    String keyRootName = OzoneConsts.KSM_VOLUME_PREFIX + volume
+        + OzoneConsts.KSM_BUCKET_PREFIX + bucket;
+    byte[] keyRoot = DFSUtil.string2Bytes(keyRootName);
+    ImmutablePair<byte[], byte[]> firstKey = store.peekAround(1, keyRoot);
+    if (firstKey != null) {
+      return !DFSUtil.bytes2String(firstKey.getKey())
+          .startsWith(keyRootName + OzoneConsts.KSM_KEY_PREFIX);
     }
+    return true;
   }
 
   /**
@@ -305,8 +267,19 @@ public class MetadataManagerImpl implements  MetadataManager {
           ResultCodes.FAILED_VOLUME_NOT_FOUND);
     }
 
-    LevelDBKeyFilter filter =
-        new KeyPrefixFilter(getBucketKeyPrefix(volumeName, bucketPrefix));
+
+    // A bucket must start with /volume/bucket_prefix
+    // and exclude keys /volume/bucket_xxx/key_xxx
+    MetadataKeyFilter filter = (preKey, currentKey, nextKey) -> {
+      if (currentKey != null) {
+        String bucketNamePrefix = getBucketKeyPrefix(volumeName, bucketPrefix);
+        String bucket = DFSUtil.bytes2String(currentKey);
+        return bucket.startsWith(bucketNamePrefix) &&
+            !bucket.replaceFirst(bucketNamePrefix, "")
+                .contains(OzoneConsts.KSM_KEY_PREFIX);
+      }
+      return false;
+    };
 
     List<Map.Entry<byte[], byte[]>> rangeResult;
     if (!Strings.isNullOrEmpty(startBucket)) {
@@ -349,7 +322,7 @@ public class MetadataManagerImpl implements  MetadataManager {
           ResultCodes.FAILED_BUCKET_NOT_FOUND);
     }
 
-    LevelDBKeyFilter filter =
+    MetadataKeyFilter filter =
         new KeyPrefixFilter(getKeyKeyPrefix(volumeName, bucketName, keyPrefix));
 
     List<Map.Entry<byte[], byte[]>> rangeResult;
@@ -427,18 +400,17 @@ public class MetadataManagerImpl implements  MetadataManager {
   private VolumeList getVolumesByUser(byte[] userNameKey)
       throws KSMException {
     VolumeList volumes = null;
-    byte[] volumesInBytes = store.get(userNameKey);
-    if (volumesInBytes == null) {
-      // No volume found for this user, return an empty list
-      return VolumeList.newBuilder().build();
-    }
-
     try {
+      byte[] volumesInBytes = store.get(userNameKey);
+      if (volumesInBytes == null) {
+        // No volume found for this user, return an empty list
+        return VolumeList.newBuilder().build();
+      }
       volumes = VolumeList.parseFrom(volumesInBytes);
-    } catch (InvalidProtocolBufferException e) {
+    } catch (IOException e) {
       throw new KSMException("Unable to get volumes info by the given user, "
-          + "metadata might be corrupted",
-          e, ResultCodes.FAILED_METADATA_ERROR);
+          + "metadata might be corrupted", e,
+          ResultCodes.FAILED_METADATA_ERROR);
     }
     return volumes;
   }

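The emptiness checks above switch from a raw DBIterator seek to MetadataStore.peekAround.
Judging from these call sites, peekAround(offset, key) returns the key/value pair at the
given offset relative to where the key would sort (offset 1 being the first entry at or
after it), or null when no such entry exists. A hedged sketch of the same check with an
illustrative prefix:

    byte[] seekKey = DFSUtil.string2Bytes("/myVolume");  // illustrative volume root
    ImmutablePair<byte[], byte[]> first = store.peekAround(1, seekKey);
    boolean isEmpty = (first == null)
        || !DFSUtil.bytes2String(first.getKey()).startsWith("/myVolume/");
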
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f122a75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java
index 0757c9d..f4590a2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java
@@ -26,15 +26,13 @@ import org.apache.hadoop.ozone.protocol.proto
     .KeySpaceManagerProtocolProtos.VolumeList;
 import org.apache.hadoop.ozone.protocol.proto
     .KeySpaceManagerProtocolProtos.VolumeInfo;
-import org.iq80.leveldb.DBException;
+import org.apache.hadoop.utils.BatchOperation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.AbstractMap;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 
 import static org.apache.hadoop.ozone.ksm
     .KSMConfigKeys.OZONE_KSM_USER_MAX_VOLUME_DEFAULT;
@@ -67,8 +65,7 @@ public class VolumeManagerImpl implements VolumeManager {
 
   // Helpers to add and delete volume from user list
   private void addVolumeToOwnerList(String volume, String owner,
-                                    List<Map.Entry<byte[], byte[]>> putBatch)
-      throws IOException {
+      BatchOperation batchOperation) throws IOException {
     // Get the volume list
     byte[] dbUserKey = metadataManager.getUserKey(owner);
     byte[] volumeList  = metadataManager.get(dbUserKey);
@@ -88,12 +85,11 @@ public class VolumeManagerImpl implements VolumeManager {
     prevVolList.add(volume);
     VolumeList newVolList = VolumeList.newBuilder()
         .addAllVolumeNames(prevVolList).build();
-    putBatch.add(batchEntry(dbUserKey, newVolList.toByteArray()));
+    batchOperation.put(dbUserKey, newVolList.toByteArray());
   }
 
   private void delVolumeFromOwnerList(String volume, String owner,
-                                      List<Map.Entry<byte[], byte[]>> putBatch,
-                                      List<byte[]> deleteBatch)
+                                      BatchOperation batchOperation)
       throws IOException {
     // Get the volume list
     byte[] dbUserKey = metadataManager.getUserKey(owner);
@@ -109,18 +105,14 @@ public class VolumeManagerImpl implements VolumeManager {
     // Remove the volume from the list
     prevVolList.remove(volume);
     if (prevVolList.size() == 0) {
-      deleteBatch.add(dbUserKey);
+      batchOperation.delete(dbUserKey);
     } else {
       VolumeList newVolList = VolumeList.newBuilder()
           .addAllVolumeNames(prevVolList).build();
-      putBatch.add(batchEntry(dbUserKey, newVolList.toByteArray()));
+      batchOperation.put(dbUserKey, newVolList.toByteArray());
     }
   }
 
-  private Map.Entry<byte[], byte[]> batchEntry(byte[] key, byte[] value) {
-    return new AbstractMap.SimpleEntry<>(key, value);
-  }
-
   /**
    * Creates a volume.
    * @param args - KsmVolumeArgs.
@@ -129,7 +121,6 @@ public class VolumeManagerImpl implements VolumeManager {
   public void createVolume(KsmVolumeArgs args) throws IOException {
     Preconditions.checkNotNull(args);
     metadataManager.writeLock().lock();
-    List<Map.Entry<byte[], byte[]>> batch = new LinkedList<>();
     try {
       byte[] dbVolumeKey = metadataManager.getVolumeKey(args.getVolume());
       byte[] volumeInfo = metadataManager.get(dbVolumeKey);
@@ -140,16 +131,17 @@ public class VolumeManagerImpl implements VolumeManager {
         throw new KSMException(ResultCodes.FAILED_VOLUME_ALREADY_EXISTS);
       }
 
+      BatchOperation batch = new BatchOperation();
       // Write the vol info
       VolumeInfo newVolumeInfo = args.getProtobuf();
-      batch.add(batchEntry(dbVolumeKey, newVolumeInfo.toByteArray()));
+      batch.put(dbVolumeKey, newVolumeInfo.toByteArray());
 
       // Add volume to user list
       addVolumeToOwnerList(args.getVolume(), args.getOwnerName(), batch);
-      metadataManager.batchPut(batch);
+      metadataManager.writeBatch(batch);
       LOG.info("created volume:{} user:{}",
                                   args.getVolume(), args.getOwnerName());
-    } catch (IOException | DBException ex) {
+    } catch (IOException ex) {
       LOG.error("Volume creation failed for user:{} volname:{}",
                                 args.getOwnerName(), args.getVolume(), ex);
       throw ex;
@@ -169,8 +161,6 @@ public class VolumeManagerImpl implements VolumeManager {
   public void setOwner(String volume, String owner) throws IOException {
     Preconditions.checkNotNull(volume);
     Preconditions.checkNotNull(owner);
-    List<Map.Entry<byte[], byte[]>> putBatch = new LinkedList<>();
-    List<byte[]> deleteBatch = new LinkedList<>();
     metadataManager.writeLock().lock();
     try {
       byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
@@ -183,9 +173,9 @@ public class VolumeManagerImpl implements VolumeManager {
       KsmVolumeArgs volumeArgs = KsmVolumeArgs.getFromProtobuf(volumeInfo);
       Preconditions.checkState(volume.equals(volumeInfo.getVolume()));
 
-      delVolumeFromOwnerList(volume, volumeArgs.getOwnerName(),
-          putBatch, deleteBatch);
-      addVolumeToOwnerList(volume, owner, putBatch);
+      BatchOperation batch = new BatchOperation();
+      delVolumeFromOwnerList(volume, volumeArgs.getOwnerName(), batch);
+      addVolumeToOwnerList(volume, owner, batch);
 
       KsmVolumeArgs newVolumeArgs =
           KsmVolumeArgs.newBuilder().setVolume(volumeArgs.getVolume())
@@ -195,9 +185,9 @@ public class VolumeManagerImpl implements VolumeManager {
               .build();
 
       VolumeInfo newVolumeInfo = newVolumeArgs.getProtobuf();
-      putBatch.add(batchEntry(dbVolumeKey, newVolumeInfo.toByteArray()));
+      batch.put(dbVolumeKey, newVolumeInfo.toByteArray());
 
-      metadataManager.batchPutDelete(putBatch, deleteBatch);
+      metadataManager.writeBatch(batch);
     } catch (IOException ex) {
       LOG.error("Changing volume ownership failed for user:{} volume:{}",
           owner, volume, ex);
@@ -285,8 +275,7 @@ public class VolumeManagerImpl implements VolumeManager {
     Preconditions.checkNotNull(volume);
     metadataManager.writeLock().lock();
     try {
-      List<Map.Entry<byte[], byte[]>> putBatch = new LinkedList<>();
-      List<byte[]> deleteBatch = new LinkedList<>();
+      BatchOperation batch = new BatchOperation();
       byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
       byte[] volInfo = metadataManager.get(dbVolumeKey);
       if (volInfo == null) {
@@ -301,10 +290,9 @@ public class VolumeManagerImpl implements VolumeManager {
       Preconditions.checkState(volume.equals(volumeInfo.getVolume()));
       // delete the volume from the owner list
       // as well as delete the volume entry
-      delVolumeFromOwnerList(volume, volumeInfo.getOwnerName(),
-          putBatch, deleteBatch);
-      deleteBatch.add(dbVolumeKey);
-      metadataManager.batchPutDelete(putBatch, deleteBatch);
+      delVolumeFromOwnerList(volume, volumeInfo.getOwnerName(), batch);
+      batch.delete(dbVolumeKey);
+      metadataManager.writeBatch(batch);
     } catch (IOException ex) {
       LOG.error("Delete volume failed for volume:{}", volume, ex);
       throw ex;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f122a75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
index 8e49b5f..98da9b1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
@@ -29,10 +29,9 @@ import org.apache.hadoop.scm.ScmConfigKeys;
 import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.utils.LevelDBStore;
-import org.iq80.leveldb.DBIterator;
-import org.iq80.leveldb.Options;
-import org.iq80.leveldb.WriteBatch;
+import org.apache.hadoop.utils.BatchOperation;
+import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.utils.MetadataStoreBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,13 +74,13 @@ public class BlockManagerImpl implements BlockManager {
 
   private final NodeManager nodeManager;
   private final Mapping containerManager;
-  private final LevelDBStore blockStore;
+  private final MetadataStore blockStore;
 
   private final Lock lock;
   private final long containerSize;
   private final long cacheSize;
 
-  private final LevelDBStore openContainerStore;
+  private final MetadataStore openContainerStore;
   private Map<String, Long> openContainers;
   private final int containerProvisionBatchSize;
   private final Random rand;
@@ -102,12 +101,14 @@ public class BlockManagerImpl implements BlockManager {
     this.cacheSize = cacheSizeMB;
     File metaDir = OzoneUtils.getScmMetadirPath(conf);
     String scmMetaDataDir = metaDir.getPath();
-    Options options = new Options();
-    options.cacheSize(this.cacheSize * OzoneConsts.MB);
 
     // Write the block key to container name mapping.
     File blockContainerDbPath = new File(scmMetaDataDir, BLOCK_DB);
-    blockStore = new LevelDBStore(blockContainerDbPath, options);
+    blockStore = MetadataStoreBuilder.newBuilder()
+        .setConf(conf)
+        .setDbFile(blockContainerDbPath)
+        .setCacheSize(this.cacheSize * OzoneConsts.MB)
+        .build();
 
     this.containerSize = OzoneConsts.GB * conf.getInt(
         ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB,
@@ -115,7 +116,12 @@ public class BlockManagerImpl implements BlockManager {
 
     // Load store of all open contains for block allocation
     File openContainsDbPath = new File(scmMetaDataDir, OPEN_CONTAINERS_DB);
-    openContainerStore = new LevelDBStore(openContainsDbPath, options);
+    openContainerStore = MetadataStoreBuilder.newBuilder()
+        .setConf(conf)
+        .setDbFile(openContainsDbPath)
+        .setCacheSize(this.cacheSize * OzoneConsts.MB)
+        .build();
+
     openContainers = new HashMap<>();
     loadOpenContainers();
 
@@ -132,20 +138,19 @@ public class BlockManagerImpl implements BlockManager {
    * @throws IOException
    */
   private void loadOpenContainers() throws IOException {
-    try (DBIterator iter = openContainerStore.getIterator()) {
-      for (iter.seekToFirst(); iter.hasNext(); iter.next()) {
+    try {
+      openContainerStore.iterate(null, (key, value) -> {
         try {
-          byte[] key = iter.peekNext().getKey();
           String containerName = DFSUtil.bytes2String(key);
-          byte[] value = iter.peekNext().getValue();
           Long containerUsed = Long.parseLong(DFSUtil.bytes2String(value));
           openContainers.put(containerName, containerUsed);
           LOG.debug("Loading open container: {} used : {}", containerName,
               containerUsed);
-        } catch (Exception ex) {
+        } catch (Exception e) {
           LOG.warn("Failed loading open container, continue next...");
         }
-      }
+        return true;
+      });
     } catch (IOException e) {
       LOG.error("Loading open container store failed." + e);
       throw new SCMException("Failed to load open container store",
@@ -321,21 +326,19 @@ public class BlockManagerImpl implements BlockManager {
         throw new SCMException("Specified block key does not exist. key : " +
             key, FAILED_TO_FIND_BLOCK);
       }
-      try (WriteBatch wb = blockStore.createWriteBatch()) {
-        containerManager.getContainer(
-            DFSUtil.bytes2String(containerBytes));
-        String deletedKeyName = getDeletedKeyName(key);
-        // Add a tombstone for the deleted key
-        wb.put(DFSUtil.string2Bytes(deletedKeyName), containerBytes);
-        // Delete the block key
-        wb.delete(DFSUtil.string2Bytes(key));
-        blockStore.commitWriteBatch(wb);
-        // TODO: Add async tombstone clean thread to send delete command to
-        // datanodes in the pipeline to clean up the blocks from containers.
-        // TODO: Container report handling of the deleted blocks:
-        // Remove tombstone and update open container usage.
-        // We will revisit this when the closed container replication is done.
-      }
+      BatchOperation batch = new BatchOperation();
+      containerManager.getContainer(DFSUtil.bytes2String(containerBytes));
+      String deletedKeyName = getDeletedKeyName(key);
+      // Add a tombstone for the deleted key
+      batch.put(DFSUtil.string2Bytes(deletedKeyName), containerBytes);
+      // Delete the block key
+      batch.delete(DFSUtil.string2Bytes(key));
+      blockStore.writeBatch(batch);
+      // TODO: Add async tombstone clean thread to send delete command to
+      // datanodes in the pipeline to clean up the blocks from containers.
+      // TODO: Container report handling of the deleted blocks:
+      // Remove tombstone and update open container usage.
+      // We will revisit this when the closed container replication is done.
     } finally {
       lock.unlock();
     }
@@ -359,4 +362,4 @@ public class BlockManagerImpl implements BlockManager {
       openContainerStore.close();
     }
   }
-}
\ No newline at end of file
+}

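loadOpenContainers is rewritten against the new MetadataStore.iterate method, which walks
entries from an optional start key (null for the beginning) and feeds each key/value pair
to an EntryConsumer callback; returning true from the callback appears to continue the
scan. A condensed sketch of the pattern:

    openContainerStore.iterate(null, (key, value) -> {
      String containerName = DFSUtil.bytes2String(key);
      long used = Long.parseLong(DFSUtil.bytes2String(value));
      // accumulate into the in-memory openContainers map, as above
      return true;  // keep iterating
    });

The SQLCLI changes below use the same pattern and wrap SQLException in IOException inside
the callback, which suggests EntryConsumer is declared to throw IOException.
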
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f122a75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
index f558882..0e436e9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
@@ -31,8 +31,8 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.Pipeline;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.utils.LevelDBStore;
-import org.iq80.leveldb.DBIterator;
+import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.utils.MetadataStoreBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,7 +47,6 @@ import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;
 
 import static org.apache.hadoop.ozone.OzoneConsts.BLOCK_DB;
@@ -153,6 +152,7 @@ public class SQLCLI  extends Configured implements Tool {
         .withDescription("specify output path")
         .create("o");
     allOptions.addOption(outPathOption);
+
     return allOptions;
   }
 
@@ -254,22 +254,25 @@ public class SQLCLI  extends Configured implements Tool {
       throws Exception {
     LOG.info("Create tables for sql container db.");
     File dbFile = dbPath.toFile();
-    org.iq80.leveldb.Options dbOptions = new org.iq80.leveldb.Options();
-    try (LevelDBStore dbStore = new LevelDBStore(dbFile, dbOptions);
-         Connection conn = connectDB(outPath.toString());
-         DBIterator iter = dbStore.getIterator()) {
+    try (MetadataStore dbStore = MetadataStoreBuilder.newBuilder()
+        .setDbFile(dbFile).build();
+        Connection conn = connectDB(outPath.toString())) {
       executeSQL(conn, CREATE_CONTAINER_INFO);
       executeSQL(conn, CREATE_CONTAINER_MEMBERS);
       executeSQL(conn, CREATE_DATANODE_INFO);
 
-      iter.seekToFirst();
       HashSet<String> uuidChecked = new HashSet<>();
-      while (iter.hasNext()) {
-        Map.Entry<byte[], byte[]> entry = iter.next();
-        String containerName = new String(entry.getKey(), encoding);
-        Pipeline pipeline = Pipeline.parseFrom(entry.getValue());
-        insertContainerDB(conn, containerName, pipeline, uuidChecked);
-      }
+      dbStore.iterate(null, (key, value) -> {
+        String containerName = new String(key, encoding);
+        Pipeline pipeline = null;
+        pipeline = Pipeline.parseFrom(value);
+        try {
+          insertContainerDB(conn, containerName, pipeline, uuidChecked);
+          return true;
+        } catch (SQLException e) {
+          throw new IOException(e);
+        }
+      });
     }
   }
 
@@ -330,21 +333,24 @@ public class SQLCLI  extends Configured implements Tool {
   private void convertBlockDB(Path dbPath, Path outPath) throws Exception {
     LOG.info("Create tables for sql block db.");
     File dbFile = dbPath.toFile();
-    org.iq80.leveldb.Options dbOptions = new org.iq80.leveldb.Options();
-    try (LevelDBStore dbStore = new LevelDBStore(dbFile, dbOptions);
-         Connection conn = connectDB(outPath.toString());
-         DBIterator iter = dbStore.getIterator()) {
+    try (MetadataStore dbStore = MetadataStoreBuilder.newBuilder()
+        .setDbFile(dbFile).build();
+        Connection conn = connectDB(outPath.toString())) {
       executeSQL(conn, CREATE_BLOCK_CONTAINER);
 
-      iter.seekToFirst();
-      while (iter.hasNext()) {
-        Map.Entry<byte[], byte[]> entry = iter.next();
-        String blockKey = DFSUtilClient.bytes2String(entry.getKey());
-        String containerName = DFSUtilClient.bytes2String(entry.getValue());
+      dbStore.iterate(null, (key, value) -> {
+        String blockKey = DFSUtilClient.bytes2String(key);
+        String containerName = DFSUtilClient.bytes2String(value);
         String insertBlockContainer = String.format(
             INSERT_BLOCK_CONTAINER, blockKey, containerName);
-        executeSQL(conn, insertBlockContainer);
-      }
+
+        try {
+          executeSQL(conn, insertBlockContainer);
+          return true;
+        } catch (SQLException e) {
+          throw new IOException(e);
+        }
+      });
     }
   }
 
@@ -374,21 +380,23 @@ public class SQLCLI  extends Configured implements Tool {
   private void convertNodePoolDB(Path dbPath, Path outPath) throws Exception {
     LOG.info("Create table for sql node pool db.");
     File dbFile = dbPath.toFile();
-    org.iq80.leveldb.Options dbOptions = new org.iq80.leveldb.Options();
-    try (LevelDBStore dbStore = new LevelDBStore(dbFile, dbOptions);
-         Connection conn = connectDB(outPath.toString());
-         DBIterator iter = dbStore.getIterator()) {
+    try (MetadataStore dbStore = MetadataStoreBuilder.newBuilder()
+        .setDbFile(dbFile).build();
+        Connection conn = connectDB(outPath.toString())) {
       executeSQL(conn, CREATE_NODE_POOL);
       executeSQL(conn, CREATE_DATANODE_INFO);
 
-      iter.seekToFirst();
-      while (iter.hasNext()) {
-        Map.Entry<byte[], byte[]> entry = iter.next();
-        DatanodeID nodeId = DatanodeID.getFromProtoBuf(
-            HdfsProtos.DatanodeIDProto.PARSER.parseFrom(entry.getKey()));
-        String blockPool = DFSUtil.bytes2String(entry.getValue());
-        insertNodePoolDB(conn, blockPool, nodeId);
-      }
+      dbStore.iterate(null, (key, value) -> {
+        DatanodeID nodeId = DatanodeID
+            .getFromProtoBuf(HdfsProtos.DatanodeIDProto.PARSER.parseFrom(key));
+        String blockPool = DFSUtil.bytes2String(value);
+        try {
+          insertNodePoolDB(conn, blockPool, nodeId);
+          return true;
+        } catch (SQLException e) {
+          throw new IOException(e);
+        }
+      });
     }
   }
 
@@ -423,22 +431,24 @@ public class SQLCLI  extends Configured implements Tool {
       throws Exception {
     LOG.info("Create table for open container db.");
     File dbFile = dbPath.toFile();
-    org.iq80.leveldb.Options dbOptions = new org.iq80.leveldb.Options();
-    try (LevelDBStore dbStore = new LevelDBStore(dbFile, dbOptions);
-        Connection conn = connectDB(outPath.toString());
-        DBIterator iter = dbStore.getIterator()) {
+    try (MetadataStore dbStore = MetadataStoreBuilder.newBuilder()
+        .setDbFile(dbFile).build();
+        Connection conn = connectDB(outPath.toString())) {
       executeSQL(conn, CREATE_OPEN_CONTAINER);
 
-      iter.seekToFirst();
-      while (iter.hasNext()) {
-        Map.Entry<byte[], byte[]> entry = iter.next();
-        String containerName = DFSUtil.bytes2String(entry.getKey());
-        Long containerUsed = Long.parseLong(
-            DFSUtil.bytes2String(entry.getValue()));
-        String insertOpenContainer = String.format(
-            INSERT_OPEN_CONTAINER, containerName, containerUsed);
-        executeSQL(conn, insertOpenContainer);
-      }
+      dbStore.iterate(null, (key, value) -> {
+        String containerName = DFSUtil.bytes2String(key);
+        Long containerUsed =
+            Long.parseLong(DFSUtil.bytes2String(value));
+        String insertOpenContainer = String
+            .format(INSERT_OPEN_CONTAINER, containerName, containerUsed);
+        try {
+          executeSQL(conn, insertOpenContainer);
+          return true;
+        } catch (SQLException e) {
+          throw new IOException(e);
+        }
+      });
     }
   }
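
All of the convert*DB methods above follow the same shape: scan the source
MetadataStore with iterate() and, because EntryConsumer may only throw
IOException, wrap any SQLException raised by the JDBC layer before rethrowing.
A minimal sketch of that pattern, assuming a hypothetical table "kv",
hypothetical file paths, and an illustrative SQLite JDBC URL (the real
connectDB() is not shown in this hunk):

  import java.io.File;
  import java.io.IOException;
  import java.nio.charset.StandardCharsets;
  import java.sql.Connection;
  import java.sql.DriverManager;
  import java.sql.SQLException;
  import java.sql.Statement;

  import org.apache.hadoop.utils.MetadataStore;
  import org.apache.hadoop.utils.MetadataStoreBuilder;

  public class DbToSqlSketch {
    public static void main(String[] args) throws Exception {
      try (MetadataStore dbStore = MetadataStoreBuilder.newBuilder()
              .setDbFile(new File("/tmp/example.db")).build();
           Connection conn = DriverManager
               .getConnection("jdbc:sqlite:/tmp/example-out.db")) {
        // Scan every entry from the first key (null start key).
        dbStore.iterate(null, (key, value) -> {
          String sql = String.format("INSERT INTO kv VALUES ('%s', '%s')",
              new String(key, StandardCharsets.UTF_8),
              new String(value, StandardCharsets.UTF_8));
          try (Statement stmt = conn.createStatement()) {
            stmt.executeUpdate(sql);
            return true;               // keep scanning
          } catch (SQLException e) {
            // EntryConsumer declares only IOException, so wrap and rethrow.
            throw new IOException(e);
          }
        });
      }
    }
  }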
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f122a75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
index 75694bb..643779d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
@@ -32,9 +32,10 @@ import org.apache.hadoop.ozone.web.utils.OzoneUtils;
 import org.apache.hadoop.scm.ScmConfigKeys;
 import org.apache.hadoop.scm.client.ScmClient;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.utils.LevelDBKeyFilters.KeyPrefixFilter;
-import org.apache.hadoop.utils.LevelDBStore;
-import org.iq80.leveldb.Options;
+import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
+import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
+import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.utils.MetadataStoreBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,7 +64,7 @@ public class ContainerMapping implements Mapping {
   private final long cacheSize;
   private final Lock lock;
   private final Charset encoding = Charset.forName("UTF-8");
-  private final LevelDBStore containerStore;
+  private final MetadataStore containerStore;
   private final ContainerPlacementPolicy placementPolicy;
   private final long containerSize;
 
@@ -85,12 +86,14 @@ public class ContainerMapping implements Mapping {
     this.cacheSize = cacheSizeMB;
 
     File metaDir = OzoneUtils.getScmMetadirPath(conf);
-    Options options = new Options();
-    options.cacheSize(this.cacheSize * OzoneConsts.MB);
 
     // Write the container name to pipeline mapping.
     File containerDBPath = new File(metaDir, CONTAINER_DB);
-    containerStore = new LevelDBStore(containerDBPath, options);
+    containerStore = MetadataStoreBuilder.newBuilder()
+        .setConf(conf)
+        .setDbFile(containerDBPath)
+        .setCacheSize(this.cacheSize * OzoneConsts.MB)
+        .build();
 
     this.lock = new ReentrantLock();
 
@@ -192,7 +195,7 @@ public class ContainerMapping implements Mapping {
       if(containerStore.isEmpty()) {
         throw new IOException("No container exists in current db");
       }
-      KeyPrefixFilter prefixFilter = new KeyPrefixFilter(prefixName);
+      MetadataKeyFilter prefixFilter = new KeyPrefixFilter(prefixName);
       byte[] startKey = startName == null ?
           null : DFSUtil.string2Bytes(startName);
       List<Map.Entry<byte[], byte[]>> range =
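
The listContainer hunk above is cut off just before the range-scan call, so
the sketch below does not guess at that API; it illustrates the same
prefix-based selection using only iterate(), which the rest of this patch
demonstrates, with the prefix test applied by hand (KeyPrefixFilter
encapsulates that test in the real code). The store path and prefix value are
hypothetical:

  import java.io.File;
  import java.nio.charset.StandardCharsets;
  import java.util.ArrayList;
  import java.util.List;

  import org.apache.hadoop.utils.MetadataStore;
  import org.apache.hadoop.utils.MetadataStoreBuilder;

  public class PrefixScanSketch {
    public static void main(String[] args) throws Exception {
      try (MetadataStore store = MetadataStoreBuilder.newBuilder()
              .setDbFile(new File("/tmp/container.db")).build()) {
        String prefix = "vol1/";          // hypothetical name prefix
        List<String> matches = new ArrayList<>();
        store.iterate(null, (key, value) -> {
          String name = new String(key, StandardCharsets.UTF_8);
          if (name.startsWith(prefix)) {  // the test KeyPrefixFilter wraps
            matches.add(name);
          }
          return true;                    // scan the whole store
        });
        matches.forEach(System.out::println);
      }
    }
  }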

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f122a75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerReplicationManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerReplicationManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerReplicationManager.java
index 71836db..a22cd12 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerReplicationManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerReplicationManager.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.ozone.OzoneConfiguration;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.ozone.scm.exceptions.SCMException;
 import org.apache.hadoop.ozone.scm.node.CommandQueue;
 import org.apache.hadoop.ozone.scm.node.NodeManager;
 import org.apache.hadoop.ozone.scm.node.NodePoolManager;
@@ -260,8 +261,17 @@ public class ContainerReplicationManager implements Closeable {
    * a datanode.
    */
   public void handleContainerReport(ContainerReportsProto containerReport) {
-    String poolName = poolManager.getNodePool(
-        DatanodeID.getFromProtoBuf(containerReport.getDatanodeID()));
+    String poolName = null;
+    DatanodeID datanodeID = DatanodeID
+        .getFromProtoBuf(containerReport.getDatanodeID());
+    try {
+      poolName = poolManager.getNodePool(datanodeID);
+    } catch (SCMException e) {
+      LOG.warn("Skipping processing container report from datanode {}, "
+              + "cause: failed to get the corresponding node pool",
+          datanodeID.toString(), e);
+      return;
+    }
 
     for(InProgressPool ppool : inProgressPoolList) {
       if(ppool.getPoolName().equalsIgnoreCase(poolName)) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f122a75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/exceptions/SCMException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/exceptions/SCMException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/exceptions/SCMException.java
index ad902ba..f60bdc6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/exceptions/SCMException.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/exceptions/SCMException.java
@@ -110,6 +110,7 @@ public class SCMException extends IOException {
     FAILED_TO_FIND_CONTAINER,
     FAILED_TO_FIND_CONTAINER_WITH_SAPCE,
     BLOCK_EXISTS,
-    FAILED_TO_FIND_BLOCK
+    FAILED_TO_FIND_BLOCK,
+    IO_EXCEPTION
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f122a75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodePoolManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodePoolManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodePoolManager.java
index f91c705..d3218b7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodePoolManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodePoolManager.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.ozone.scm.exceptions.SCMException;
 
 import java.io.Closeable;
+import java.io.IOException;
 import java.util.List;
 
 /**
@@ -35,7 +36,7 @@ public interface NodePoolManager extends Closeable {
    * @param pool - name of the node pool.
    * @param node - data node.
    */
-  void addNode(String pool, DatanodeID node);
+  void addNode(String pool, DatanodeID node) throws IOException;
 
   /**
    * Remove a node from a node pool.
@@ -67,5 +68,5 @@ public interface NodePoolManager extends Closeable {
    * @return node pool name if it has been assigned.
    * null if the node has not been assigned to any node pool yet.
    */
-  String getNodePool(DatanodeID datanodeID);
+  String getNodePool(DatanodeID datanodeID) throws SCMException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f122a75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java
index a71fbcc..235809b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java
@@ -726,8 +726,16 @@ public class SCMNodeManager
     // TODO: define node pool policy for non-default node pool.
     // For now, all nodes are added to the "DefaultNodePool" upon registration
     // if it has not been added to any node pool yet.
-    if (nodePoolManager.getNodePool(datanodeID) == null) {
-      nodePoolManager.addNode(SCMNodePoolManager.DEFAULT_NODEPOOL, datanodeID);
+    try {
+      if (nodePoolManager.getNodePool(datanodeID) == null) {
+        nodePoolManager.addNode(SCMNodePoolManager.DEFAULT_NODEPOOL,
+            datanodeID);
+      }
+    } catch (IOException e) {
+      // TODO: make sure registration failure is handled correctly.
+      return RegisteredCommand.newBuilder()
+          .setErrorCode(ErrorCode.errorNodeNotPermitted)
+          .build();
     }
     LOG.info("Data node with ID: {} Registered.",
         datanodeID.getDatanodeUuid());
@@ -823,4 +831,4 @@ public class SCMNodeManager
     }
     return nodeCountMap;
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f122a75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodePoolManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodePoolManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodePoolManager.java
index 9c1821f..aa34f29 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodePoolManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodePoolManager.java
@@ -26,9 +26,8 @@ import org.apache.hadoop.ozone.OzoneConfiguration;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.scm.exceptions.SCMException;
 import org.apache.hadoop.ozone.web.utils.OzoneUtils;
-import org.apache.hadoop.utils.LevelDBStore;
-import org.iq80.leveldb.DBIterator;
-import org.iq80.leveldb.Options;
+import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.utils.MetadataStoreBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,7 +64,7 @@ public final class SCMNodePoolManager implements NodePoolManager {
   public static final String DEFAULT_NODEPOOL = "DefaultNodePool";
 
   // DB that saves the node to node pool mapping.
-  private LevelDBStore nodePoolStore;
+  private MetadataStore nodePoolStore;
 
   // In-memory node pool to nodes mapping
   private HashMap<String, Set<DatanodeID>> nodePools;
@@ -84,11 +83,12 @@ public final class SCMNodePoolManager implements NodePoolManager {
         OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
     File metaDir = OzoneUtils.getScmMetadirPath(conf);
     String scmMetaDataDir = metaDir.getPath();
-    Options options = new Options();
-    options.cacheSize(cacheSize * OzoneConsts.MB);
-
     File nodePoolDBPath = new File(scmMetaDataDir, NODEPOOL_DB);
-    nodePoolStore = new LevelDBStore(nodePoolDBPath, options);
+    nodePoolStore = MetadataStoreBuilder.newBuilder()
+        .setConf(conf)
+        .setDbFile(nodePoolDBPath)
+        .setCacheSize(cacheSize * OzoneConsts.MB)
+        .build();
     nodePools = new HashMap<>();
     lock = new ReentrantReadWriteLock();
     init();
@@ -100,14 +100,11 @@ public final class SCMNodePoolManager implements NodePoolManager {
    * @throws SCMException
    */
   private void init() throws SCMException {
-    try (DBIterator iter = nodePoolStore.getIterator()) {
-      for (iter.seekToFirst(); iter.hasNext(); iter.next()) {
+    try {
+      nodePoolStore.iterate(null, (key, value) -> {
         try {
-          byte[] key = iter.peekNext().getKey();
           DatanodeID nodeId = DatanodeID.getFromProtoBuf(
               HdfsProtos.DatanodeIDProto.PARSER.parseFrom(key));
-
-          byte[] value = iter.peekNext().getValue();
           String poolName = DFSUtil.bytes2String(value);
 
           Set<DatanodeID> nodePool = null;
@@ -119,12 +116,14 @@ public final class SCMNodePoolManager implements NodePoolManager {
           }
           nodePool.add(nodeId);
           if (LOG.isDebugEnabled()) {
-            LOG.debug("Adding node: {} to node pool: {}", nodeId, poolName);
+            LOG.debug("Adding node: {} to node pool: {}",
+                nodeId, poolName);
           }
-        } catch (Exception ex) {
+        } catch (IOException e) {
           LOG.warn("Can't add a datanode to node pool, continue next...");
         }
-      }
+        return true;
+      });
     } catch (IOException e) {
       LOG.error("Loading node pool error " + e);
       throw new SCMException("Failed to load node pool",
@@ -138,7 +137,8 @@ public final class SCMNodePoolManager implements NodePoolManager {
    * @param node - name of the datanode.
    */
   @Override
-  public void addNode(final String pool, final DatanodeID node) {
+  public void addNode(final String pool, final DatanodeID node)
+      throws IOException {
     Preconditions.checkNotNull(pool, "pool name is null");
     Preconditions.checkNotNull(node, "node is null");
     lock.writeLock().lock();
@@ -192,6 +192,10 @@ public final class SCMNodePoolManager implements NodePoolManager {
         throw new SCMException(String.format("Unable to find node %s from" +
             " pool %s in MAP.", DFSUtil.bytes2String(kName), pool),
            FAILED_TO_FIND_NODE_IN_POOL);
      }
+    } catch (IOException e) {
+      throw new SCMException("Failed to remove node " + node.toString()
+          + " from node pool " + pool, e,
+          SCMException.ResultCodes.IO_EXCEPTION);
     } finally {
       lock.writeLock().unlock();
     }
@@ -238,14 +242,17 @@ public final class SCMNodePoolManager implements NodePoolManager {
    * TODO: Put this in a in-memory map if performance is an issue.
    */
   @Override
-  public String getNodePool(final DatanodeID datanodeID) {
+  public String getNodePool(final DatanodeID datanodeID) throws SCMException {
     Preconditions.checkNotNull(datanodeID, "node is null");
-    byte[] result = nodePoolStore.get(
-        datanodeID.getProtoBufMessage().toByteArray());
-    if (result == null) {
-      return null;
+    try {
+      byte[] result = nodePoolStore.get(
+          datanodeID.getProtoBufMessage().toByteArray());
+      return result == null ? null : DFSUtil.bytes2String(result);
+    } catch (IOException e) {
+      throw new SCMException("Failed to get node pool for node "
+          + datanodeID.toString(), e,
+          SCMException.ResultCodes.IO_EXCEPTION);
     }
-    return DFSUtil.bytes2String(result);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f122a75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/localstorage/OzoneMetadataManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/localstorage/OzoneMetadataManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/localstorage/OzoneMetadataManager.java
index d849612..f28555a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/localstorage/OzoneMetadataManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/localstorage/OzoneMetadataManager.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.utils.LevelDBStore;
 import org.apache.hadoop.ozone.web.exceptions.ErrorTable;
 import org.apache.hadoop.ozone.web.exceptions.OzoneException;
 import org.apache.hadoop.ozone.web.handlers.BucketArgs;
@@ -39,8 +38,8 @@ import org.apache.hadoop.ozone.web.response.ListKeys;
 import org.apache.hadoop.ozone.web.response.ListVolumes;
 import org.apache.hadoop.ozone.web.response.VolumeInfo;
 import org.apache.hadoop.ozone.web.response.VolumeOwner;
-import org.iq80.leveldb.DBException;
-import org.iq80.leveldb.DBIterator;
+import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.utils.MetadataStoreBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -128,8 +127,8 @@ public final class OzoneMetadataManager {
   private static final String USER_DB = "/user.db";
   private static final String META_DB = "/metadata.db";
   private static OzoneMetadataManager bm = null;
-  private LevelDBStore userDB;
-  private LevelDBStore metadataDB;
+  private MetadataStore userDB;
+  private MetadataStore metadataDB;
   private ReadWriteLock lock;
   private Charset encoding = Charset.forName("UTF-8");
   private String storageRoot;
@@ -157,8 +156,14 @@ public final class OzoneMetadataManager {
     }
 
     try {
-      userDB = new LevelDBStore(new File(storageRoot + USER_DB), true);
-      metadataDB = new LevelDBStore(new File(storageRoot + META_DB), true);
+      userDB = MetadataStoreBuilder.newBuilder()
+          .setDbFile(new File(storageRoot + USER_DB))
+          .setCreateIfMissing(true)
+          .build();
+      metadataDB = MetadataStoreBuilder.newBuilder()
+          .setDbFile(new File(storageRoot + META_DB))
+          .setCreateIfMissing(true)
+          .build();
       inProgressObjects = new ConcurrentHashMap<>();
     } catch (IOException ex) {
       LOG.error("Cannot open db :" + ex.getMessage());
@@ -230,7 +235,7 @@ public final class OzoneMetadataManager {
       metadataDB.put(args.getVolumeName().getBytes(encoding),
           newVInfo.toDBString().getBytes(encoding));
 
-    } catch (IOException | DBException ex) {
+    } catch (IOException ex) {
       throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, ex);
     } finally {
       lock.writeLock().unlock();
@@ -295,7 +300,7 @@ public final class OzoneMetadataManager {
       userDB.put(args.getResourceName().getBytes(encoding),
           volumeList.toDBString().getBytes(encoding));
 
-    } catch (IOException | DBException ex) {
+    } catch (IOException ex) {
       throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, ex);
     } finally {
       lock.writeLock().unlock();
@@ -341,7 +346,7 @@ public final class OzoneMetadataManager {
 
       VolumeInfo info = VolumeInfo.parse(new String(volumeInfo, encoding));
       return info.getOwner().getName().equals(acl.getName());
-    } catch (IOException | DBException ex) {
+    } catch (IOException ex) {
       throw ErrorTable.newError(ErrorTable.SERVER_ERROR, null, ex);
     } finally {
       lock.readLock().unlock();
@@ -365,7 +370,7 @@ public final class OzoneMetadataManager {
       }
 
       return VolumeInfo.parse(new String(volumeInfo, encoding));
-    } catch (IOException | DBException ex) {
+    } catch (IOException ex) {
       throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, ex);
     } finally {
       lock.readLock().unlock();
@@ -405,7 +410,7 @@ public final class OzoneMetadataManager {
         prevKey = volName[1];
       }
       return getFilteredVolumes(volumeList, prefix, prevKey, maxCount);
-    } catch (IOException | DBException ex) {
+    } catch (IOException ex) {
       throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args.getArgs(), ex);
     } finally {
       lock.readLock().unlock();
@@ -448,80 +453,54 @@ public final class OzoneMetadataManager {
    * @return ListVolumes.
    * @throws OzoneException
    */
-  public ListVolumes listAllVolumes(ListArgs args) throws OzoneException,
-      IOException {
+  public ListVolumes listAllVolumes(ListArgs args)
+      throws OzoneException, IOException {
     String prefix = args.getPrefix();
-    String prevKey = args.getPrevKey();
+    final String prevKey;
     int maxCount = args.getMaxKeys();
     String userName = null;
-    try (DBIterator iterator = this.userDB.getDB().iterator()) {
-
-      if (prevKey != null) {
-        // Format is username/volumeName
 
-        String[] volName = args.getPrevKey().split("/");
-        if (volName.length < 2) {
-          throw ErrorTable.newError(ErrorTable.USER_NOT_FOUND, args.getArgs());
-        }
-        seekToUser(iterator, volName[0]);
-        userName = new String(iterator.peekNext().getKey(), encoding);
-        prevKey = volName[1];
-      } else {
-        userName = getFirstUser(iterator);
-      }
-
-      if (userName == null || userName.isEmpty()) {
+    if (args.getPrevKey() != null) {
+      // Format is username/volumeName
+      String[] volName = args.getPrevKey().split("/");
+      if (volName.length < 2) {
         throw ErrorTable.newError(ErrorTable.USER_NOT_FOUND, args.getArgs());
       }
 
-      ListVolumes returnSet = new ListVolumes();
-      int count = maxCount - returnSet.getVolumes().size();
-
-      // we need to iterate through users until we get maxcount volumes
-      // or no more volumes are left.
-      while (iterator.hasNext() && count > 0) {
+      byte[] userNameBytes = userDB.get(volName[0].getBytes(encoding));
+      userName = new String(userNameBytes, encoding);
+      prevKey = volName[1];
+    } else {
+      userName = new String(userDB.peekAround(0, null).getKey(), encoding);
+      prevKey = null;
+    }
 
-        userName = new String(iterator.next().getKey(), encoding);
+    if (userName == null || userName.isEmpty()) {
+      throw ErrorTable.newError(ErrorTable.USER_NOT_FOUND, args.getArgs());
+    }
 
-        byte[] volumeList = userDB.get(userName.getBytes(encoding));
+    ListVolumes returnSet = new ListVolumes();
+    // we need to iterate through users until we get maxcount volumes
+    // or no more volumes are left.
+    userDB.iterate(null, (key, value) -> {
+      int currentSize = returnSet.getVolumes().size();
+      if (currentSize < maxCount) {
+        String name = new String(key, encoding);
+        byte[] volumeList = userDB.get(name.getBytes(encoding));
         if (volumeList == null) {
-          throw ErrorTable.newError(ErrorTable.USER_NOT_FOUND, args.getArgs());
+          throw new IOException(
+              ErrorTable.newError(ErrorTable.USER_NOT_FOUND, args.getArgs()));
         }
-
-        returnSet.getVolumes().addAll(getFilteredVolumes(
-            volumeList, prefix, prevKey, count).getVolumes());
-        count = maxCount - returnSet.getVolumes().size();
+        returnSet.getVolumes().addAll(
+            getFilteredVolumes(volumeList, prefix, prevKey,
+                maxCount - currentSize).getVolumes());
+        return true;
+      } else {
+        return false;
       }
-      return returnSet;
-    }
-  }
+    });
 
-  /**
-   * Returns the first user name from the UserDB.
-   *
-   * @return - UserName.
-   * @throws IOException
-   */
-  String getFirstUser(DBIterator iterator) throws IOException {
-    iterator.seekToFirst();
-    if (iterator.hasNext()) {
-      return new String(iterator.peekNext().getKey(), encoding);
-    }
-    return null;
-  }
-
-  /**
-   * Reposition the DB cursor to the user name.
-   *
-   * @param iterator - Current Iterator.
-   * @param userName - userName to seek to
-   * @return - DBIterator.
-   * @throws IOException
-   */
-  DBIterator seekToUser(DBIterator iterator, String userName) throws
-      IOException {
-    iterator.seek(userName.getBytes(encoding));
-    return iterator;
+    return returnSet;
   }
 
   /**
@@ -587,7 +566,7 @@ public final class OzoneMetadataManager {
       metadataDB.delete(args.getVolumeName().getBytes(encoding));
       userDB.put(user.getBytes(encoding),
           volumeList.toDBString().getBytes(encoding));
-    } catch (IOException | DBException ex) {
+    } catch (IOException ex) {
       throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, ex);
     } finally {
       lock.writeLock().unlock();
@@ -659,7 +638,7 @@ public final class OzoneMetadataManager {
       metadataDB.put(args.getResourceName().getBytes(encoding),
           bucketInfo.toDBString().getBytes(encoding));
 
-    } catch (IOException | DBException ex) {
+    } catch (IOException ex) {
       throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, ex);
     } finally {
       lock.writeLock().unlock();
@@ -716,7 +695,7 @@ public final class OzoneMetadataManager {
 
       userDB.put(args.getParentName().getBytes(encoding),
           bucketList.toDBString().getBytes(encoding));
-    } catch (IOException | DBException ex) {
+    } catch (IOException ex) {
       throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, ex);
     } finally {
       lock.writeLock().unlock();
@@ -807,7 +786,7 @@ public final class OzoneMetadataManager {
       metadataDB.delete(args.getResourceName().getBytes(encoding));
       userDB.put(args.getParentName().getBytes(encoding),
           bucketList.toDBString().getBytes(encoding));
-    } catch (IOException | DBException ex) {
+    } catch (IOException ex) {
       throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, ex);
     } finally {
       lock.writeLock().unlock();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f122a75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BatchOperation.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BatchOperation.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BatchOperation.java
new file mode 100644
index 0000000..47699eb
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BatchOperation.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.utils;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * A utility class to store a batch of DB write operations.
+ */
+public class BatchOperation {
+
+  /**
+   * Enum for write operations.
+   */
+  public enum Operation {
+    DELETE, PUT
+  }
+
+  private List<SingleOperation> operations =
+      Lists.newArrayList();
+
+  /**
+   * Add a PUT operation into the batch.
+   */
+  public void put(byte[] key, byte[] value) {
+    operations.add(new SingleOperation(Operation.PUT, key, value));
+  }
+
+  /**
+   * Add a DELETE operation into the batch.
+   */
+  public void delete(byte[] key) {
+    operations.add(new SingleOperation(Operation.DELETE, key, null));
+  }
+
+  public List<SingleOperation> getOperations() {
+    return operations;
+  }
+
+  /**
+   * A SingleOperation represents a PUT or DELETE operation
+   * and the data the operation needs to manipulate.
+   */
+  public static class SingleOperation {
+
+    private Operation opt;
+    private byte[] key;
+    private byte[] value;
+
+    public SingleOperation(Operation opt, byte[] key, byte[] value) {
+      this.opt = opt;
+      if (key == null) {
+        throw new IllegalArgumentException("key cannot be null");
+      }
+      this.key = key.clone();
+      this.value = value == null ? null : value.clone();
+    }
+
+    public Operation getOpt() {
+      return opt;
+    }
+
+    public byte[] getKey() {
+      return key.clone();
+    }
+
+    public byte[] getValue() {
+      return value == null ? null : value.clone();
+    }
+  }
+}
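
BatchOperation is deliberately store-agnostic: it only records pending
writes, and each put/delete clones its byte arrays so later mutation of the
caller's buffers cannot corrupt the batch. A small sketch of building and
inspecting one (how a batch is ultimately committed lives in MetadataStore,
whose hunk is not shown here, so this stops at getOperations()):

  import java.nio.charset.StandardCharsets;

  import org.apache.hadoop.utils.BatchOperation;

  public class BatchSketch {
    public static void main(String[] args) {
      BatchOperation batch = new BatchOperation();
      batch.put("key1".getBytes(StandardCharsets.UTF_8),
                "value1".getBytes(StandardCharsets.UTF_8));
      batch.delete("key2".getBytes(StandardCharsets.UTF_8));

      // Each entry records the operation type plus defensive copies of data.
      for (BatchOperation.SingleOperation op : batch.getOperations()) {
        System.out.println(op.getOpt() + " "
            + new String(op.getKey(), StandardCharsets.UTF_8));
      }
    }
  }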

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f122a75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/EntryConsumer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/EntryConsumer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/EntryConsumer.java
new file mode 100644
index 0000000..c407398
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/EntryConsumer.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.utils;
+
+import java.io.IOException;
+
+/**
+ * A consumer for metadata store key-value entries.
+ * Used by {@link MetadataStore} class.
+ */
+@FunctionalInterface
+public interface EntryConsumer {
+
+  /**
+   * Consumes a key and value and produces a boolean result.
+   * @param key key
+   * @param value value
+   * @return true to continue the iteration, false to stop
+   * @throws IOException if the entry cannot be processed
+   */
+  boolean consume(byte[] key, byte[] value) throws IOException;
+}
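
Because EntryConsumer is a @FunctionalInterface, callers can pass a lambda,
as this patch does throughout, or a named implementation when the traversal
carries state. A sketch of the latter, against a hypothetical store path:

  import java.io.File;
  import java.io.IOException;

  import org.apache.hadoop.utils.EntryConsumer;
  import org.apache.hadoop.utils.MetadataStore;
  import org.apache.hadoop.utils.MetadataStoreBuilder;

  public class ConsumerSketch {

    /** Counts entries and total value bytes while scanning. */
    static class SizeCounter implements EntryConsumer {
      long entries;
      long valueBytes;

      @Override
      public boolean consume(byte[] key, byte[] value) throws IOException {
        entries++;
        valueBytes += value.length;
        return true;                      // visit every entry
      }
    }

    public static void main(String[] args) throws Exception {
      SizeCounter counter = new SizeCounter();
      try (MetadataStore store = MetadataStoreBuilder.newBuilder()
              .setDbFile(new File("/tmp/metadata.db")).build()) {
        store.iterate(null, counter);
      }
      System.out.println(counter.entries + " entries, "
          + counter.valueBytes + " value bytes");
    }
  }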

