Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2017/05/18 23:00:05 UTC

hadoop git commit: HDFS-11775. Ozone: KSM: add createBucket. Contributed by Nandakumar Vadivelu.

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7240 63cafc0e2 -> 002bb5faa


HDFS-11775. Ozone: KSM: add createBucket. Contributed by Nandakumar Vadivelu.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/002bb5fa
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/002bb5fa
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/002bb5fa

Branch: refs/heads/HDFS-7240
Commit: 002bb5faa844f5c8e3811216083f2aba12ad25e2
Parents: 63cafc0
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Thu May 18 13:39:21 2017 -0700
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Thu May 18 13:39:21 2017 -0700

----------------------------------------------------------------------
 .../hadoop/ksm/helpers/KsmBucketArgs.java       | 227 +++++++++++++++++++
 .../ksm/protocol/KeySpaceManagerProtocol.java   | 106 +++++++++
 .../ksm/protocol/KeyspaceManagerProtocol.java   |  97 --------
 ...ceManagerProtocolClientSideTranslatorPB.java |  39 +++-
 .../protocolPB/KeySpaceManagerProtocolPB.java   |   7 +-
 .../main/proto/KeySpaceManagerProtocol.proto    |  48 +++-
 .../org/apache/hadoop/ozone/OzoneConsts.java    |   5 +
 .../apache/hadoop/ozone/ksm/BucketManager.java  |  31 +++
 .../hadoop/ozone/ksm/BucketManagerImpl.java     | 111 +++++++++
 .../org/apache/hadoop/ozone/ksm/KSMMetrics.java |  21 ++
 .../hadoop/ozone/ksm/KeySpaceManager.java       |  47 +++-
 .../hadoop/ozone/ksm/MetadataManager.java       |  71 ++++++
 .../hadoop/ozone/ksm/MetadataManagerImpl.java   | 133 +++++++++++
 .../apache/hadoop/ozone/ksm/VolumeManager.java  |   9 -
 .../hadoop/ozone/ksm/VolumeManagerImpl.java     |  67 ++----
 .../ozone/ksm/exceptions/KSMException.java      |   2 +
 .../hadoop/ozone/protocolPB/KSMPBHelper.java    |  76 +++++++
 ...ceManagerProtocolServerSideTranslatorPB.java | 163 +++++++++++++
 ...ceManagerProtocolServerSideTranslatorPB.java | 134 -----------
 .../web/storage/DistributedStorageHandler.java  |  68 ++++--
 .../hadoop/ozone/ksm/TestBucketManagerImpl.java | 138 +++++++++++
 21 files changed, 1271 insertions(+), 329 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/002bb5fa/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmBucketArgs.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmBucketArgs.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmBucketArgs.java
new file mode 100644
index 0000000..1ef64d0
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmBucketArgs.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ksm.helpers;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.BucketInfo;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.OzoneAclInfo;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A class that encapsulates Bucket Arguments.
+ */
+public final class KsmBucketArgs {
+  /**
+   * Name of the volume this bucket belongs to.
+   */
+  private final String volumeName;
+  /**
+   * Name of the bucket.
+   */
+  private final String bucketName;
+  /**
+   * ACLs that are to be added for the bucket.
+   */
+  private List<OzoneAclInfo> addAcls;
+  /**
+   * ACLs that are to be removed from the bucket.
+   */
+  private List<OzoneAclInfo> removeAcls;
+  /**
+   * Bucket Version flag.
+   */
+  private boolean isVersionEnabled;
+  /**
+   * Type of storage to be used for this bucket.
+   * [RAM_DISK, SSD, DISK, ARCHIVE]
+   */
+  private StorageType storageType;
+
+  /**
+   * Private constructor, constructed via builder.
+   * @param volumeName - Volume name.
+   * @param bucketName - Bucket name.
+   * @param addAcls - ACLs to be added.
+   * @param removeAcls - ACLs to be removed.
+   * @param isVersionEnabled - Bucket version flag.
+   * @param storageType - Storage type to be used.
+   */
+  private KsmBucketArgs(String volumeName, String bucketName,
+      List<OzoneAclInfo> addAcls, List<OzoneAclInfo> removeAcls,
+      boolean isVersionEnabled, StorageType storageType) {
+    this.volumeName = volumeName;
+    this.bucketName = bucketName;
+    this.addAcls = addAcls;
+    this.removeAcls = removeAcls;
+    this.isVersionEnabled = isVersionEnabled;
+    this.storageType = storageType;
+  }
+
+  /**
+   * Returns the Volume Name.
+   * @return String.
+   */
+  public String getVolumeName() {
+    return volumeName;
+  }
+
+  /**
+   * Returns the Bucket Name.
+   * @return String
+   */
+  public String getBucketName() {
+    return bucketName;
+  }
+
+  /**
+   * Returns the ACLs that are to be added.
+   * @return List<OzoneAclInfo>
+   */
+  public List<OzoneAclInfo> getAddAcls() {
+    return addAcls;
+  }
+
+  /**
+   * Returns the ACLs that are to be removed.
+   * @return List<OzoneAclInfo>
+   */
+  public List<OzoneAclInfo> getRemoveAcls() {
+    return removeAcls;
+  }
+
+  /**
+   * Returns true if bucket version is enabled, else false.
+   * @return isVersionEnabled
+   */
+  public boolean getIsVersionEnabled() {
+    return isVersionEnabled;
+  }
+
+  /**
+   * Returns the type of storage to be used.
+   * @return StorageType
+   */
+  public StorageType getStorageType() {
+    return storageType;
+  }
+
+  /**
+   * Returns new builder class that builds a KsmBucketArgs.
+   *
+   * @return Builder
+   */
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  /**
+   * Builder for KsmBucketArgs.
+   */
+  public static class Builder {
+    private String volumeName;
+    private String bucketName;
+    private List<OzoneAclInfo> addAcls;
+    private List<OzoneAclInfo> removeAcls;
+    private boolean isVersionEnabled;
+    private StorageType storageType;
+
+    Builder() {
+      addAcls = new LinkedList<>();
+      removeAcls = new LinkedList<>();
+    }
+
+    public Builder setVolumeName(String volume) {
+      this.volumeName = volume;
+      return this;
+    }
+
+    public Builder setBucketName(String bucket) {
+      this.bucketName = bucket;
+      return this;
+    }
+
+    public Builder addAddAcl(OzoneAclInfo acl) {
+      this.addAcls.add(acl);
+      return this;
+    }
+
+    public Builder addRemoveAcl(OzoneAclInfo acl) {
+      this.removeAcls.add(acl);
+      return this;
+    }
+
+    public Builder setIsVersionEnabled(boolean versionFlag) {
+      this.isVersionEnabled = versionFlag;
+      return this;
+    }
+
+    public Builder setStorageType(StorageType storage) {
+      this.storageType = storage;
+      return this;
+    }
+
+    /**
+     * Constructs the KsmBucketArgs.
+     * @return instance of KsmBucketArgs.
+     */
+    public KsmBucketArgs build() {
+      Preconditions.checkNotNull(volumeName);
+      Preconditions.checkNotNull(bucketName);
+      Preconditions.checkNotNull(isVersionEnabled);
+      return new KsmBucketArgs(volumeName, bucketName, addAcls, removeAcls,
+          isVersionEnabled, storageType);
+    }
+  }
+
+  /**
+   * Creates BucketInfo protobuf from KsmBucketArgs.
+   */
+  public BucketInfo getProtobuf() {
+    return BucketInfo.newBuilder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .addAllAddAcls(addAcls)
+        .addAllRemoveAcls(removeAcls)
+        .setIsVersionEnabled(isVersionEnabled)
+        .setStorageType(PBHelperClient.convertStorageType(storageType))
+        .build();
+  }
+
+  /**
+   * Parses BucketInfo protobuf and creates KsmBucketArgs.
+   * @param bucketInfo - BucketInfo protobuf to parse.
+   * @return instance of KsmBucketArgs
+   */
+  public static KsmBucketArgs getFromProtobuf(BucketInfo bucketInfo) {
+    return new KsmBucketArgs(
+        bucketInfo.getVolumeName(),
+        bucketInfo.getBucketName(),
+        bucketInfo.getAddAclsList(),
+        bucketInfo.getRemoveAclsList(),
+        bucketInfo.getIsVersionEnabled(),
+        PBHelperClient.convertStorageType(
+            bucketInfo.getStorageType()));
+  }
+}
\ No newline at end of file
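
For illustration only (not part of this patch): a caller assembles bucket arguments through the builder above roughly as follows. The volume and bucket names are placeholders.

    KsmBucketArgs args = KsmBucketArgs.newBuilder()
        .setVolumeName("volumeOne")           // placeholder; must name an existing volume
        .setBucketName("bucketOne")           // placeholder
        .setStorageType(StorageType.DISK)     // RAM_DISK, SSD, DISK or ARCHIVE
        .setIsVersionEnabled(false)
        .build();                             // volumeName and bucketName must be non-null
    BucketInfo bucketInfo = args.getProtobuf();   // protobuf form used on the wire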

http://git-wip-us.apache.org/repos/asf/hadoop/blob/002bb5fa/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocol/KeySpaceManagerProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocol/KeySpaceManagerProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocol/KeySpaceManagerProtocol.java
new file mode 100644
index 0000000..cd5d0c9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocol/KeySpaceManagerProtocol.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ksm.protocol;
+
+import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
+import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Protocol to talk to KSM.
+ */
+public interface KeySpaceManagerProtocol {
+
+  /**
+   * Creates a volume.
+   * @param args - Arguments to create Volume.
+   * @throws IOException
+   */
+  void createVolume(KsmVolumeArgs args) throws IOException;
+
+  /**
+   * Changes the owner of a volume.
+   * @param volume  - Name of the volume.
+   * @param owner - Name of the owner.
+   * @throws IOException
+   */
+  void setOwner(String volume, String owner) throws IOException;
+
+  /**
+   * Changes the Quota on a volume.
+   * @param volume - Name of the volume.
+   * @param quota - Quota in bytes.
+   * @throws IOException
+   */
+  void setQuota(String volume, long quota) throws IOException;
+
+  /**
+   * Checks if the specified user can access this volume.
+   * @param volume - volume
+   * @param userName - user name
+   * @throws IOException
+   */
+  void checkVolumeAccess(String volume, String userName) throws IOException;
+
+  /**
+   * Gets the volume information.
+   * @param volume - Volume name.
+   * @return VolumeArgs or exception is thrown.
+   * @throws IOException
+   */
+  KsmVolumeArgs getVolumeInfo(String volume) throws IOException;
+
+  /**
+   * Deletes an existing empty volume.
+   * @param volume - Name of the volume.
+   * @throws IOException
+   */
+  void deleteVolume(String volume) throws IOException;
+
+  /**
+   * Lists volumes owned by a specific user.
+   * @param userName - user name
+   * @param prefix  - Filter prefix -- Return only entries that match this.
+   * @param prevKey - Previous key -- listing starts from the key after this one.
+   * @param maxKeys - Max number of keys to return.
+   * @return List of Volumes.
+   * @throws IOException
+   */
+  List<KsmVolumeArgs> listVolumeByUser(String userName, String prefix, String
+      prevKey, long maxKeys) throws IOException;
+
+  /**
+   * Lists all volumes in the cluster.
+   * @param prefix  - Filter prefix -- Return only entries that match this.
+   * @param prevKey - Previous key -- listing starts from the key after this one.
+   * @param maxKeys - Max number of keys to return.
+   * @return List of Volumes.
+   * @throws IOException
+   */
+  List<KsmVolumeArgs> listAllVolumes(String prefix, String
+      prevKey, long maxKeys) throws IOException;
+
+  /**
+   * Creates a bucket.
+   * @param args - Arguments to create Bucket.
+   * @throws IOException
+   */
+  void createBucket(KsmBucketArgs args) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/002bb5fa/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocol/KeyspaceManagerProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocol/KeyspaceManagerProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocol/KeyspaceManagerProtocol.java
deleted file mode 100644
index 546b6c3..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocol/KeyspaceManagerProtocol.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ksm.protocol;
-
-import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Protocol to talk to KSM.
- */
-public interface KeyspaceManagerProtocol {
-
-  /**
-   * Creates a volume.
-   * @param args - Arguments to create Volume.
-   * @throws IOException
-   */
-  void createVolume(KsmVolumeArgs args) throws IOException;
-
-  /**
-   * Changes the owner of a volume.
-   * @param volume  - Name of the volume.
-   * @param owner - Name of the owner.
-   * @throws IOException
-   */
-  void setOwner(String volume, String owner) throws IOException;
-
-  /**
-   * Changes the Quota on a volume.
-   * @param volume - Name of the volume.
-   * @param quota - Quota in bytes.
-   * @throws IOException
-   */
-  void setQuota(String volume, long quota) throws IOException;
-
-  /**
-   * Checks if the specified user can access this volume.
-   * @param volume - volume
-   * @param userName - user name
-   * @throws IOException
-   */
-  void checkVolumeAccess(String volume, String userName) throws IOException;
-
-  /**
-   * Gets the volume information.
-   * @param volume - Volume name.s
-   * @return VolumeArgs or exception is thrown.
-   * @throws IOException
-   */
-  KsmVolumeArgs getVolumeInfo(String volume) throws IOException;
-
-  /**
-   * Deletes the an exisiting empty volume.
-   * @param volume - Name of the volume.
-   * @throws IOException
-   */
-  void deleteVolume(String volume) throws IOException;
-
-  /**
-   * Lists volume owned by a specific user.
-   * @param userName - user name
-   * @param prefix  - Filter prefix -- Return only entries that match this.
-   * @param prevKey - Previous key -- List starts from the next from the prevkey
-   * @param maxKeys - Max number of keys to return.
-   * @return List of Volumes.
-   * @throws IOException
-   */
-  List<KsmVolumeArgs> listVolumeByUser(String userName, String prefix, String
-      prevKey, long maxKeys) throws IOException;
-
-  /**
-   * Lists volume all volumes in the cluster.
-   * @param prefix  - Filter prefix -- Return only entries that match this.
-   * @param prevKey - Previous key -- List starts from the next from the prevkey
-   * @param maxKeys - Max number of keys to return.
-   * @return List of Volumes.
-   * @throws IOException
-   */
-  List<KsmVolumeArgs> listAllVolumes(String prefix, String
-      prevKey, long maxKeys) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/002bb5fa/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java
index beb8b06..da13426 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java
@@ -22,8 +22,15 @@ import com.google.protobuf.ServiceException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtocolTranslator;
+import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
 import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
-import org.apache.hadoop.ksm.protocol.KeyspaceManagerProtocol;
+import org.apache.hadoop.ksm.protocol.KeySpaceManagerProtocol;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.BucketInfo;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.CreateBucketRequest;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.CreateBucketResponse;
 import org.apache.hadoop.ozone.protocol.proto
     .KeySpaceManagerProtocolProtos.CreateVolumeRequest;
 import org.apache.hadoop.ozone.protocol.proto
@@ -38,12 +45,12 @@ import java.io.IOException;
 import java.util.List;
 
 /**
- *  The client side implementation of KeyspaceManagerProtocol.
+ *  The client side implementation of KeySpaceManagerProtocol.
  */
 
 @InterfaceAudience.Private
 public final class KeySpaceManagerProtocolClientSideTranslatorPB
-    implements KeyspaceManagerProtocol, ProtocolTranslator, Closeable {
+    implements KeySpaceManagerProtocol, ProtocolTranslator, Closeable {
 
   /**
    * RpcController is not used and hence is set to null.
@@ -200,6 +207,32 @@ public final class KeySpaceManagerProtocolClientSideTranslatorPB
   }
 
   /**
+   * Creates a bucket.
+   *
+   * @param args - Arguments to create Bucket.
+   * @throws IOException
+   */
+  @Override
+  public void createBucket(KsmBucketArgs args) throws IOException {
+    CreateBucketRequest.Builder req =
+        CreateBucketRequest.newBuilder();
+    BucketInfo bucketInfo = args.getProtobuf();
+    req.setBucketInfo(bucketInfo);
+
+    final CreateBucketResponse resp;
+    try {
+      resp = rpcProxy.createBucket(NULL_RPC_CONTROLLER,
+          req.build());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+    if (resp.getStatus() != Status.OK) {
+      throw new IOException("Bucket creation failed, error: "
+          + resp.getStatus());
+    }
+  }
+
+  /**
    * Return the proxy object underlying this protocol translator.
    *
    * @return the proxy object underlying this protocol translator.
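
Usage sketch (not part of this patch), assuming an already-constructed KeySpaceManagerProtocolClientSideTranslatorPB instance named ksmClient:

    // ksmClient: hypothetical, already-connected client-side translator instance
    KsmBucketArgs args = KsmBucketArgs.newBuilder()
        .setVolumeName("volumeOne")           // placeholder names
        .setBucketName("bucketOne")
        .setStorageType(StorageType.DISK)
        .build();
    // Wraps args.getProtobuf() in a CreateBucketRequest and throws IOException
    // unless the response status is Status.OK.
    ksmClient.createBucket(args);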

http://git-wip-us.apache.org/repos/asf/hadoop/blob/002bb5fa/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolPB.java
index 1490008..8b960a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolPB.java
@@ -19,15 +19,16 @@ package org.apache.hadoop.ksm.protocolPB;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.ipc.ProtocolInfo;
-import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyspaceManagerService;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.KeySpaceManagerService;
 
 /**
  * Protocol used to communicate with KSM.
  */
 @ProtocolInfo(protocolName =
-    "org.apache.hadoop.ozone.protocol.KeyspaceManagerProtocol",
+    "org.apache.hadoop.ozone.protocol.KeySpaceManagerProtocol",
     protocolVersion = 1)
 @InterfaceAudience.Private
 public interface KeySpaceManagerProtocolPB
-    extends KeyspaceManagerService.BlockingInterface {
+    extends KeySpaceManagerService.BlockingInterface {
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/002bb5fa/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto
index 4ce7275..ca3d022 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto
@@ -34,6 +34,7 @@ Ozone key space manager. Ozone KSM manages the namespace for ozone.
 This is similar to Namenode for Ozone.
 */
 
+import "hdfs.proto";
 import "Ozone.proto";
 
 enum Status {
@@ -44,8 +45,11 @@ enum Status {
     VOLUME_ALREADY_EXISTS = 5;
     USER_NOT_FOUND = 6;
     USER_TOO_MANY_VOLUMES = 7;
-    ACCESS_DENIED = 8;
-    INTERNAL_ERROR = 9;
+    BUCKET_NOT_FOUND = 8;
+    BUCKET_NOT_EMPTY = 9;
+    BUCKET_ALREADY_EXISTS = 10;
+    ACCESS_DENIED = 11;
+    INTERNAL_ERROR = 12;
 }
 
 
@@ -154,10 +158,42 @@ message ListVolumeResponse {
     repeated VolumeInfo volumeInfo = 2;
 }
 
+message BucketInfo {
+    required string volumeName = 1;
+    required string bucketName = 2;
+    repeated OzoneAclInfo addAcls = 3;
+    repeated OzoneAclInfo removeAcls = 4;
+    required bool isVersionEnabled = 5 [default = false];
+    optional StorageTypeProto storageType = 6 [default = DISK];
+}
+
+message OzoneAclInfo {
+    enum OzoneAclType {
+        USER = 1;
+        GROUP = 2;
+        WORLD = 3;
+    }
+    enum OzoneAclRights {
+        READ = 1;
+        WRITE = 2;
+        READ_WRITE = 3;
+    }
+    required OzoneAclType type = 1;
+    required string name = 2;
+    required OzoneAclRights rights = 3;
+}
+
+message CreateBucketRequest {
+    required BucketInfo bucketInfo = 1;
+}
+
+message CreateBucketResponse {
+    required Status status = 1;
+}
 /**
  The KSM service that takes care of Ozone namespace.
 */
-service KeyspaceManagerService {
+service KeySpaceManagerService {
 
     /**
         Creates a Volume.
@@ -193,4 +229,10 @@ service KeyspaceManagerService {
     */
     rpc listVolumes(ListVolumeRequest)
         returns (ListVolumeResponse);
+
+    /**
+        Creates a Bucket.
+    */
+    rpc createBucket(CreateBucketRequest)
+        returns(CreateBucketResponse);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/002bb5fa/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 5f960b2..2ecd67d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -93,6 +93,11 @@ public final class OzoneConsts {
   public static final String OZONE_HANDLER_DISTRIBUTED = "distributed";
   public static final String OZONE_HANDLER_LOCAL = "local";
 
+  /**
+   * Ozone metadata key delimiter.
+   */
+  public static final String DB_KEY_DELIMITER = "/";
+
   private OzoneConsts() {
     // Never Constructed
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/002bb5fa/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/BucketManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/BucketManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/BucketManager.java
new file mode 100644
index 0000000..b9dd551
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/BucketManager.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.ksm;
+
+import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
+import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
+
+/**
+ * BucketManager handles all the bucket level operations.
+ */
+public interface BucketManager {
+  /**
+   * Creates a bucket.
+   * @param args - KsmBucketArgs for creating bucket.
+   */
+  void createBucket(KsmBucketArgs args) throws KSMException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/002bb5fa/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/BucketManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/BucketManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/BucketManagerImpl.java
new file mode 100644
index 0000000..b0368ea
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/BucketManagerImpl.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.ksm;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
+import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
+import org.iq80.leveldb.DBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.ozone.OzoneConsts.DB_KEY_DELIMITER;
+
+/**
+ * KSM bucket manager.
+ */
+public class BucketManagerImpl implements BucketManager {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(BucketManagerImpl.class);
+
+  /**
+   * MetadataManager is used for accessing KSM MetadataDB and ReadWriteLock.
+   */
+  private final MetadataManager metadataManager;
+
+  /**
+   * Constructs BucketManager.
+   * @param metadataManager - shared KSM metadata store and lock provider.
+   */
+  public BucketManagerImpl(MetadataManager metadataManager) {
+    this.metadataManager = metadataManager;
+  }
+
+  /**
+   * MetadataDB is maintained in MetadataManager and shared between
+   * BucketManager and VolumeManager. (and also by KeyManager)
+   *
+   * BucketManager uses MetadataDB to store bucket level information.
+   *
+   * Keys used in BucketManager for storing data into MetadataDB
+   * for BucketInfo:
+   * {volume/bucket} -> bucketInfo
+   *
+   * Work flow of create bucket:
+   *
+   * -> Check if the Volume exists in metadataDB; if not, throw
+   * KSMException with FAILED_VOLUME_NOT_FOUND.
+   * -> Else check if the Bucket exists in metadataDB; if so, throw
+   * KSMException with FAILED_BUCKET_ALREADY_EXISTS.
+   * -> Else update MetadataDB with the BucketInfo.
+   */
+
+  /**
+   * Creates a bucket.
+   * @param args - KsmBucketArgs.
+   */
+  @Override
+  public void createBucket(KsmBucketArgs args) throws KSMException {
+    Preconditions.checkNotNull(args);
+    metadataManager.writeLock().lock();
+    String volumeNameString = args.getVolumeName();
+    String bucketNameString = args.getBucketName();
+    try {
+      //bucket key: {volume/bucket}
+      String bucketKeyString = volumeNameString +
+          DB_KEY_DELIMITER + bucketNameString;
+
+      byte[] volumeName = DFSUtil.string2Bytes(volumeNameString);
+      byte[] bucketKey = DFSUtil.string2Bytes(bucketKeyString);
+
+      //Check if the volume exists
+      if(metadataManager.get(volumeName) == null) {
+        LOG.error("volume: {} not found ", volumeNameString);
+        throw new KSMException("Volume doesn't exist",
+            KSMException.ResultCodes.FAILED_VOLUME_NOT_FOUND);
+      }
+      //Check if bucket already exists
+      if(metadataManager.get(bucketKey) != null) {
+        LOG.error("bucket: {} already exists ", bucketNameString);
+        throw new KSMException("Bucket already exist",
+            KSMException.ResultCodes.FAILED_BUCKET_ALREADY_EXISTS);
+      }
+      metadataManager.put(bucketKey, args.getProtobuf().toByteArray());
+
+      LOG.info("created bucket: {} in volume: {}", bucketNameString,
+          volumeNameString);
+    } catch (DBException ex) {
+      LOG.error("Bucket creation failed for bucket:{} in volume:{}",
+          volumeNameString, bucketNameString, ex);
+      throw new KSMException(ex.getMessage(),
+          KSMException.ResultCodes.FAILED_INTERNAL_ERROR);
+    } finally {
+      metadataManager.writeLock().unlock();
+    }
+  }
+}
\ No newline at end of file
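
A minimal wiring sketch (not part of this patch), mirroring how KeySpaceManager constructs these components later in this change; conf and args are assumed to be an OzoneConfiguration and a populated KsmBucketArgs:

    // conf: OzoneConfiguration with the ozone metadata dir set; args: populated KsmBucketArgs
    MetadataManager metadataManager = new MetadataManagerImpl(conf);
    BucketManager bucketManager = new BucketManagerImpl(metadataManager);
    // Throws KSMException with FAILED_VOLUME_NOT_FOUND if the volume is missing,
    // or FAILED_BUCKET_ALREADY_EXISTS if the {volume/bucket} key is already present.
    bucketManager.createBucket(args);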

http://git-wip-us.apache.org/repos/asf/hadoop/blob/002bb5fa/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java
index c75c8fc..359ddba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java
@@ -29,9 +29,11 @@ import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 public class KSMMetrics {
   // KSM op metrics
   private @Metric MutableCounterLong numVolumeCreates;
+  private @Metric MutableCounterLong numBucketCreates;
 
   // Failure Metrics
   private @Metric MutableCounterLong numVolumeCreateFails;
+  private @Metric MutableCounterLong numBucketCreateFails;
 
   public KSMMetrics() {
   }
@@ -47,17 +49,36 @@ public class KSMMetrics {
     numVolumeCreates.incr();
   }
 
+  public void incNumBucketCreates() {
+    numBucketCreates.incr();
+  }
+
   public void incNumVolumeCreateFails() {
     numVolumeCreateFails.incr();
   }
 
+  public void incNumBucketCreateFails() {
+    numBucketCreateFails.incr();
+  }
+
   @VisibleForTesting
   public long getNumVolumeCreates() {
     return numVolumeCreates.value();
   }
 
   @VisibleForTesting
+  public long getNumBucketCreates() {
+    return numBucketCreates.value();
+  }
+
+  @VisibleForTesting
   public long getNumVolumeCreateFails() {
     return numVolumeCreateFails.value();
   }
+
+  @VisibleForTesting
+  public long getNumBucketCreateFails() {
+    return numBucketCreateFails.value();
+  }
+
 }
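
For reference (not part of this patch), the new bucket counters are exercised the same way as the existing volume counters:

    KSMMetrics metrics = KSMMetrics.create();
    metrics.incNumBucketCreates();                      // count a successful create
    metrics.incNumBucketCreateFails();                  // count a failed create
    long creates = metrics.getNumBucketCreates();       // 1
    long failures = metrics.getNumBucketCreateFails();  // 1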

http://git-wip-us.apache.org/repos/asf/hadoop/blob/002bb5fa/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
index 2ffeee7..2578a9a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
@@ -23,13 +23,14 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
 import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
-import org.apache.hadoop.ksm.protocol.KeyspaceManagerProtocol;
+import org.apache.hadoop.ksm.protocol.KeySpaceManagerProtocol;
 import org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolPB;
 import org.apache.hadoop.ozone.OzoneClientUtils;
 import org.apache.hadoop.ozone.OzoneConfiguration;
 import org.apache.hadoop.ozone.protocolPB
-    .KeyspaceManagerProtocolServerSideTranslatorPB;
+    .KeySpaceManagerProtocolServerSideTranslatorPB;
 import org.apache.hadoop.ozone.scm.StorageContainerManager;
 import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
@@ -46,7 +47,7 @@ import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
 import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
     .OZONE_KSM_HANDLER_COUNT_KEY;
 import static org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.KeyspaceManagerService
+    .KeySpaceManagerProtocolProtos.KeySpaceManagerService
     .newReflectiveBlockingService;
 import static org.apache.hadoop.util.ExitUtil.terminate;
 
@@ -54,13 +55,15 @@ import static org.apache.hadoop.util.ExitUtil.terminate;
  * Ozone Keyspace manager is the metadata manager of ozone.
  */
 @InterfaceAudience.LimitedPrivate({"HDFS", "CBLOCK", "OZONE", "HBASE"})
-public class KeySpaceManager implements KeyspaceManagerProtocol {
+public class KeySpaceManager implements KeySpaceManagerProtocol {
   private static final Logger LOG =
       LoggerFactory.getLogger(KeySpaceManager.class);
 
   private final RPC.Server ksmRpcServer;
   private final InetSocketAddress ksmRpcAddress;
+  private final MetadataManager metadataManager;
   private final VolumeManager volumeManager;
+  private final BucketManager bucketManager;
   private final KSMMetrics metrics;
 
   public KeySpaceManager(OzoneConfiguration conf) throws IOException {
@@ -71,7 +74,7 @@ public class KeySpaceManager implements KeyspaceManagerProtocol {
         ProtobufRpcEngine.class);
 
     BlockingService ksmService = newReflectiveBlockingService(
-        new KeyspaceManagerProtocolServerSideTranslatorPB(this));
+        new KeySpaceManagerProtocolServerSideTranslatorPB(this));
     final InetSocketAddress ksmNodeRpcAddr = OzoneClientUtils.
         getKsmAddress(conf);
     ksmRpcServer = startRpcServer(conf, ksmNodeRpcAddr,
@@ -79,7 +82,9 @@ public class KeySpaceManager implements KeyspaceManagerProtocol {
         handlerCount);
     ksmRpcAddress = updateListenAddress(conf,
         OZONE_KSM_ADDRESS_KEY, ksmNodeRpcAddr, ksmRpcServer);
-    volumeManager = new VolumeManagerImpl(this, conf);
+    metadataManager = new MetadataManagerImpl(conf);
+    volumeManager = new VolumeManagerImpl(metadataManager, conf);
+    bucketManager = new BucketManagerImpl(metadataManager);
     metrics = KSMMetrics.create();
   }
 
@@ -185,7 +190,7 @@ public class KeySpaceManager implements KeyspaceManagerProtocol {
   public void start() {
     LOG.info(buildRpcServerStartMessage("KeyspaceManager RPC server",
         ksmRpcAddress));
-    volumeManager.start();
+    metadataManager.start();
     ksmRpcServer.start();
   }
 
@@ -195,7 +200,7 @@ public class KeySpaceManager implements KeyspaceManagerProtocol {
   public void stop() {
     try {
       ksmRpcServer.stop();
-      volumeManager.stop();
+      metadataManager.stop();
     } catch (IOException e) {
       LOG.info("Key Space Manager stop failed.", e);
     }
@@ -221,8 +226,13 @@ public class KeySpaceManager implements KeyspaceManagerProtocol {
    */
   @Override
   public void createVolume(KsmVolumeArgs args) throws IOException {
-    metrics.incNumVolumeCreates();
-    volumeManager.createVolume(args);
+    try {
+      metrics.incNumVolumeCreates();
+      volumeManager.createVolume(args);
+    } catch (Exception ex) {
+      metrics.incNumVolumeCreateFails();
+      throw ex;
+    }
   }
 
   /**
@@ -317,4 +327,21 @@ public class KeySpaceManager implements KeyspaceManagerProtocol {
       maxKeys) throws IOException {
     return null;
   }
+
+  /**
+   * Creates a bucket.
+   *
+   * @param args - Arguments to create Bucket.
+   * @throws IOException
+   */
+  @Override
+  public void createBucket(KsmBucketArgs args) throws IOException {
+    try {
+      metrics.incNumBucketCreates();
+      bucketManager.createBucket(args);
+    } catch (Exception ex) {
+      metrics.incNumBucketCreateFails();
+      throw ex;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/002bb5fa/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java
new file mode 100644
index 0000000..71269b2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.ksm;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * KSM metadata manager interface.
+ */
+public interface MetadataManager {
+  /**
+   * Start metadata manager.
+   */
+  void start();
+
+  /**
+   * Stop metadata manager.
+   */
+  void stop() throws IOException;
+
+  /**
+   * Returns the read lock used on Metadata DB.
+   * @return readLock
+   */
+  Lock readLock();
+
+  /**
+   * Returns the write lock used on Metadata DB.
+   * @return writeLock
+   */
+  Lock writeLock();
+
+  /**
+   * Returns the value associated with this key.
+   * @param key - key
+   * @return value
+   */
+  byte[] get(byte[] key);
+
+  /**
+   * Puts a Key into Metadata DB.
+   * @param key   - key
+   * @param value - value
+   */
+  void put(byte[] key, byte[] value);
+
+  /**
+   * Performs a batch Put to Metadata DB.
+   * Can be used to do multiple puts atomically.
+   * @param list - list of Map.Entry
+   */
+  void batchPut(List<Map.Entry<byte[], byte[]>> list) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/002bb5fa/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java
new file mode 100644
index 0000000..f4b0440
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.ksm;
+
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.utils.LevelDBStore;
+import org.iq80.leveldb.Options;
+import org.iq80.leveldb.WriteBatch;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.hadoop.ozone.OzoneConsts.KSM_DB_NAME;
+import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
+    .OZONE_KSM_DB_CACHE_SIZE_DEFAULT;
+import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
+    .OZONE_KSM_DB_CACHE_SIZE_MB;
+
+/**
+ * KSM metadata manager implementation backed by LevelDBStore.
+ */
+public class MetadataManagerImpl implements MetadataManager {
+
+  private final LevelDBStore store;
+  private final ReadWriteLock lock;
+
+
+  public MetadataManagerImpl(OzoneConfiguration conf) throws IOException {
+    File metaDir = OzoneUtils.getScmMetadirPath(conf);
+    final int cacheSize = conf.getInt(OZONE_KSM_DB_CACHE_SIZE_MB,
+        OZONE_KSM_DB_CACHE_SIZE_DEFAULT);
+    Options options = new Options();
+    options.cacheSize(cacheSize * OzoneConsts.MB);
+    File ksmDBFile = new File(metaDir.getPath(), KSM_DB_NAME);
+    this.store = new LevelDBStore(ksmDBFile, options);
+    this.lock = new ReentrantReadWriteLock();
+  }
+
+  /**
+   * Start metadata manager.
+   */
+  @Override
+  public void start() {
+
+  }
+
+  /**
+   * Stop metadata manager.
+   */
+  @Override
+  public void stop() throws IOException {
+    if (store != null) {
+      store.close();
+    }
+  }
+
+  /**
+   * Returns the read lock used on Metadata DB.
+   * @return readLock
+   */
+  @Override
+  public Lock readLock() {
+    return lock.readLock();
+  }
+
+  /**
+   * Returns the write lock used on Metadata DB.
+   * @return writeLock
+   */
+  @Override
+  public Lock writeLock() {
+    return lock.writeLock();
+  }
+
+  /**
+   * Returns the value associated with this key.
+   * @param key - key
+   * @return value
+   */
+  @Override
+  public byte[] get(byte[] key) {
+    return store.get(key);
+  }
+
+  /**
+   * Puts a Key into Metadata DB.
+   * @param key   - key
+   * @param value - value
+   */
+  @Override
+  public void put(byte[] key, byte[] value) {
+    store.put(key, value);
+  }
+
+  /**
+   * Performs a batch Put to Metadata DB.
+   * Can be used to do multiple puts atomically.
+   * @param list - list of Map.Entry
+   */
+  @Override
+  public void batchPut(List<Map.Entry<byte[], byte[]>> list)
+      throws IOException {
+    WriteBatch batch = store.createWriteBatch();
+    list.forEach(entry -> batch.put(entry.getKey(), entry.getValue()));
+    try {
+      store.commitWriteBatch(batch);
+    } finally {
+      store.closeWriteBatch(batch);
+    }
+  }
+
+}
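
Illustration only (not part of this patch): an atomic two-key update through this interface; the byte arrays stand in for real keys and serialized values:

    // bucketKey, bucketInfoBytes, userKey, volumeListBytes: placeholder byte[] values
    List<Map.Entry<byte[], byte[]>> batch = new LinkedList<>();
    batch.add(new AbstractMap.SimpleEntry<>(bucketKey, bucketInfoBytes));
    batch.add(new AbstractMap.SimpleEntry<>(userKey, volumeListBytes));
    // Both entries are committed in a single LevelDB write batch.
    metadataManager.batchPut(batch);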

http://git-wip-us.apache.org/repos/asf/hadoop/blob/002bb5fa/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManager.java
index e5bb4bd..6c2f0a3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManager.java
@@ -24,15 +24,6 @@ import java.io.IOException;
  * KSM volume manager interface.
  */
 public interface VolumeManager {
-  /**
-   * Start volume manager.
-   */
-  void start();
-
-  /**
-   * Stop volume manager.
-   */
-  void stop() throws IOException;
 
   /**
    * Create a new volume.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/002bb5fa/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java
index 1e63127..ff0d087 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java
@@ -20,32 +20,21 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
 import org.apache.hadoop.ozone.OzoneConfiguration;
-import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
 import org.apache.hadoop.ozone.protocol.proto
     .KeySpaceManagerProtocolProtos.VolumeList;
 import org.apache.hadoop.ozone.protocol.proto
     .KeySpaceManagerProtocolProtos.VolumeInfo;
-import org.apache.hadoop.ozone.web.utils.OzoneUtils;
-import org.apache.hadoop.utils.LevelDBStore;
 import org.iq80.leveldb.DBException;
-import org.iq80.leveldb.Options;
-import org.iq80.leveldb.WriteBatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.io.IOException;
+import java.util.AbstractMap;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.Map;
 
-import static org.apache.hadoop.ozone.OzoneConsts.KSM_DB_NAME;
-import static org.apache.hadoop.ozone.ksm
-    .KSMConfigKeys.OZONE_KSM_DB_CACHE_SIZE_DEFAULT;
-import static org.apache.hadoop.ozone.ksm
-    .KSMConfigKeys.OZONE_KSM_DB_CACHE_SIZE_MB;
 import static org.apache.hadoop.ozone.ksm
     .KSMConfigKeys.OZONE_KSM_USER_MAX_VOLUME_DEFAULT;
 import static org.apache.hadoop.ozone.ksm
@@ -60,9 +49,7 @@ public class VolumeManagerImpl implements VolumeManager {
   private static final Logger LOG =
       LoggerFactory.getLogger(VolumeManagerImpl.class);
 
-  private final KeySpaceManager ksm;
-  private final LevelDBStore store;
-  private final ReadWriteLock lock;
+  private final MetadataManager metadataManager;
   private final int maxUserVolumeCount;
 
   /**
@@ -70,30 +57,13 @@ public class VolumeManagerImpl implements VolumeManager {
    * @param conf - Ozone configuration.
    * @throws IOException
    */
-  public VolumeManagerImpl(KeySpaceManager ksm, OzoneConfiguration conf)
-      throws IOException {
-    File metaDir = OzoneUtils.getScmMetadirPath(conf);
-    final int cacheSize = conf.getInt(OZONE_KSM_DB_CACHE_SIZE_MB,
-        OZONE_KSM_DB_CACHE_SIZE_DEFAULT);
-    Options options = new Options();
-    options.cacheSize(cacheSize * OzoneConsts.MB);
-    File ksmDBFile = new File(metaDir.getPath(), KSM_DB_NAME);
-    this.ksm = ksm;
-    this.store = new LevelDBStore(ksmDBFile, options);
-    lock = new ReentrantReadWriteLock();
+  public VolumeManagerImpl(MetadataManager metadataManager,
+      OzoneConfiguration conf) throws IOException {
+    this.metadataManager = metadataManager;
     this.maxUserVolumeCount = conf.getInt(OZONE_KSM_USER_MAX_VOLUME,
         OZONE_KSM_USER_MAX_VOLUME_DEFAULT);
   }
 
-  @Override
-  public void start() {
-  }
-
-  @Override
-  public void stop() throws IOException {
-    store.close();
-  }
-
   /**
    * Creates a volume.
    * @param args - KsmVolumeArgs.
@@ -101,10 +71,11 @@ public class VolumeManagerImpl implements VolumeManager {
   @Override
   public void createVolume(KsmVolumeArgs args) throws IOException {
     Preconditions.checkNotNull(args);
-    lock.writeLock().lock();
-    WriteBatch batch = store.createWriteBatch();
+    metadataManager.writeLock().lock();
+    List<Map.Entry<byte[], byte[]>> batch = new LinkedList<>();
     try {
-      byte[] volumeName = store.get(DFSUtil.string2Bytes(args.getVolume()));
+      byte[] volumeName = metadataManager.
+          get(DFSUtil.string2Bytes(args.getVolume()));
 
       // Check if the volume already exists
       if(volumeName != null) {
@@ -114,7 +85,8 @@ public class VolumeManagerImpl implements VolumeManager {
 
       // Next count the number of volumes for the user
       String dbUserName = "$" + args.getOwnerName();
-      byte[] volumeList  = store.get(DFSUtil.string2Bytes(dbUserName));
+      byte[] volumeList  = metadataManager
+          .get(DFSUtil.string2Bytes(dbUserName));
       List prevVolList;
       if (volumeList != null) {
         VolumeList vlist = VolumeList.parseFrom(volumeList);
@@ -128,26 +100,25 @@ public class VolumeManagerImpl implements VolumeManager {
         throw new KSMException(ResultCodes.FAILED_TOO_MANY_USER_VOLUMES);
       }
 
-      // Commit the volume information to leveldb
+      // Commit the volume information to metadataManager
       VolumeInfo volumeInfo = args.getProtobuf();
-      batch.put(DFSUtil.string2Bytes(args.getVolume()),
-                                     volumeInfo.toByteArray());
+      batch.add(new AbstractMap.SimpleEntry<>(
+          DFSUtil.string2Bytes(args.getVolume()), volumeInfo.toByteArray()));
 
       prevVolList.add(args.getVolume());
       VolumeList newVolList = VolumeList.newBuilder()
               .addAllVolumeNames(prevVolList).build();
-      batch.put(DFSUtil.string2Bytes(dbUserName), newVolList.toByteArray());
-      store.commitWriteBatch(batch);
+      batch.add(new AbstractMap.SimpleEntry<>(
+          DFSUtil.string2Bytes(dbUserName), newVolList.toByteArray()));
+      metadataManager.batchPut(batch);
       LOG.info("created volume:{} user:{}",
                                   args.getVolume(), args.getOwnerName());
     } catch (IOException | DBException ex) {
-      ksm.getMetrics().incNumVolumeCreateFails();
       LOG.error("Volume creation failed for user:{} volname:{}",
                                 args.getOwnerName(), args.getVolume(), ex);
       throw ex;
     } finally {
-      store.closeWriteBatch(batch);
-      lock.writeLock().unlock();
+      metadataManager.writeLock().unlock();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/002bb5fa/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/KSMException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/KSMException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/KSMException.java
index 1a1b3a9..e1b90c3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/KSMException.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/KSMException.java
@@ -98,6 +98,8 @@ public class KSMException extends IOException {
   public enum ResultCodes {
     FAILED_TOO_MANY_USER_VOLUMES,
     FAILED_VOLUME_ALREADY_EXISTS,
+    FAILED_VOLUME_NOT_FOUND,
+    FAILED_BUCKET_ALREADY_EXISTS,
     FAILED_INTERNAL_ERROR
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/002bb5fa/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KSMPBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KSMPBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KSMPBHelper.java
new file mode 100644
index 0000000..3797bfa
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KSMPBHelper.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.protocolPB;
+
+import org.apache.hadoop.ozone.web.request.OzoneAcl;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.OzoneAclInfo;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.OzoneAclInfo.OzoneAclType;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.OzoneAclInfo.OzoneAclRights;
+
+/**
+ * Utilities for converting protobuf classes.
+ */
+public final class KSMPBHelper {
+
+  private KSMPBHelper() {
+    /** Hidden constructor */
+  }
+
+  /**
+   * Converts the given OzoneAcl into its protobuf OzoneAclInfo form.
+   * @return OzoneAclInfo
+   */
+  public static OzoneAclInfo convertOzoneAcl(OzoneAcl acl) {
+    OzoneAclInfo.OzoneAclType aclType;
+    switch(acl.getType()) {
+    case USER:
+      aclType = OzoneAclType.USER;
+      break;
+    case GROUP:
+      aclType = OzoneAclType.GROUP;
+      break;
+    case WORLD:
+      aclType = OzoneAclType.WORLD;
+      break;
+    default:
+      throw new IllegalArgumentException("ACL type is not recognized");
+    }
+    OzoneAclInfo.OzoneAclRights aclRights;
+    switch(acl.getRights()) {
+    case READ:
+      aclRights = OzoneAclRights.READ;
+      break;
+    case WRITE:
+      aclRights = OzoneAclRights.WRITE;
+      break;
+    case READ_WRITE:
+      aclRights = OzoneAclRights.READ_WRITE;
+      break;
+    default:
+      throw new IllegalArgumentException("ACL right is not recognized");
+    }
+
+    return OzoneAclInfo.newBuilder().setType(aclType)
+        .setName(acl.getName())
+        .setRights(aclRights)
+        .build();
+  }
+}
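
A short usage sketch of the converter above, roughly as the REST layer might call it when forwarding bucket ACLs to KSM. OzoneAcl.parseAcl and its "type:name:rights" string form are assumptions about the existing web-request class, not something this patch adds; only convertOzoneAcl comes from the patch:

import org.apache.hadoop.ozone.protocol.proto
    .KeySpaceManagerProtocolProtos.OzoneAclInfo;
import org.apache.hadoop.ozone.protocolPB.KSMPBHelper;
import org.apache.hadoop.ozone.web.request.OzoneAcl;

// Illustrative only; OzoneAcl.parseAcl is assumed, not part of this patch.
public final class AclConversionSketch {
  public static void main(String[] args) {
    OzoneAcl acl = OzoneAcl.parseAcl("user:bilbo:rw");
    OzoneAclInfo aclInfo = KSMPBHelper.convertOzoneAcl(acl);
    // Expected mapping: type USER, name "bilbo", rights READ_WRITE.
    System.out.println(aclInfo);
  }
}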

http://git-wip-us.apache.org/repos/asf/hadoop/blob/002bb5fa/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeySpaceManagerProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeySpaceManagerProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeySpaceManagerProtocolServerSideTranslatorPB.java
new file mode 100644
index 0000000..85eca04
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeySpaceManagerProtocolServerSideTranslatorPB.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.protocolPB;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
+import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
+import org.apache.hadoop.ksm.protocol.KeySpaceManagerProtocol;
+import org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolPB;
+import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
+import org.apache.hadoop.ozone.ksm.exceptions.KSMException.ResultCodes;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.CreateBucketRequest;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.CreateBucketResponse;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.CreateVolumeRequest;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.CreateVolumeResponse;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.SetVolumePropertyRequest;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.SetVolumePropertyResponse;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.CheckVolumeAccessRequest;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.CheckVolumeAccessResponse;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.InfoVolumeRequest;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.InfoVolumeResponse;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.DeleteVolumeRequest;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.DeleteVolumeResponse;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.ListVolumeRequest;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.ListVolumeResponse;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.Status;
+
+
+import java.io.IOException;
+
+/**
+ * This class is the server-side translator that forwards requests received on
+ * {@link org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolPB} to the
+ * KeySpaceManagerService server implementation.
+ */
+public class KeySpaceManagerProtocolServerSideTranslatorPB implements
+    KeySpaceManagerProtocolPB {
+  private final KeySpaceManagerProtocol impl;
+
+  /**
+   * Constructs an instance of the server handler.
+   *
+   * @param impl KeySpaceManagerProtocolPB
+   */
+  public KeySpaceManagerProtocolServerSideTranslatorPB(
+      KeySpaceManagerProtocol impl) {
+    this.impl = impl;
+  }
+
+  @Override
+  public CreateVolumeResponse createVolume(
+      RpcController controller, CreateVolumeRequest request)
+      throws ServiceException {
+    CreateVolumeResponse.Builder resp = CreateVolumeResponse.newBuilder();
+    resp.setStatus(Status.OK);
+    try {
+      impl.createVolume(KsmVolumeArgs.getFromProtobuf(request.getVolumeInfo()));
+    } catch (IOException e) {
+      if (e instanceof KSMException) {
+        KSMException ksmException = (KSMException)e;
+        if (ksmException.getResult() ==
+            ResultCodes.FAILED_VOLUME_ALREADY_EXISTS) {
+          resp.setStatus(Status.VOLUME_ALREADY_EXISTS);
+        } else if (ksmException.getResult() ==
+            ResultCodes.FAILED_TOO_MANY_USER_VOLUMES) {
+          resp.setStatus(Status.USER_TOO_MANY_VOLUMES);
+        }
+      } else {
+        resp.setStatus(Status.INTERNAL_ERROR);
+      }
+    }
+    return resp.build();
+  }
+
+  @Override
+  public SetVolumePropertyResponse setVolumeProperty(
+      RpcController controller, SetVolumePropertyRequest request)
+      throws ServiceException {
+    return null;
+  }
+
+  @Override
+  public CheckVolumeAccessResponse checkVolumeAccess(
+      RpcController controller, CheckVolumeAccessRequest request)
+      throws ServiceException {
+    return null;
+  }
+
+  @Override
+  public InfoVolumeResponse infoVolume(
+      RpcController controller, InfoVolumeRequest request)
+      throws ServiceException {
+    return null;
+  }
+
+  @Override
+  public DeleteVolumeResponse deleteVolume(
+      RpcController controller, DeleteVolumeRequest request)
+      throws ServiceException {
+    return null;
+  }
+
+  @Override
+  public ListVolumeResponse listVolumes(
+      RpcController controller, ListVolumeRequest request)
+      throws ServiceException {
+    return null;
+  }
+
+  @Override
+  public CreateBucketResponse createBucket(
+      RpcController controller, CreateBucketRequest request)
+      throws ServiceException {
+    CreateBucketResponse.Builder resp =
+        CreateBucketResponse.newBuilder();
+    try {
+      impl.createBucket(KsmBucketArgs.getFromProtobuf(
+          request.getBucketInfo()));
+      resp.setStatus(Status.OK);
+    } catch (KSMException ksmEx) {
+      if (ksmEx.getResult() ==
+          ResultCodes.FAILED_VOLUME_NOT_FOUND) {
+        resp.setStatus(Status.VOLUME_NOT_FOUND);
+      } else if (ksmEx.getResult() ==
+          ResultCodes.FAILED_BUCKET_ALREADY_EXISTS) {
+        resp.setStatus(Status.BUCKET_ALREADY_EXISTS);
+      } else {
+        resp.setStatus(Status.INTERNAL_ERROR);
+      }
+    } catch (IOException ex) {
+      resp.setStatus(Status.INTERNAL_ERROR);
+    }
+    return resp.build();
+  }
+}
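
The createBucket handler above translates KSMException result codes into wire-level Status values and falls back to INTERNAL_ERROR for other IOExceptions. The same mapping, collected in one place purely for readability (a sketch, not how the patch structures it):

import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
import org.apache.hadoop.ozone.protocol.proto
    .KeySpaceManagerProtocolProtos.Status;

// Sketch of the result-code to Status mapping used by the translator above.
final class StatusMappingSketch {
  static Status toStatus(KSMException e) {
    switch (e.getResult()) {
    case FAILED_VOLUME_ALREADY_EXISTS:
      return Status.VOLUME_ALREADY_EXISTS;
    case FAILED_TOO_MANY_USER_VOLUMES:
      return Status.USER_TOO_MANY_VOLUMES;
    case FAILED_VOLUME_NOT_FOUND:
      return Status.VOLUME_NOT_FOUND;
    case FAILED_BUCKET_ALREADY_EXISTS:
      return Status.BUCKET_ALREADY_EXISTS;
    default:
      return Status.INTERNAL_ERROR;
    }
  }
}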

http://git-wip-us.apache.org/repos/asf/hadoop/blob/002bb5fa/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeyspaceManagerProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeyspaceManagerProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeyspaceManagerProtocolServerSideTranslatorPB.java
deleted file mode 100644
index aa52c17..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeyspaceManagerProtocolServerSideTranslatorPB.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.protocolPB;
-
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
-import org.apache.hadoop.ksm.protocol.KeyspaceManagerProtocol;
-import org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolPB;
-import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
-import org.apache.hadoop.ozone.ksm.exceptions.KSMException.ResultCodes;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.CreateVolumeRequest;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.CreateVolumeResponse;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.SetVolumePropertyRequest;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.SetVolumePropertyResponse;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.CheckVolumeAccessRequest;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.CheckVolumeAccessResponse;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.InfoVolumeRequest;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.InfoVolumeResponse;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.DeleteVolumeRequest;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.DeleteVolumeResponse;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.ListVolumeRequest;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.ListVolumeResponse;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.Status;
-
-
-import java.io.IOException;
-
-/**
- * This class is the server-side translator that forwards requests received on
- * {@link org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolPB} to the
- * KeyspaceManagerService server implementation.
- */
-public class KeyspaceManagerProtocolServerSideTranslatorPB implements
-    KeySpaceManagerProtocolPB {
-  private final KeyspaceManagerProtocol impl;
-
-  /**
-   * Constructs an instance of the server handler.
-   *
-   * @param impl KeySpaceManagerProtocolPB
-   */
-  public KeyspaceManagerProtocolServerSideTranslatorPB(
-      KeyspaceManagerProtocol impl) {
-    this.impl = impl;
-  }
-
-  @Override
-  public CreateVolumeResponse createVolume(
-      RpcController controller, CreateVolumeRequest request)
-      throws ServiceException {
-    CreateVolumeResponse.Builder resp = CreateVolumeResponse.newBuilder();
-    resp.setStatus(Status.OK);
-    try {
-      impl.createVolume(KsmVolumeArgs.getFromProtobuf(request.getVolumeInfo()));
-    } catch (IOException e) {
-      if (e instanceof KSMException) {
-        KSMException ksmException = (KSMException)e;
-        if (ksmException.getResult() ==
-            ResultCodes.FAILED_VOLUME_ALREADY_EXISTS) {
-          resp.setStatus(Status.VOLUME_ALREADY_EXISTS);
-        } else if (ksmException.getResult() ==
-            ResultCodes.FAILED_TOO_MANY_USER_VOLUMES) {
-          resp.setStatus(Status.USER_TOO_MANY_VOLUMES);
-        }
-      } else {
-        resp.setStatus(Status.INTERNAL_ERROR);
-      }
-    }
-    return resp.build();
-  }
-
-  @Override
-  public SetVolumePropertyResponse setVolumeProperty(
-      RpcController controller, SetVolumePropertyRequest request)
-      throws ServiceException {
-    return null;
-  }
-
-  @Override
-  public CheckVolumeAccessResponse checkVolumeAccess(
-      RpcController controller, CheckVolumeAccessRequest request)
-      throws ServiceException {
-    return null;
-  }
-
-  @Override
-  public InfoVolumeResponse infoVolume(
-      RpcController controller, InfoVolumeRequest request)
-      throws ServiceException {
-    return null;
-  }
-
-  @Override
-  public DeleteVolumeResponse deleteVolume(
-      RpcController controller, DeleteVolumeRequest request)
-      throws ServiceException {
-    return null;
-  }
-
-  @Override
-  public ListVolumeResponse listVolumes(
-      RpcController controller, ListVolumeRequest request)
-      throws ServiceException {
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/002bb5fa/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
index e96d3d1..7a9bd4e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
@@ -18,20 +18,29 @@
 
 package org.apache.hadoop.ozone.web.storage;
 
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetKeyResponseProto;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData;
+import org.apache.hadoop.hdfs.ozone.protocol.proto
+    .ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdfs.ozone.protocol.proto
+    .ContainerProtos.GetKeyResponseProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto
+    .ContainerProtos.KeyData;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset
+    .LengthInputStream;
+import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
 import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
-import org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ksm.protocolPB
+    .KeySpaceManagerProtocolClientSideTranslatorPB;
 import org.apache.hadoop.ozone.OzoneConfiguration;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.OzoneConsts.Versioning;
+import org.apache.hadoop.ozone.protocolPB.KSMPBHelper;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.scm.ScmConfigKeys;
 import org.apache.hadoop.scm.XceiverClientManager;
 import org.apache.hadoop.scm.protocol.LocatedContainer;
-import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.scm.protocolPB
+    .StorageContainerLocationProtocolClientSideTranslatorPB;
 import org.apache.hadoop.ozone.web.exceptions.OzoneException;
 import org.apache.hadoop.ozone.web.handlers.BucketArgs;
 import org.apache.hadoop.ozone.web.handlers.KeyArgs;
@@ -59,7 +68,6 @@ import java.util.List;
 
 import static org.apache.hadoop.ozone.web.storage.OzoneContainerTranslation.*;
 import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.getKey;
-import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.putKey;
 
 /**
  * A {@link StorageHandler} implementation that distributes object storage
@@ -167,22 +175,38 @@ public final class DistributedStorageHandler implements StorageHandler {
   @Override
   public void createBucket(final BucketArgs args)
       throws IOException, OzoneException {
-    String containerKey = buildContainerKey(args.getVolumeName(),
-        args.getBucketName());
-    XceiverClientSpi xceiverClient = acquireXceiverClient(containerKey);
-    try {
-      BucketInfo bucket = new BucketInfo();
-      bucket.setVolumeName(args.getVolumeName());
-      bucket.setBucketName(args.getBucketName());
-      bucket.setAcls(args.getAddAcls());
-      bucket.setVersioning(args.getVersioning());
-      bucket.setStorageType(args.getStorageType());
-      KeyData containerKeyData = fromBucketToContainerKeyData(
-          xceiverClient.getPipeline().getContainerName(), containerKey, bucket);
-      putKey(xceiverClient, containerKeyData, args.getRequestID());
-    } finally {
-      xceiverClientManager.releaseClient(xceiverClient);
+    KsmBucketArgs.Builder builder = KsmBucketArgs.newBuilder();
+    args.getAddAcls().forEach(acl ->
+        builder.addAddAcl(KSMPBHelper.convertOzoneAcl(acl)));
+    args.getRemoveAcls().forEach(acl ->
+        builder.addRemoveAcl(KSMPBHelper.convertOzoneAcl(acl)));
+    builder.setVolumeName(args.getVolumeName())
+        .setBucketName(args.getBucketName())
+        .setIsVersionEnabled(getBucketVersioningProtobuf(
+            args.getVersioning()))
+        .setStorageType(args.getStorageType());
+    keySpaceManagerClient.createBucket(builder.build());
+  }
+
+  /**
+   * Converts the OzoneConsts.Versioning enum to a boolean.
+   *
+   * @param version versioning value from the bucket request, may be null
+   * @return true if versioning is ENABLED, false otherwise
+   */
+  private boolean getBucketVersioningProtobuf(
+      Versioning version) {
+    if(version != null) {
+      switch(version) {
+      case ENABLED:
+        return true;
+      case NOT_DEFINED:
+      case DISABLED:
+      default:
+        return false;
+      }
     }
+    return false;
   }
 
   @Override

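With the change above, a REST createBucket call now reduces to a single KSM RPC. A sketch of the arguments the handler would produce for a bucket with versioning enabled and DISK storage; the builder methods are the ones this patch introduces, while the volume and bucket names are placeholders:

import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.ksm.helpers.KsmBucketArgs;

// Illustrative only; mirrors what DistributedStorageHandler#createBucket
// passes to keySpaceManagerClient.createBucket().
final class BucketArgsSketch {
  static KsmBucketArgs sampleArgs() {
    return KsmBucketArgs.newBuilder()
        .setVolumeName("vol-a")
        .setBucketName("bucket-a")
        // Versioning.ENABLED maps to true; DISABLED/NOT_DEFINED/null to false.
        .setIsVersionEnabled(true)
        .setStorageType(StorageType.DISK)
        .build();
  }
}
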
http://git-wip-us.apache.org/repos/asf/hadoop/blob/002bb5fa/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestBucketManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestBucketManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestBucketManagerImpl.java
new file mode 100644
index 0000000..5bd9ec6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestBucketManagerImpl.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.ksm;
+
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
+import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
+import org.apache.hadoop.ozone.ksm.exceptions
+    .KSMException.ResultCodes;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.mockito.Mockito.any;
+
+/**
+ * Tests BucketManagerImpl against a mocked MetadataManager.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class TestBucketManagerImpl {
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  private MetadataManager getMetadataManagerMock(String... volumesToCreate)
+      throws IOException {
+    MetadataManager metadataManager = Mockito.mock(MetadataManager.class);
+    Map<String, byte[]> metadataDB = new HashMap<>();
+    ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    Mockito.when(metadataManager.writeLock()).thenReturn(lock.writeLock());
+    Mockito.when(metadataManager.readLock()).thenReturn(lock.readLock());
+    Mockito.doAnswer(
+        new Answer<Void>() {
+          @Override
+          public Void answer(InvocationOnMock invocation) throws Throwable {
+            metadataDB.put(DFSUtil.bytes2String(
+                (byte[])invocation.getArguments()[0]),
+                (byte[])invocation.getArguments()[1]);
+            return null;
+          }
+        }).when(metadataManager).put(any(byte[].class), any(byte[].class));
+
+    Mockito.when(metadataManager.get(any(byte[].class))).thenAnswer(
+        (InvocationOnMock invocation) ->
+            metadataDB.get(DFSUtil.bytes2String(
+                (byte[])invocation.getArguments()[0]))
+    );
+    for(String volumeName : volumesToCreate) {
+      byte[] dummyVolumeInfo = DFSUtil.string2Bytes(volumeName);
+      metadataDB.put(volumeName, dummyVolumeInfo);
+    }
+    return metadataManager;
+  }
+
+  @Test
+  public void testCreateBucketWithoutVolume() throws IOException {
+    thrown.expectMessage("Volume doesn't exist");
+    MetadataManager metaMgr = getMetadataManagerMock();
+    try {
+      BucketManager bucketManager = new BucketManagerImpl(metaMgr);
+      KsmBucketArgs bucketArgs = KsmBucketArgs.newBuilder()
+          .setVolumeName("sampleVol")
+          .setBucketName("bucketOne")
+          .setStorageType(StorageType.DISK)
+          .setIsVersionEnabled(false)
+          .build();
+      bucketManager.createBucket(bucketArgs);
+    } catch(KSMException ksmEx) {
+      Assert.assertEquals(ResultCodes.FAILED_VOLUME_NOT_FOUND,
+          ksmEx.getResult());
+      throw ksmEx;
+    }
+  }
+
+  @Test
+  public void testCreateBucket() throws IOException {
+    MetadataManager metaMgr = getMetadataManagerMock("sampleVol");
+    BucketManager bucketManager = new BucketManagerImpl(metaMgr);
+    KsmBucketArgs bucketArgs = KsmBucketArgs.newBuilder()
+        .setVolumeName("sampleVol")
+        .setBucketName("bucketOne")
+        .setStorageType(StorageType.DISK)
+        .setIsVersionEnabled(false)
+        .build();
+    bucketManager.createBucket(bucketArgs);
+    //TODO: Use BucketManagerImpl#getBucketInfo to verify creation of bucket.
+    Assert.assertNotNull(metaMgr
+        .get(DFSUtil.string2Bytes("sampleVol/bucketOne")));
+  }
+
+  @Test
+  public void testCreateAlreadyExistingBucket() throws IOException {
+    thrown.expectMessage("Bucket already exist");
+    MetadataManager metaMgr = getMetadataManagerMock("sampleVol");
+    try {
+      BucketManager bucketManager = new BucketManagerImpl(metaMgr);
+      KsmBucketArgs bucketArgs = KsmBucketArgs.newBuilder()
+          .setVolumeName("sampleVol")
+          .setBucketName("bucketOne")
+          .setStorageType(StorageType.DISK)
+          .setIsVersionEnabled(false)
+          .build();
+      bucketManager.createBucket(bucketArgs);
+      bucketManager.createBucket(bucketArgs);
+    } catch(KSMException ksmEx) {
+      Assert.assertEquals(ResultCodes.FAILED_BUCKET_ALREADY_EXISTS,
+          ksmEx.getResult());
+      throw ksmEx;
+    }
+  }
+}
\ No newline at end of file
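
The final assertion above also pins down the key layout the bucket path relies on: the mocked store keeps volumes under their plain name, and the new bucket lands under "<volume>/<bucket>". A tiny sketch of that key construction as the mock sees it:

import org.apache.hadoop.hdfs.DFSUtil;

// Key layout as exercised by TestBucketManagerImpl; illustrative only, the
// real MetadataManagerImpl may add its own prefixing.
final class KeyLayoutSketch {
  static byte[] volumeKey(String volume) {
    return DFSUtil.string2Bytes(volume);                // e.g. "sampleVol"
  }

  static byte[] bucketKey(String volume, String bucket) {
    return DFSUtil.string2Bytes(volume + "/" + bucket); // e.g. "sampleVol/bucketOne"
  }
}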

