Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2017/08/29 21:03:06 UTC

hadoop git commit: HDFS-12366. Ozone: Refactor KSM metadata class names to avoid confusion. Contributed by Weiwei Yang.

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7240 0f6050752 -> 6215afcd8


HDFS-12366. Ozone: Refactor KSM metadata class names to avoid confusion. Contributed by Weiwei Yang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6215afcd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6215afcd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6215afcd

Branch: refs/heads/HDFS-7240
Commit: 6215afcd8c476decb39f7d8caf0ca9e9adfc8f40
Parents: 0f60507
Author: Anu Engineer <ae...@apache.org>
Authored: Tue Aug 29 13:57:51 2017 -0700
Committer: Anu Engineer <ae...@apache.org>
Committed: Tue Aug 29 13:57:51 2017 -0700

----------------------------------------------------------------------
 .../hadoop/ozone/ksm/BucketManagerImpl.java     |   6 +-
 .../hadoop/ozone/ksm/KSMMetadataManager.java    | 216 +++++++++
 .../ozone/ksm/KSMMetadataManagerImpl.java       | 453 +++++++++++++++++++
 .../apache/hadoop/ozone/ksm/KeyManagerImpl.java |   4 +-
 .../hadoop/ozone/ksm/KeySpaceManager.java       |   6 +-
 .../hadoop/ozone/ksm/MetadataManager.java       | 216 ---------
 .../hadoop/ozone/ksm/MetadataManagerImpl.java   | 453 -------------------
 .../hadoop/ozone/ksm/VolumeManagerImpl.java     |   4 +-
 .../hadoop/ozone/ksm/TestBucketManagerImpl.java |  28 +-
 9 files changed, 693 insertions(+), 693 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6215afcd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/BucketManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/BucketManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/BucketManagerImpl.java
index 95f76dd..d0e8149 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/BucketManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/BucketManagerImpl.java
@@ -39,15 +39,15 @@ public class BucketManagerImpl implements BucketManager {
       LoggerFactory.getLogger(BucketManagerImpl.class);
 
   /**
-   * MetadataManager is used for accessing KSM MetadataDB and ReadWriteLock.
+   * KSMMetadataManager is used for accessing KSM MetadataDB and ReadWriteLock.
    */
-  private final MetadataManager metadataManager;
+  private final KSMMetadataManager metadataManager;
 
   /**
    * Constructs BucketManager.
    * @param metadataManager
    */
-  public BucketManagerImpl(MetadataManager metadataManager){
+  public BucketManagerImpl(KSMMetadataManager metadataManager){
     this.metadataManager = metadataManager;
   }
 

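For callers, the only visible change in this and the following manager classes is the dependency type: BucketManagerImpl, KeyManagerImpl, VolumeManagerImpl and KeySpaceManager now program against the KSMMetadataManager interface instead of the old MetadataManager name. A minimal wiring sketch, not part of this patch, assuming the KSM metadata directory is already configured so that KSMMetadataManagerImpl can open its store:

package org.apache.hadoop.ozone.ksm;

import org.apache.hadoop.ozone.OzoneConfiguration;

public class BucketManagerWiringSketch {
  public static void main(String[] args) throws Exception {
    OzoneConfiguration conf = new OzoneConfiguration();
    // The managers depend only on the KSMMetadataManager interface;
    // the concrete MetadataStore lives behind KSMMetadataManagerImpl.
    KSMMetadataManager metadataManager = new KSMMetadataManagerImpl(conf);
    metadataManager.start();
    try {
      BucketManager bucketManager = new BucketManagerImpl(metadataManager);
      // createBucket(...) / getBucketInfo(...) calls would go here.
    } finally {
      metadataManager.stop();
    }
  }
}
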
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6215afcd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManager.java
new file mode 100644
index 0000000..fe93718
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManager.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.ksm;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
+import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
+import org.apache.hadoop.utils.BatchOperation;
+import org.apache.hadoop.utils.MetadataStore;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * KSM metadata manager interface.
+ */
+public interface KSMMetadataManager {
+  /**
+   * Start metadata manager.
+   */
+  void start();
+
+  /**
+   * Stop metadata manager.
+   */
+  void stop() throws IOException;
+
+  /**
+   * Get metadata store.
+   * @return metadata store.
+   */
+  @VisibleForTesting
+  MetadataStore getStore();
+
+  /**
+   * Returns the read lock used on Metadata DB.
+   * @return readLock
+   */
+  Lock readLock();
+
+  /**
+   * Returns the write lock used on Metadata DB.
+   * @return writeLock
+   */
+  Lock writeLock();
+
+  /**
+   * Returns the value associated with this key.
+   * @param key - key
+   * @return value
+   */
+  byte[] get(byte[] key) throws IOException;
+
+  /**
+   * Puts a Key into Metadata DB.
+   * @param key   - key
+   * @param value - value
+   */
+  void put(byte[] key, byte[] value) throws IOException;
+
+  /**
+   * Deletes a Key from Metadata DB.
+   * @param key   - key
+   */
+  void delete(byte[] key) throws IOException;
+
+  /**
+   * Atomic write a batch of operations.
+   * @param batch
+   * @throws IOException
+   */
+  void writeBatch(BatchOperation batch) throws IOException;
+
+  /**
+   * Given a volume return the corresponding DB key.
+   * @param volume - Volume name
+   */
+  byte[] getVolumeKey(String volume);
+
+  /**
+   * Given a user return the corresponding DB key.
+   * @param user - User name
+   */
+  byte[] getUserKey(String user);
+
+  /**
+   * Given a volume and bucket, return the corresponding DB key.
+   * @param volume - User name
+   * @param bucket - Bucket name
+   */
+  byte[] getBucketKey(String volume, String bucket);
+
+  /**
+   * Given a volume, bucket and a key, return the corresponding DB key.
+   * @param volume - volume name
+   * @param bucket - bucket name
+   * @param key - key name
+   * @return bytes of DB key.
+   */
+  byte[] getDBKeyForKey(String volume, String bucket, String key);
+
+  /**
+   * Returns the DB key name of a deleted key in KSM metadata store.
+   * The name for a deleted key has prefix #deleting# followed by
+   * the actual key name.
+   * @param keyName - key name
+   * @return bytes of DB key.
+   */
+  byte[] getDeletedKeyName(byte[] keyName);
+
+  /**
+   * Deletes the key from DB.
+   *
+   * @param key - key name
+   */
+  void deleteKey(byte[] key) throws IOException;
+
+  /**
+   * Given a volume, check if it is empty,
+   * i.e there are no buckets inside it.
+   * @param volume - Volume name
+   */
+  boolean isVolumeEmpty(String volume) throws IOException;
+
+  /**
+   * Given a volume/bucket, check if it is empty,
+   * i.e there are no keys inside it.
+   * @param volume - Volume name
+   * @param  bucket - Bucket name
+   * @return true if the bucket is empty
+   */
+  boolean isBucketEmpty(String volume, String bucket) throws IOException;
+
+  /**
+   * Returns a list of buckets represented by {@link KsmBucketInfo}
+   * in the given volume.
+   *
+   * @param volumeName
+   *   the name of the volume. This argument is required,
+   *   this method returns buckets in this given volume.
+   * @param startBucket
+   *   the start bucket name. Only the buckets whose name is
+   *   after this value will be included in the result.
+   *   This key is excluded from the result.
+   * @param bucketPrefix
+   *   bucket name prefix. Only the buckets whose name has
+   *   this prefix will be included in the result.
+   * @param maxNumOfBuckets
+   *   the maximum number of buckets to return. It ensures
+   *   the size of the result will not exceed this limit.
+   * @return a list of buckets.
+   * @throws IOException
+   */
+  List<KsmBucketInfo> listBuckets(String volumeName, String startBucket,
+      String bucketPrefix, int maxNumOfBuckets) throws IOException;
+
+  /**
+   * Returns a list of keys represented by {@link KsmKeyInfo}
+   * in the given bucket.
+   *
+   * @param volumeName
+   *   the name of the volume.
+   * @param bucketName
+   *   the name of the bucket.
+   * @param startKey
+   *   the start key name, only the keys whose name is
+   *   after this value will be included in the result.
+   *   This key is excluded from the result.
+   * @param keyPrefix
+   *   key name prefix, only the keys whose name has
+   *   this prefix will be included in the result.
+   * @param maxKeys
+   *   the maximum number of keys to return. It ensures
+   *   the size of the result will not exceed this limit.
+   * @return a list of keys.
+   * @throws IOException
+   */
+  List<KsmKeyInfo> listKeys(String volumeName,
+      String bucketName, String startKey, String keyPrefix, int maxKeys)
+      throws IOException;
+
+  /**
+   * Returns a list of volumes owned by a given user; if user is null,
+   * returns all volumes.
+   *
+   * @param userName
+   *   volume owner
+   * @param prefix
+   *   the volume prefix used to filter the listing result.
+   * @param startKey
+   *   the start volume name determines where to start listing from,
+   *   this key is excluded from the result.
+   * @param maxKeys
+   *   the maximum number of volumes to return.
+   * @return a list of {@link KsmVolumeArgs}
+   * @throws IOException
+   */
+  List<KsmVolumeArgs> listVolumes(String userName, String prefix,
+      String startKey, int maxKeys) throws IOException;
+}

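The listBuckets, listKeys and listVolumes contracts above all page the same way: the start name is excluded from the result and the max count caps the page size. A hypothetical paging helper built only on this interface; the page size and the KsmBucketInfo.getBucketName() accessor are assumptions and are not shown in this diff:

package org.apache.hadoop.ozone.ksm;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;

public final class ListBucketsPagingSketch {

  private ListBucketsPagingSketch() {
  }

  static void printAllBuckets(KSMMetadataManager metadataManager,
      String volumeName) throws IOException {
    final int pageSize = 100;   // assumed page size
    String startBucket = null;  // null/empty means "start from the beginning"
    while (true) {
      List<KsmBucketInfo> page = metadataManager.listBuckets(
          volumeName, startBucket, null, pageSize);
      for (KsmBucketInfo bucket : page) {
        System.out.println(bucket.getBucketName());
      }
      if (page.size() < pageSize) {
        break;  // short page, nothing left to list
      }
      // The start bucket is excluded from the next page, so resuming from
      // the last returned name does not repeat it.
      startBucket = page.get(page.size() - 1).getBucketName();
    }
  }
}
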
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6215afcd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java
new file mode 100644
index 0000000..b954685
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java
@@ -0,0 +1,453 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.ksm;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
+import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
+import org.apache.hadoop.ozone.ksm.exceptions.KSMException.ResultCodes;
+import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.BucketInfo;
+import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyInfo;
+import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.VolumeInfo;
+import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.VolumeList;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.utils.BatchOperation;
+import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
+import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
+import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.utils.MetadataStoreBuilder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.hadoop.ozone.OzoneConsts.DELETING_KEY_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.KSM_DB_NAME;
+import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
+    .OZONE_KSM_DB_CACHE_SIZE_DEFAULT;
+import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
+    .OZONE_KSM_DB_CACHE_SIZE_MB;
+
+/**
+ * KSM metadata manager interface.
+ */
+public class KSMMetadataManagerImpl implements KSMMetadataManager {
+
+  private final MetadataStore store;
+  private final ReadWriteLock lock;
+
+
+  public KSMMetadataManagerImpl(OzoneConfiguration conf) throws IOException {
+    File metaDir = OzoneUtils.getScmMetadirPath(conf);
+    final int cacheSize = conf.getInt(OZONE_KSM_DB_CACHE_SIZE_MB,
+        OZONE_KSM_DB_CACHE_SIZE_DEFAULT);
+    File ksmDBFile = new File(metaDir.getPath(), KSM_DB_NAME);
+    this.store = MetadataStoreBuilder.newBuilder()
+        .setConf(conf)
+        .setDbFile(ksmDBFile)
+        .setCacheSize(cacheSize * OzoneConsts.MB)
+        .build();
+    this.lock = new ReentrantReadWriteLock();
+  }
+
+  /**
+   * Start metadata manager.
+   */
+  @Override
+  public void start() {
+
+  }
+
+  /**
+   * Stop metadata manager.
+   */
+  @Override
+  public void stop() throws IOException {
+    if (store != null) {
+      store.close();
+    }
+  }
+
+  /**
+   * Get metadata store.
+   * @return store - metadata store.
+   */
+  @VisibleForTesting
+  @Override
+  public MetadataStore getStore() {
+    return store;
+  }
+
+  /**
+   * Given a volume return the corresponding DB key.
+   * @param volume - Volume name
+   */
+  public byte[] getVolumeKey(String volume) {
+    String dbVolumeName = OzoneConsts.KSM_VOLUME_PREFIX + volume;
+    return DFSUtil.string2Bytes(dbVolumeName);
+  }
+
+  /**
+   * Given a user return the corresponding DB key.
+   * @param user - User name
+   */
+  public byte[] getUserKey(String user) {
+    String dbUserName = OzoneConsts.KSM_USER_PREFIX + user;
+    return DFSUtil.string2Bytes(dbUserName);
+  }
+
+  /**
+   * Given a volume and bucket, return the corresponding DB key.
+   * @param volume - User name
+   * @param bucket - Bucket name
+   */
+  public byte[] getBucketKey(String volume, String bucket) {
+    String bucketKeyString = OzoneConsts.KSM_VOLUME_PREFIX + volume
+        + OzoneConsts.KSM_BUCKET_PREFIX + bucket;
+    return DFSUtil.string2Bytes(bucketKeyString);
+  }
+
+  private String getBucketKeyPrefix(String volume, String bucket) {
+    StringBuffer sb = new StringBuffer();
+    sb.append(OzoneConsts.KSM_VOLUME_PREFIX)
+        .append(volume)
+        .append(OzoneConsts.KSM_BUCKET_PREFIX);
+    if (!Strings.isNullOrEmpty(bucket)) {
+      sb.append(bucket);
+    }
+    return sb.toString();
+  }
+
+  private String getKeyKeyPrefix(String volume, String bucket, String key) {
+    String keyStr = getBucketKeyPrefix(volume, bucket);
+    keyStr = Strings.isNullOrEmpty(key) ? keyStr + OzoneConsts.KSM_KEY_PREFIX
+        : keyStr + OzoneConsts.KSM_KEY_PREFIX + key;
+    return keyStr;
+  }
+
+  @Override
+  public byte[] getDBKeyForKey(String volume, String bucket, String key) {
+    String keyKeyString = OzoneConsts.KSM_VOLUME_PREFIX + volume
+        + OzoneConsts.KSM_BUCKET_PREFIX + bucket + OzoneConsts.KSM_KEY_PREFIX
+        + key;
+    return DFSUtil.string2Bytes(keyKeyString);
+  }
+
+  @Override
+  public byte[] getDeletedKeyName(byte[] keyName) {
+    return DFSUtil.string2Bytes(
+        DELETING_KEY_PREFIX + DFSUtil.bytes2String(keyName));
+  }
+
+  /**
+   * Deletes the key on Metadata DB.
+   *
+   * @param key - key name
+   */
+  @Override
+  public void deleteKey(byte[] key) throws IOException {
+    store.delete(key);
+  }
+
+  /**
+   * Returns the read lock used on Metadata DB.
+   * @return readLock
+   */
+  @Override
+  public Lock readLock() {
+    return lock.readLock();
+  }
+
+  /**
+   * Returns the write lock used on Metadata DB.
+   * @return writeLock
+   */
+  @Override
+  public Lock writeLock() {
+    return lock.writeLock();
+  }
+
+  /**
+   * Returns the value associated with this key.
+   * @param key - key
+   * @return value
+   */
+  @Override
+  public byte[] get(byte[] key) throws IOException {
+    return store.get(key);
+  }
+
+  /**
+   * Puts a Key into Metadata DB.
+   * @param key   - key
+   * @param value - value
+   */
+  @Override
+  public void put(byte[] key, byte[] value) throws IOException {
+    store.put(key, value);
+  }
+
+  /**
+   * Deletes a Key from Metadata DB.
+   * @param key   - key
+   */
+  public void delete(byte[] key) throws IOException {
+    store.delete(key);
+  }
+
+  @Override
+  public void writeBatch(BatchOperation batch) throws IOException {
+    this.store.writeBatch(batch);
+  }
+
+  /**
+   * Given a volume, check if it is empty, i.e there are no buckets inside it.
+   * @param volume - Volume name
+   * @return true if the volume is empty
+   */
+  public boolean isVolumeEmpty(String volume) throws IOException {
+    String dbVolumeRootName = OzoneConsts.KSM_VOLUME_PREFIX + volume;
+    byte[] dbVolumeRootKey = DFSUtil.string2Bytes(dbVolumeRootName);
+    // Seek to the root of the volume and look for the next key
+    ImmutablePair<byte[], byte[]> volumeRoot =
+        store.peekAround(1, dbVolumeRootKey);
+    if (volumeRoot != null) {
+      String firstBucketKey = DFSUtil.bytes2String(volumeRoot.getKey());
+      return !firstBucketKey.startsWith(dbVolumeRootName
+          + OzoneConsts.KSM_BUCKET_PREFIX);
+    }
+    return true;
+  }
+
+  /**
+   * Given a volume/bucket, check if it is empty,
+   * i.e there are no keys inside it.
+   * @param volume - Volume name
+   * @param bucket - Bucket name
+   * @return true if the bucket is empty
+   */
+  public boolean isBucketEmpty(String volume, String bucket)
+      throws IOException {
+    String keyRootName = OzoneConsts.KSM_VOLUME_PREFIX + volume
+        + OzoneConsts.KSM_BUCKET_PREFIX + bucket;
+    byte[] keyRoot = DFSUtil.string2Bytes(keyRootName);
+    ImmutablePair<byte[], byte[]> firstKey = store.peekAround(1, keyRoot);
+    if (firstKey != null) {
+      return !DFSUtil.bytes2String(firstKey.getKey())
+          .startsWith(keyRootName + OzoneConsts.KSM_KEY_PREFIX);
+    }
+    return true;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public List<KsmBucketInfo> listBuckets(final String volumeName,
+      final String startBucket, final String bucketPrefix,
+      final int maxNumOfBuckets) throws IOException {
+    List<KsmBucketInfo> result = new ArrayList<>();
+    if (Strings.isNullOrEmpty(volumeName)) {
+      throw new KSMException("Volume name is required.",
+          ResultCodes.FAILED_VOLUME_NOT_FOUND);
+    }
+
+    byte[] volumeNameBytes = getVolumeKey(volumeName);
+    if (store.get(volumeNameBytes) == null) {
+      throw new KSMException("Volume " + volumeName + " not found.",
+          ResultCodes.FAILED_VOLUME_NOT_FOUND);
+    }
+
+
+    // A bucket must start with /volume/bucket_prefix
+    // and exclude keys /volume/bucket_xxx/key_xxx
+    MetadataKeyFilter filter = (preKey, currentKey, nextKey) -> {
+      if (currentKey != null) {
+        String bucketNamePrefix = getBucketKeyPrefix(volumeName, bucketPrefix);
+        String bucket = DFSUtil.bytes2String(currentKey);
+        return bucket.startsWith(bucketNamePrefix) &&
+            !bucket.replaceFirst(bucketNamePrefix, "")
+                .contains(OzoneConsts.KSM_KEY_PREFIX);
+      }
+      return false;
+    };
+
+    List<Map.Entry<byte[], byte[]>> rangeResult;
+    if (!Strings.isNullOrEmpty(startBucket)) {
+      //Since we are excluding start key from the result,
+      // the maxNumOfBuckets is incremented.
+      rangeResult = store.getRangeKVs(
+          getBucketKey(volumeName, startBucket),
+          maxNumOfBuckets + 1, filter);
+      //Remove start key from result.
+      rangeResult.remove(0);
+    } else {
+      rangeResult = store.getRangeKVs(null, maxNumOfBuckets, filter);
+    }
+
+    for (Map.Entry<byte[], byte[]> entry : rangeResult) {
+      KsmBucketInfo info = KsmBucketInfo.getFromProtobuf(
+          BucketInfo.parseFrom(entry.getValue()));
+      result.add(info);
+    }
+    return result;
+  }
+
+  @Override
+  public List<KsmKeyInfo> listKeys(String volumeName, String bucketName,
+      String startKey, String keyPrefix, int maxKeys) throws IOException {
+    List<KsmKeyInfo> result = new ArrayList<>();
+    if (Strings.isNullOrEmpty(volumeName)) {
+      throw new KSMException("Volume name is required.",
+          ResultCodes.FAILED_VOLUME_NOT_FOUND);
+    }
+
+    if (Strings.isNullOrEmpty(bucketName)) {
+      throw new KSMException("Bucket name is required.",
+          ResultCodes.FAILED_BUCKET_NOT_FOUND);
+    }
+
+    byte[] bucketNameBytes = getBucketKey(volumeName, bucketName);
+    if (store.get(bucketNameBytes) == null) {
+      throw new KSMException("Bucket " + bucketName + " not found.",
+          ResultCodes.FAILED_BUCKET_NOT_FOUND);
+    }
+
+    MetadataKeyFilter filter =
+        new KeyPrefixFilter(getKeyKeyPrefix(volumeName, bucketName, keyPrefix));
+
+    List<Map.Entry<byte[], byte[]>> rangeResult;
+    if (!Strings.isNullOrEmpty(startKey)) {
+      //Since we are excluding start key from the result,
+      // the maxNumOfBuckets is incremented.
+      rangeResult = store.getRangeKVs(
+          getDBKeyForKey(volumeName, bucketName, startKey),
+          maxKeys + 1, filter);
+      //Remove start key from result.
+      rangeResult.remove(0);
+    } else {
+      rangeResult = store.getRangeKVs(null, maxKeys, filter);
+    }
+
+    for (Map.Entry<byte[], byte[]> entry : rangeResult) {
+      KsmKeyInfo info = KsmKeyInfo.getFromProtobuf(
+          KeyInfo.parseFrom(entry.getValue()));
+      result.add(info);
+    }
+    return result;
+  }
+
+  @Override
+  public List<KsmVolumeArgs> listVolumes(String userName,
+      String prefix, String startKey, int maxKeys) throws IOException {
+    List<KsmVolumeArgs> result = Lists.newArrayList();
+    VolumeList volumes;
+    if (Strings.isNullOrEmpty(userName)) {
+      volumes = getAllVolumes();
+    } else {
+      volumes = getVolumesByUser(userName);
+    }
+
+    if (volumes == null || volumes.getVolumeNamesCount() == 0) {
+      return result;
+    }
+
+    boolean startKeyFound = Strings.isNullOrEmpty(startKey);
+    for (String volumeName : volumes.getVolumeNamesList()) {
+      if (!Strings.isNullOrEmpty(prefix)) {
+        if (!volumeName.startsWith(prefix)) {
+          continue;
+        }
+      }
+
+      if (!startKeyFound && volumeName.equals(startKey)) {
+        startKeyFound = true;
+        continue;
+      }
+      if (startKeyFound && result.size() < maxKeys) {
+        byte[] volumeInfo = store.get(this.getVolumeKey(volumeName));
+        if (volumeInfo == null) {
+          // Could not get volume info by given volume name,
+          // since the volume name is loaded from db,
+          // this probably means ksm db is corrupted or some entries are
+          // accidentally removed.
+          throw new KSMException("Volume info not found for " + volumeName,
+              ResultCodes.FAILED_VOLUME_NOT_FOUND);
+        }
+        VolumeInfo info = VolumeInfo.parseFrom(volumeInfo);
+        KsmVolumeArgs volumeArgs = KsmVolumeArgs.getFromProtobuf(info);
+        result.add(volumeArgs);
+      }
+    }
+
+    return result;
+  }
+
+  private VolumeList getVolumesByUser(String userName)
+      throws KSMException {
+    return getVolumesByUser(getUserKey(userName));
+  }
+
+  private VolumeList getVolumesByUser(byte[] userNameKey)
+      throws KSMException {
+    VolumeList volumes = null;
+    try {
+      byte[] volumesInBytes = store.get(userNameKey);
+      if (volumesInBytes == null) {
+        // No volume found for this user, return an empty list
+        return VolumeList.newBuilder().build();
+      }
+      volumes = VolumeList.parseFrom(volumesInBytes);
+    } catch (IOException e) {
+      throw new KSMException("Unable to get volumes info by the given user, "
+          + "metadata might be corrupted", e,
+          ResultCodes.FAILED_METADATA_ERROR);
+    }
+    return volumes;
+  }
+
+  private VolumeList getAllVolumes() throws IOException {
+    // Scan all users in database
+    KeyPrefixFilter filter = new KeyPrefixFilter(OzoneConsts.KSM_USER_PREFIX);
+    // We are not expecting a huge number of users per cluster,
+    // it should be fine to scan all users in db and return us a
+    // list of volume names in string per user.
+    List<Map.Entry<byte[], byte[]>> rangeKVs = store
+        .getRangeKVs(null, Integer.MAX_VALUE, filter);
+
+    VolumeList.Builder builder = VolumeList.newBuilder();
+    for (Map.Entry<byte[], byte[]> entry : rangeKVs) {
+      VolumeList volumes = this.getVolumesByUser(entry.getKey());
+      builder.addAllVolumeNames(volumes.getVolumeNamesList());
+    }
+
+    return builder.build();
+  }
+}

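The implementation exposes a single ReentrantReadWriteLock plus raw get/put/delete on the store; the volume, bucket and key managers are expected to combine the two themselves. A sketch of that lock-then-access pattern, not part of this patch, with placeholder record contents:

package org.apache.hadoop.ozone.ksm;

import java.io.IOException;
import java.util.concurrent.locks.Lock;

public final class MetadataLockingSketch {

  private MetadataLockingSketch() {
  }

  static void putBucketRecord(KSMMetadataManager metadataManager,
      String volume, String bucket, byte[] value) throws IOException {
    Lock writeLock = metadataManager.writeLock();
    writeLock.lock();
    try {
      // getBucketKey builds the /volume/bucket key used by the metadata DB.
      metadataManager.put(metadataManager.getBucketKey(volume, bucket), value);
    } finally {
      writeLock.unlock();
    }
  }

  static byte[] getBucketRecord(KSMMetadataManager metadataManager,
      String volume, String bucket) throws IOException {
    Lock readLock = metadataManager.readLock();
    readLock.lock();
    try {
      return metadataManager.get(metadataManager.getBucketKey(volume, bucket));
    } finally {
      readLock.unlock();
    }
  }
}
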
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6215afcd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
index f259d85..aea7b31 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
@@ -50,11 +50,11 @@ public class KeyManagerImpl implements KeyManager {
    * A SCM block client, used to talk to SCM to allocate block during putKey.
    */
   private final ScmBlockLocationProtocol scmBlockClient;
-  private final MetadataManager metadataManager;
+  private final KSMMetadataManager metadataManager;
   private final long scmBlockSize;
 
   public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
-      MetadataManager metadataManager, OzoneConfiguration conf) {
+      KSMMetadataManager metadataManager, OzoneConfiguration conf) {
     this.scmBlockClient = scmBlockClient;
     this.metadataManager = metadataManager;
     this.scmBlockSize = conf.getLong(OZONE_SCM_BLOCK_SIZE_KEY,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6215afcd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
index 96493fd..3b76e5c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
@@ -78,7 +78,7 @@ public class KeySpaceManager extends ServiceRuntimeInfoImpl
 
   private final RPC.Server ksmRpcServer;
   private final InetSocketAddress ksmRpcAddress;
-  private final MetadataManager metadataManager;
+  private final KSMMetadataManager metadataManager;
   private final VolumeManager volumeManager;
   private final BucketManager bucketManager;
   private final KeyManager keyManager;
@@ -102,7 +102,7 @@ public class KeySpaceManager extends ServiceRuntimeInfoImpl
         handlerCount);
     ksmRpcAddress = OzoneClientUtils.updateRPCListenAddress(conf,
         OZONE_KSM_ADDRESS_KEY, ksmNodeRpcAddr, ksmRpcServer);
-    metadataManager = new MetadataManagerImpl(conf);
+    metadataManager = new KSMMetadataManagerImpl(conf);
     volumeManager = new VolumeManagerImpl(metadataManager, conf);
     bucketManager = new BucketManagerImpl(metadataManager);
     metrics = KSMMetrics.create();
@@ -168,7 +168,7 @@ public class KeySpaceManager extends ServiceRuntimeInfoImpl
    * Get metadata manager.
    * @return metadata manager.
    */
-  public MetadataManager getMetadataManager() {
+  public KSMMetadataManager getMetadataManager() {
     return metadataManager;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6215afcd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java
deleted file mode 100644
index a08898f..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.ksm;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;
-import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
-import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
-import org.apache.hadoop.utils.BatchOperation;
-import org.apache.hadoop.utils.MetadataStore;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.locks.Lock;
-
-/**
- * KSM metadata manager interface.
- */
-public interface MetadataManager {
-  /**
-   * Start metadata manager.
-   */
-  void start();
-
-  /**
-   * Stop metadata manager.
-   */
-  void stop() throws IOException;
-
-  /**
-   * Get metadata store.
-   * @return metadata store.
-   */
-  @VisibleForTesting
-  MetadataStore getStore();
-
-  /**
-   * Returns the read lock used on Metadata DB.
-   * @return readLock
-   */
-  Lock readLock();
-
-  /**
-   * Returns the write lock used on Metadata DB.
-   * @return writeLock
-   */
-  Lock writeLock();
-
-  /**
-   * Returns the value associated with this key.
-   * @param key - key
-   * @return value
-   */
-  byte[] get(byte[] key) throws IOException;
-
-  /**
-   * Puts a Key into Metadata DB.
-   * @param key   - key
-   * @param value - value
-   */
-  void put(byte[] key, byte[] value) throws IOException;
-
-  /**
-   * Deletes a Key from Metadata DB.
-   * @param key   - key
-   */
-  void delete(byte[] key) throws IOException;
-
-  /**
-   * Atomic write a batch of operations.
-   * @param batch
-   * @throws IOException
-   */
-  void writeBatch(BatchOperation batch) throws IOException;
-
-  /**
-   * Given a volume return the corresponding DB key.
-   * @param volume - Volume name
-   */
-  byte[] getVolumeKey(String volume);
-
-  /**
-   * Given a user return the corresponding DB key.
-   * @param user - User name
-   */
-  byte[] getUserKey(String user);
-
-  /**
-   * Given a volume and bucket, return the corresponding DB key.
-   * @param volume - User name
-   * @param bucket - Bucket name
-   */
-  byte[] getBucketKey(String volume, String bucket);
-
-  /**
-   * Given a volume, bucket and a key, return the corresponding DB key.
-   * @param volume - volume name
-   * @param bucket - bucket name
-   * @param key - key name
-   * @return bytes of DB key.
-   */
-  byte[] getDBKeyForKey(String volume, String bucket, String key);
-
-  /**
-   * Returns the DB key name of a deleted key in KSM metadata store.
-   * The name for a deleted key has prefix #deleting# followed by
-   * the actual key name.
-   * @param keyName - key name
-   * @return bytes of DB key.
-   */
-  byte[] getDeletedKeyName(byte[] keyName);
-
-  /**
-   * Deletes the key from DB.
-   *
-   * @param key - key name
-   */
-  void deleteKey(byte[] key) throws IOException;
-
-  /**
-   * Given a volume, check if it is empty,
-   * i.e there are no buckets inside it.
-   * @param volume - Volume name
-   */
-  boolean isVolumeEmpty(String volume) throws IOException;
-
-  /**
-   * Given a volume/bucket, check if it is empty,
-   * i.e there are no keys inside it.
-   * @param volume - Volume name
-   * @param  bucket - Bucket name
-   * @return true if the bucket is empty
-   */
-  boolean isBucketEmpty(String volume, String bucket) throws IOException;
-
-  /**
-   * Returns a list of buckets represented by {@link KsmBucketInfo}
-   * in the given volume.
-   *
-   * @param volumeName
-   *   the name of the volume. This argument is required,
-   *   this method returns buckets in this given volume.
-   * @param startBucket
-   *   the start bucket name. Only the buckets whose name is
-   *   after this value will be included in the result.
-   *   This key is excluded from the result.
-   * @param bucketPrefix
-   *   bucket name prefix. Only the buckets whose name has
-   *   this prefix will be included in the result.
-   * @param maxNumOfBuckets
-   *   the maximum number of buckets to return. It ensures
-   *   the size of the result will not exceed this limit.
-   * @return a list of buckets.
-   * @throws IOException
-   */
-  List<KsmBucketInfo> listBuckets(String volumeName, String startBucket,
-      String bucketPrefix, int maxNumOfBuckets) throws IOException;
-
-  /**
-   * Returns a list of keys represented by {@link KsmKeyInfo}
-   * in the given bucket.
-   *
-   * @param volumeName
-   *   the name of the volume.
-   * @param bucketName
-   *   the name of the bucket.
-   * @param startKey
-   *   the start key name, only the keys whose name is
-   *   after this value will be included in the result.
-   *   This key is excluded from the result.
-   * @param keyPrefix
-   *   key name prefix, only the keys whose name has
-   *   this prefix will be included in the result.
-   * @param maxKeys
-   *   the maximum number of keys to return. It ensures
-   *   the size of the result will not exceed this limit.
-   * @return a list of keys.
-   * @throws IOException
-   */
-  List<KsmKeyInfo> listKeys(String volumeName,
-      String bucketName, String startKey, String keyPrefix, int maxKeys)
-      throws IOException;
-
-  /**
-   * Returns a list of volumes owned by a given user; if user is null,
-   * returns all volumes.
-   *
-   * @param userName
-   *   volume owner
-   * @param prefix
-   *   the volume prefix used to filter the listing result.
-   * @param startKey
-   *   the start volume name determines where to start listing from,
-   *   this key is excluded from the result.
-   * @param maxKeys
-   *   the maximum number of volumes to return.
-   * @return a list of {@link KsmVolumeArgs}
-   * @throws IOException
-   */
-  List<KsmVolumeArgs> listVolumes(String userName, String prefix,
-      String startKey, int maxKeys) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6215afcd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java
deleted file mode 100644
index 6be4675..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java
+++ /dev/null
@@ -1,453 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.ksm;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;
-import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
-import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
-import org.apache.hadoop.ozone.OzoneConfiguration;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
-import org.apache.hadoop.ozone.ksm.exceptions.KSMException.ResultCodes;
-import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.BucketInfo;
-import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyInfo;
-import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.VolumeInfo;
-import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.VolumeList;
-import org.apache.hadoop.ozone.web.utils.OzoneUtils;
-import org.apache.hadoop.utils.BatchOperation;
-import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
-import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
-import org.apache.hadoop.utils.MetadataStore;
-import org.apache.hadoop.utils.MetadataStoreBuilder;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static org.apache.hadoop.ozone.OzoneConsts.DELETING_KEY_PREFIX;
-import static org.apache.hadoop.ozone.OzoneConsts.KSM_DB_NAME;
-import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
-    .OZONE_KSM_DB_CACHE_SIZE_DEFAULT;
-import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
-    .OZONE_KSM_DB_CACHE_SIZE_MB;
-
-/**
- * KSM metadata manager interface.
- */
-public class MetadataManagerImpl implements MetadataManager {
-
-  private final MetadataStore store;
-  private final ReadWriteLock lock;
-
-
-  public MetadataManagerImpl(OzoneConfiguration conf) throws IOException {
-    File metaDir = OzoneUtils.getScmMetadirPath(conf);
-    final int cacheSize = conf.getInt(OZONE_KSM_DB_CACHE_SIZE_MB,
-        OZONE_KSM_DB_CACHE_SIZE_DEFAULT);
-    File ksmDBFile = new File(metaDir.getPath(), KSM_DB_NAME);
-    this.store = MetadataStoreBuilder.newBuilder()
-        .setConf(conf)
-        .setDbFile(ksmDBFile)
-        .setCacheSize(cacheSize * OzoneConsts.MB)
-        .build();
-    this.lock = new ReentrantReadWriteLock();
-  }
-
-  /**
-   * Start metadata manager.
-   */
-  @Override
-  public void start() {
-
-  }
-
-  /**
-   * Stop metadata manager.
-   */
-  @Override
-  public void stop() throws IOException {
-    if (store != null) {
-      store.close();
-    }
-  }
-
-  /**
-   * Get metadata store.
-   * @return store - metadata store.
-   */
-  @VisibleForTesting
-  @Override
-  public MetadataStore getStore() {
-    return store;
-  }
-
-  /**
-   * Given a volume return the corresponding DB key.
-   * @param volume - Volume name
-   */
-  public byte[] getVolumeKey(String volume) {
-    String dbVolumeName = OzoneConsts.KSM_VOLUME_PREFIX + volume;
-    return DFSUtil.string2Bytes(dbVolumeName);
-  }
-
-  /**
-   * Given a user return the corresponding DB key.
-   * @param user - User name
-   */
-  public byte[] getUserKey(String user) {
-    String dbUserName = OzoneConsts.KSM_USER_PREFIX + user;
-    return DFSUtil.string2Bytes(dbUserName);
-  }
-
-  /**
-   * Given a volume and bucket, return the corresponding DB key.
-   * @param volume - User name
-   * @param bucket - Bucket name
-   */
-  public byte[] getBucketKey(String volume, String bucket) {
-    String bucketKeyString = OzoneConsts.KSM_VOLUME_PREFIX + volume
-        + OzoneConsts.KSM_BUCKET_PREFIX + bucket;
-    return DFSUtil.string2Bytes(bucketKeyString);
-  }
-
-  private String getBucketKeyPrefix(String volume, String bucket) {
-    StringBuffer sb = new StringBuffer();
-    sb.append(OzoneConsts.KSM_VOLUME_PREFIX)
-        .append(volume)
-        .append(OzoneConsts.KSM_BUCKET_PREFIX);
-    if (!Strings.isNullOrEmpty(bucket)) {
-      sb.append(bucket);
-    }
-    return sb.toString();
-  }
-
-  private String getKeyKeyPrefix(String volume, String bucket, String key) {
-    String keyStr = getBucketKeyPrefix(volume, bucket);
-    keyStr = Strings.isNullOrEmpty(key) ? keyStr + OzoneConsts.KSM_KEY_PREFIX
-        : keyStr + OzoneConsts.KSM_KEY_PREFIX + key;
-    return keyStr;
-  }
-
-  @Override
-  public byte[] getDBKeyForKey(String volume, String bucket, String key) {
-    String keyKeyString = OzoneConsts.KSM_VOLUME_PREFIX + volume
-        + OzoneConsts.KSM_BUCKET_PREFIX + bucket + OzoneConsts.KSM_KEY_PREFIX
-        + key;
-    return DFSUtil.string2Bytes(keyKeyString);
-  }
-
-  @Override
-  public byte[] getDeletedKeyName(byte[] keyName) {
-    return DFSUtil.string2Bytes(
-        DELETING_KEY_PREFIX + DFSUtil.bytes2String(keyName));
-  }
-
-  /**
-   * Deletes the key on Metadata DB.
-   *
-   * @param key - key name
-   */
-  @Override
-  public void deleteKey(byte[] key) throws IOException {
-    store.delete(key);
-  }
-
-  /**
-   * Returns the read lock used on Metadata DB.
-   * @return readLock
-   */
-  @Override
-  public Lock readLock() {
-    return lock.readLock();
-  }
-
-  /**
-   * Returns the write lock used on Metadata DB.
-   * @return writeLock
-   */
-  @Override
-  public Lock writeLock() {
-    return lock.writeLock();
-  }
-
-  /**
-   * Returns the value associated with this key.
-   * @param key - key
-   * @return value
-   */
-  @Override
-  public byte[] get(byte[] key) throws IOException {
-    return store.get(key);
-  }
-
-  /**
-   * Puts a Key into Metadata DB.
-   * @param key   - key
-   * @param value - value
-   */
-  @Override
-  public void put(byte[] key, byte[] value) throws IOException {
-    store.put(key, value);
-  }
-
-  /**
-   * Deletes a Key from Metadata DB.
-   * @param key   - key
-   */
-  public void delete(byte[] key) throws IOException {
-    store.delete(key);
-  }
-
-  @Override
-  public void writeBatch(BatchOperation batch) throws IOException {
-    this.store.writeBatch(batch);
-  }
-
-  /**
-   * Given a volume, check if it is empty, i.e there are no buckets inside it.
-   * @param volume - Volume name
-   * @return true if the volume is empty
-   */
-  public boolean isVolumeEmpty(String volume) throws IOException {
-    String dbVolumeRootName = OzoneConsts.KSM_VOLUME_PREFIX + volume;
-    byte[] dbVolumeRootKey = DFSUtil.string2Bytes(dbVolumeRootName);
-    // Seek to the root of the volume and look for the next key
-    ImmutablePair<byte[], byte[]> volumeRoot =
-        store.peekAround(1, dbVolumeRootKey);
-    if (volumeRoot != null) {
-      String firstBucketKey = DFSUtil.bytes2String(volumeRoot.getKey());
-      return !firstBucketKey.startsWith(dbVolumeRootName
-          + OzoneConsts.KSM_BUCKET_PREFIX);
-    }
-    return true;
-  }
-
-  /**
-   * Given a volume/bucket, check if it is empty,
-   * i.e there are no keys inside it.
-   * @param volume - Volume name
-   * @param bucket - Bucket name
-   * @return true if the bucket is empty
-   */
-  public boolean isBucketEmpty(String volume, String bucket)
-      throws IOException {
-    String keyRootName = OzoneConsts.KSM_VOLUME_PREFIX + volume
-        + OzoneConsts.KSM_BUCKET_PREFIX + bucket;
-    byte[] keyRoot = DFSUtil.string2Bytes(keyRootName);
-    ImmutablePair<byte[], byte[]> firstKey = store.peekAround(1, keyRoot);
-    if (firstKey != null) {
-      return !DFSUtil.bytes2String(firstKey.getKey())
-          .startsWith(keyRootName + OzoneConsts.KSM_KEY_PREFIX);
-    }
-    return true;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public List<KsmBucketInfo> listBuckets(final String volumeName,
-      final String startBucket, final String bucketPrefix,
-      final int maxNumOfBuckets) throws IOException {
-    List<KsmBucketInfo> result = new ArrayList<>();
-    if (Strings.isNullOrEmpty(volumeName)) {
-      throw new KSMException("Volume name is required.",
-          ResultCodes.FAILED_VOLUME_NOT_FOUND);
-    }
-
-    byte[] volumeNameBytes = getVolumeKey(volumeName);
-    if (store.get(volumeNameBytes) == null) {
-      throw new KSMException("Volume " + volumeName + " not found.",
-          ResultCodes.FAILED_VOLUME_NOT_FOUND);
-    }
-
-
-    // A bucket must start with /volume/bucket_prefix
-    // and exclude keys /volume/bucket_xxx/key_xxx
-    MetadataKeyFilter filter = (preKey, currentKey, nextKey) -> {
-      if (currentKey != null) {
-        String bucketNamePrefix = getBucketKeyPrefix(volumeName, bucketPrefix);
-        String bucket = DFSUtil.bytes2String(currentKey);
-        return bucket.startsWith(bucketNamePrefix) &&
-            !bucket.replaceFirst(bucketNamePrefix, "")
-                .contains(OzoneConsts.KSM_KEY_PREFIX);
-      }
-      return false;
-    };
-
-    List<Map.Entry<byte[], byte[]>> rangeResult;
-    if (!Strings.isNullOrEmpty(startBucket)) {
-      //Since we are excluding start key from the result,
-      // the maxNumOfBuckets is incremented.
-      rangeResult = store.getRangeKVs(
-          getBucketKey(volumeName, startBucket),
-          maxNumOfBuckets + 1, filter);
-      //Remove start key from result.
-      rangeResult.remove(0);
-    } else {
-      rangeResult = store.getRangeKVs(null, maxNumOfBuckets, filter);
-    }
-
-    for (Map.Entry<byte[], byte[]> entry : rangeResult) {
-      KsmBucketInfo info = KsmBucketInfo.getFromProtobuf(
-          BucketInfo.parseFrom(entry.getValue()));
-      result.add(info);
-    }
-    return result;
-  }
-
-  @Override
-  public List<KsmKeyInfo> listKeys(String volumeName, String bucketName,
-      String startKey, String keyPrefix, int maxKeys) throws IOException {
-    List<KsmKeyInfo> result = new ArrayList<>();
-    if (Strings.isNullOrEmpty(volumeName)) {
-      throw new KSMException("Volume name is required.",
-          ResultCodes.FAILED_VOLUME_NOT_FOUND);
-    }
-
-    if (Strings.isNullOrEmpty(bucketName)) {
-      throw new KSMException("Bucket name is required.",
-          ResultCodes.FAILED_BUCKET_NOT_FOUND);
-    }
-
-    byte[] bucketNameBytes = getBucketKey(volumeName, bucketName);
-    if (store.get(bucketNameBytes) == null) {
-      throw new KSMException("Bucket " + bucketName + " not found.",
-          ResultCodes.FAILED_BUCKET_NOT_FOUND);
-    }
-
-    MetadataKeyFilter filter =
-        new KeyPrefixFilter(getKeyKeyPrefix(volumeName, bucketName, keyPrefix));
-
-    List<Map.Entry<byte[], byte[]>> rangeResult;
-    if (!Strings.isNullOrEmpty(startKey)) {
-      //Since we are excluding start key from the result,
-      // the maxNumOfBuckets is incremented.
-      rangeResult = store.getRangeKVs(
-          getDBKeyForKey(volumeName, bucketName, startKey),
-          maxKeys + 1, filter);
-      //Remove start key from result.
-      rangeResult.remove(0);
-    } else {
-      rangeResult = store.getRangeKVs(null, maxKeys, filter);
-    }
-
-    for (Map.Entry<byte[], byte[]> entry : rangeResult) {
-      KsmKeyInfo info = KsmKeyInfo.getFromProtobuf(
-          KeyInfo.parseFrom(entry.getValue()));
-      result.add(info);
-    }
-    return result;
-  }
-
-  @Override
-  public List<KsmVolumeArgs> listVolumes(String userName,
-      String prefix, String startKey, int maxKeys) throws IOException {
-    List<KsmVolumeArgs> result = Lists.newArrayList();
-    VolumeList volumes;
-    if (Strings.isNullOrEmpty(userName)) {
-      volumes = getAllVolumes();
-    } else {
-      volumes = getVolumesByUser(userName);
-    }
-
-    if (volumes == null || volumes.getVolumeNamesCount() == 0) {
-      return result;
-    }
-
-    boolean startKeyFound = Strings.isNullOrEmpty(startKey);
-    for (String volumeName : volumes.getVolumeNamesList()) {
-      if (!Strings.isNullOrEmpty(prefix)) {
-        if (!volumeName.startsWith(prefix)) {
-          continue;
-        }
-      }
-
-      if (!startKeyFound && volumeName.equals(startKey)) {
-        startKeyFound = true;
-        continue;
-      }
-      if (startKeyFound && result.size() < maxKeys) {
-        byte[] volumeInfo = store.get(this.getVolumeKey(volumeName));
-        if (volumeInfo == null) {
-          // Could not get volume info by given volume name,
-          // since the volume name is loaded from db,
-          // this probably means ksm db is corrupted or some entries are
-          // accidentally removed.
-          throw new KSMException("Volume info not found for " + volumeName,
-              ResultCodes.FAILED_VOLUME_NOT_FOUND);
-        }
-        VolumeInfo info = VolumeInfo.parseFrom(volumeInfo);
-        KsmVolumeArgs volumeArgs = KsmVolumeArgs.getFromProtobuf(info);
-        result.add(volumeArgs);
-      }
-    }
-
-    return result;
-  }
-
-  private VolumeList getVolumesByUser(String userName)
-      throws KSMException {
-    return getVolumesByUser(getUserKey(userName));
-  }
-
-  private VolumeList getVolumesByUser(byte[] userNameKey)
-      throws KSMException {
-    VolumeList volumes = null;
-    try {
-      byte[] volumesInBytes = store.get(userNameKey);
-      if (volumesInBytes == null) {
-        // No volume found for this user, return an empty list
-        return VolumeList.newBuilder().build();
-      }
-      volumes = VolumeList.parseFrom(volumesInBytes);
-    } catch (IOException e) {
-      throw new KSMException("Unable to get volumes info by the given user, "
-          + "metadata might be corrupted", e,
-          ResultCodes.FAILED_METADATA_ERROR);
-    }
-    return volumes;
-  }
-
-  private VolumeList getAllVolumes() throws IOException {
-    // Scan all users in database
-    KeyPrefixFilter filter = new KeyPrefixFilter(OzoneConsts.KSM_USER_PREFIX);
-    // We are not expecting a huge number of users per cluster,
-    // it should be fine to scan all users in db and return us a
-    // list of volume names in string per user.
-    List<Map.Entry<byte[], byte[]>> rangeKVs = store
-        .getRangeKVs(null, Integer.MAX_VALUE, filter);
-
-    VolumeList.Builder builder = VolumeList.newBuilder();
-    for (Map.Entry<byte[], byte[]> entry : rangeKVs) {
-      VolumeList volumes = this.getVolumesByUser(entry.getKey());
-      builder.addAllVolumeNames(volumes.getVolumeNamesList());
-    }
-
-    return builder.build();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6215afcd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java
index 1dab4aa..8ab5d68 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java
@@ -46,7 +46,7 @@ public class VolumeManagerImpl implements VolumeManager {
   private static final Logger LOG =
       LoggerFactory.getLogger(VolumeManagerImpl.class);
 
-  private final MetadataManager metadataManager;
+  private final KSMMetadataManager metadataManager;
   private final int maxUserVolumeCount;
 
   /**
@@ -54,7 +54,7 @@ public class VolumeManagerImpl implements VolumeManager {
    * @param conf - Ozone configuration.
    * @throws IOException
    */
-  public VolumeManagerImpl(MetadataManager metadataManager,
+  public VolumeManagerImpl(KSMMetadataManager metadataManager,
       OzoneConfiguration conf) throws IOException {
     this.metadataManager = metadataManager;
     this.maxUserVolumeCount = conf.getInt(OZONE_KSM_USER_MAX_VOLUME,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6215afcd/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestBucketManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestBucketManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestBucketManagerImpl.java
index 852c1ff..b1a1606 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestBucketManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestBucketManagerImpl.java
@@ -47,16 +47,16 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import static org.mockito.Mockito.any;
 
 /**
- * Tests BucketManagerImpl, mocks MetadataManager for testing.
+ * Tests BucketManagerImpl, mocks KSMMetadataManager for testing.
  */
 @RunWith(MockitoJUnitRunner.class)
 public class TestBucketManagerImpl {
   @Rule
   public ExpectedException thrown = ExpectedException.none();
 
-  private MetadataManager getMetadataManagerMock(String... volumesToCreate)
+  private KSMMetadataManager getMetadataManagerMock(String... volumesToCreate)
       throws IOException {
-    MetadataManager metadataManager = Mockito.mock(MetadataManager.class);
+    KSMMetadataManager metadataManager = Mockito.mock(KSMMetadataManager.class);
     Map<String, byte[]> metadataDB = new HashMap<>();
     ReadWriteLock lock = new ReentrantReadWriteLock();
 
@@ -133,7 +133,7 @@ public class TestBucketManagerImpl {
   @Test
   public void testCreateBucketWithoutVolume() throws IOException {
     thrown.expectMessage("Volume doesn't exist");
-    MetadataManager metaMgr = getMetadataManagerMock();
+    KSMMetadataManager metaMgr = getMetadataManagerMock();
     try {
       BucketManager bucketManager = new BucketManagerImpl(metaMgr);
       KsmBucketInfo bucketInfo = KsmBucketInfo.newBuilder()
@@ -150,7 +150,7 @@ public class TestBucketManagerImpl {
 
   @Test
   public void testCreateBucket() throws IOException {
-    MetadataManager metaMgr = getMetadataManagerMock("sampleVol");
+    KSMMetadataManager metaMgr = getMetadataManagerMock("sampleVol");
     BucketManager bucketManager = new BucketManagerImpl(metaMgr);
     KsmBucketInfo bucketInfo = KsmBucketInfo.newBuilder()
         .setVolumeName("sampleVol")
@@ -163,7 +163,7 @@ public class TestBucketManagerImpl {
   @Test
   public void testCreateAlreadyExistingBucket() throws IOException {
     thrown.expectMessage("Bucket already exist");
-    MetadataManager metaMgr = getMetadataManagerMock("sampleVol");
+    KSMMetadataManager metaMgr = getMetadataManagerMock("sampleVol");
     try {
       BucketManager bucketManager = new BucketManagerImpl(metaMgr);
       KsmBucketInfo bucketInfo = KsmBucketInfo.newBuilder()
@@ -183,7 +183,7 @@ public class TestBucketManagerImpl {
   public void testGetBucketInfoForInvalidBucket() throws IOException {
     thrown.expectMessage("Bucket not found");
     try {
-      MetadataManager metaMgr = getMetadataManagerMock("sampleVol");
+      KSMMetadataManager metaMgr = getMetadataManagerMock("sampleVol");
       BucketManager bucketManager = new BucketManagerImpl(metaMgr);
       bucketManager.getBucketInfo("sampleVol", "bucketOne");
     } catch(KSMException ksmEx) {
@@ -195,7 +195,7 @@ public class TestBucketManagerImpl {
 
   @Test
   public void testGetBucketInfo() throws IOException {
-    MetadataManager metaMgr = getMetadataManagerMock("sampleVol");
+    KSMMetadataManager metaMgr = getMetadataManagerMock("sampleVol");
     BucketManager bucketManager = new BucketManagerImpl(metaMgr);
     KsmBucketInfo bucketInfo = KsmBucketInfo.newBuilder()
         .setVolumeName("sampleVol")
@@ -215,7 +215,7 @@ public class TestBucketManagerImpl {
 
   @Test
   public void testSetBucketPropertyAddACL() throws IOException {
-    MetadataManager metaMgr = getMetadataManagerMock("sampleVol");
+    KSMMetadataManager metaMgr = getMetadataManagerMock("sampleVol");
     List<OzoneAcl> acls = new LinkedList<>();
     OzoneAcl ozoneAcl = new OzoneAcl(OzoneAcl.OzoneACLType.USER,
         "root", OzoneAcl.OzoneACLRights.READ);
@@ -252,7 +252,7 @@ public class TestBucketManagerImpl {
 
   @Test
   public void testSetBucketPropertyRemoveACL() throws IOException {
-    MetadataManager metaMgr = getMetadataManagerMock("sampleVol");
+    KSMMetadataManager metaMgr = getMetadataManagerMock("sampleVol");
     List<OzoneAcl> acls = new LinkedList<>();
     OzoneAcl aclOne = new OzoneAcl(OzoneAcl.OzoneACLType.USER,
         "root", OzoneAcl.OzoneACLRights.READ);
@@ -288,7 +288,7 @@ public class TestBucketManagerImpl {
 
   @Test
   public void testSetBucketPropertyChangeStorageType() throws IOException {
-    MetadataManager metaMgr = getMetadataManagerMock("sampleVol");
+    KSMMetadataManager metaMgr = getMetadataManagerMock("sampleVol");
     BucketManager bucketManager = new BucketManagerImpl(metaMgr);
     KsmBucketInfo bucketInfo = KsmBucketInfo.newBuilder()
         .setVolumeName("sampleVol")
@@ -314,7 +314,7 @@ public class TestBucketManagerImpl {
 
   @Test
   public void testSetBucketPropertyChangeVersioning() throws IOException {
-    MetadataManager metaMgr = getMetadataManagerMock("sampleVol");
+    KSMMetadataManager metaMgr = getMetadataManagerMock("sampleVol");
     BucketManager bucketManager = new BucketManagerImpl(metaMgr);
     KsmBucketInfo bucketInfo = KsmBucketInfo.newBuilder()
         .setVolumeName("sampleVol")
@@ -339,7 +339,7 @@ public class TestBucketManagerImpl {
   @Test
   public void testDeleteBucket() throws IOException {
     thrown.expectMessage("Bucket not found");
-    MetadataManager metaMgr = getMetadataManagerMock("sampleVol");
+    KSMMetadataManager metaMgr = getMetadataManagerMock("sampleVol");
     BucketManager bucketManager = new BucketManagerImpl(metaMgr);
     for(int i = 0; i < 5; i++) {
       KsmBucketInfo bucketInfo = KsmBucketInfo.newBuilder()
@@ -372,7 +372,7 @@ public class TestBucketManagerImpl {
   @Test
   public void testDeleteNonEmptyBucket() throws IOException {
     thrown.expectMessage("Bucket is not empty");
-    MetadataManager metaMgr = getMetadataManagerMock("sampleVol");
+    KSMMetadataManager metaMgr = getMetadataManagerMock("sampleVol");
     BucketManager bucketManager = new BucketManagerImpl(metaMgr);
     KsmBucketInfo bucketInfo = KsmBucketInfo.newBuilder()
         .setVolumeName("sampleVol")


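The test's getMetadataManagerMock builds a KSMMetadataManager mock backed by an in-memory map. A standalone sketch of the same idea follows; it is not part of this patch and assumes Mockito's answer-style stubbing is available on the test classpath:

package org.apache.hadoop.ozone.ksm;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.mockito.Mockito;

public final class InMemoryMetadataManagerMockSketch {

  private InMemoryMetadataManagerMockSketch() {
  }

  static KSMMetadataManager newMock() throws IOException {
    KSMMetadataManager metadataManager =
        Mockito.mock(KSMMetadataManager.class);
    Map<String, byte[]> metadataDB = new HashMap<>();
    ReadWriteLock lock = new ReentrantReadWriteLock();

    Mockito.when(metadataManager.writeLock()).thenReturn(lock.writeLock());
    Mockito.when(metadataManager.readLock()).thenReturn(lock.readLock());

    // get(key) reads from the in-memory map ...
    Mockito.when(metadataManager.get(Mockito.any(byte[].class)))
        .thenAnswer(invocation -> metadataDB.get(new String(
            (byte[]) invocation.getArguments()[0], StandardCharsets.UTF_8)));

    // ... and put(key, value) writes into it.
    Mockito.doAnswer(invocation -> {
      metadataDB.put(
          new String((byte[]) invocation.getArguments()[0],
              StandardCharsets.UTF_8),
          (byte[]) invocation.getArguments()[1]);
      return null;
    }).when(metadataManager).put(
        Mockito.any(byte[].class), Mockito.any(byte[].class));

    return metadataManager;
  }
}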