You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2017/05/22 20:50:54 UTC

hadoop git commit: HDFS-11770. Ozone: KSM: Add setVolumeProperty. Contributed by Mukul Kumar Singh.

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7240 e0704c059 -> 3ff857f63


HDFS-11770. Ozone: KSM: Add setVolumeProperty. Contributed by Mukul Kumar Singh.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3ff857f6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3ff857f6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3ff857f6

Branch: refs/heads/HDFS-7240
Commit: 3ff857f63e801177ee9525ed706e6124f8e515a9
Parents: e0704c0
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Mon May 22 13:50:13 2017 -0700
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Mon May 22 13:50:35 2017 -0700

----------------------------------------------------------------------
 .../ksm/protocol/KeySpaceManagerProtocol.java   |   4 +-
 ...ceManagerProtocolClientSideTranslatorPB.java |  57 ++++-
 .../server/datanode/ObjectStoreHandler.java     |  11 +-
 .../org/apache/hadoop/ozone/OzoneConsts.java    |  12 +-
 .../hadoop/ozone/ksm/BucketManagerImpl.java     |  25 +--
 .../org/apache/hadoop/ozone/ksm/KSMMetrics.java |  40 ++++
 .../hadoop/ozone/ksm/KeySpaceManager.java       |  38 ++--
 .../hadoop/ozone/ksm/MetadataManager.java       |  31 ++-
 .../hadoop/ozone/ksm/MetadataManagerImpl.java   |  50 +++++
 .../apache/hadoop/ozone/ksm/VolumeManager.java  |  26 +++
 .../hadoop/ozone/ksm/VolumeManagerImpl.java     | 210 ++++++++++++++++---
 .../ozone/ksm/exceptions/KSMException.java      |   1 +
 ...ceManagerProtocolServerSideTranslatorPB.java |  78 ++++---
 .../hadoop/ozone/web/request/OzoneQuota.java    |  37 +++-
 .../web/storage/DistributedStorageHandler.java  |  30 +--
 .../hadoop/ozone/ksm/TestBucketManagerImpl.java |  19 +-
 .../hadoop/ozone/ksm/TestKeySpaceManager.java   | 166 +++++++++++++++
 .../ozone/web/TestDistributedOzoneVolumes.java  |  19 +-
 18 files changed, 728 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ff857f6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocol/KeySpaceManagerProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocol/KeySpaceManagerProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocol/KeySpaceManagerProtocol.java
index cd5d0c9..67a2c30 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocol/KeySpaceManagerProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocol/KeySpaceManagerProtocol.java
@@ -60,14 +60,14 @@ public interface KeySpaceManagerProtocol {
 
   /**
    * Gets the volume information.
-   * @param volume - Volume name.s
+   * @param volume - Volume name.
    * @return VolumeArgs or exception is thrown.
    * @throws IOException
    */
   KsmVolumeArgs getVolumeInfo(String volume) throws IOException;
 
   /**
-   * Deletes the an exisiting empty volume.
+   * Deletes an existing empty volume.
    * @param volume - Name of the volume.
    * @throws IOException
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ff857f6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java
index da13426..b7904b8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java
@@ -36,6 +36,14 @@ import org.apache.hadoop.ozone.protocol.proto
 import org.apache.hadoop.ozone.protocol.proto
     .KeySpaceManagerProtocolProtos.CreateVolumeResponse;
 import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.SetVolumePropertyRequest;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.SetVolumePropertyResponse;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.InfoVolumeRequest;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.InfoVolumeResponse;
+import org.apache.hadoop.ozone.protocol.proto
     .KeySpaceManagerProtocolProtos.VolumeInfo;
 import org.apache.hadoop.ozone.protocol.proto
     .KeySpaceManagerProtocolProtos.Status;
@@ -108,7 +116,8 @@ public final class KeySpaceManagerProtocolClientSideTranslatorPB
     }
 
     if (resp.getStatus() != Status.OK) {
-      throw new IOException("Volume creation failed error" + resp.getStatus());
+      throw new
+          IOException("Volume creation failed, error:" + resp.getStatus());
     }
   }
 
@@ -121,7 +130,19 @@ public final class KeySpaceManagerProtocolClientSideTranslatorPB
    */
   @Override
   public void setOwner(String volume, String owner) throws IOException {
-
+    SetVolumePropertyRequest.Builder req =
+        SetVolumePropertyRequest.newBuilder();
+    req.setVolumeName(volume).setOwnerName(owner);
+    final SetVolumePropertyResponse resp;
+    try {
+      resp = rpcProxy.setVolumeProperty(NULL_RPC_CONTROLLER, req.build());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+    if (resp.getStatus() != Status.OK) {
+      throw new
+          IOException("Volume owner change failed, error:" + resp.getStatus());
+    }
   }
 
   /**
@@ -133,7 +154,19 @@ public final class KeySpaceManagerProtocolClientSideTranslatorPB
    */
   @Override
   public void setQuota(String volume, long quota) throws IOException {
-
+    SetVolumePropertyRequest.Builder req =
+        SetVolumePropertyRequest.newBuilder();
+    req.setVolumeName(volume).setQuotaInBytes(quota);
+    final SetVolumePropertyResponse resp;
+    try {
+      resp = rpcProxy.setVolumeProperty(NULL_RPC_CONTROLLER, req.build());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+    if (resp.getStatus() != Status.OK) {
+      throw new
+          IOException("Volume quota change failed, error:" + resp.getStatus());
+    }
   }
 
   /**
@@ -152,17 +185,29 @@ public final class KeySpaceManagerProtocolClientSideTranslatorPB
   /**
    * Gets the volume information.
    *
-   * @param volume - Volume name.s
+   * @param volume - Volume name.
    * @return KsmVolumeArgs or exception is thrown.
    * @throws IOException
    */
   @Override
   public KsmVolumeArgs getVolumeInfo(String volume) throws IOException {
-    return null;
+    InfoVolumeRequest.Builder req = InfoVolumeRequest.newBuilder();
+    req.setVolumeName(volume);
+    final InfoVolumeResponse resp;
+    try {
+      resp = rpcProxy.infoVolume(NULL_RPC_CONTROLLER, req.build());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+    if (resp.getStatus() != Status.OK) {
+      throw new
+          IOException("Info Volume failed, error:" + resp.getStatus());
+    }
+    return KsmVolumeArgs.getFromProtobuf(resp.getVolumeInfo());
   }
 
   /**
-   * Deletes the an exisiting empty volume.
+   * Deletes an existing empty volume.
    *
    * @param volume - Name of the volume.
    * @throws IOException

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ff857f6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java
index f44b3aa..0f1264b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java
@@ -69,6 +69,7 @@ public final class ObjectStoreHandler implements Closeable {
       keySpaceManagerClient;
   private final StorageContainerLocationProtocolClientSideTranslatorPB
       storageContainerLocationClient;
+  private final StorageHandler storageHandler;
 
   /**
    * Creates a new ObjectStoreHandler.
@@ -83,7 +84,6 @@ public final class ObjectStoreHandler implements Closeable {
         OZONE_HANDLER_TYPE_KEY, shType);
     boolean ozoneTrace = conf.getBoolean(OZONE_TRACE_ENABLED_KEY,
         OZONE_TRACE_ENABLED_DEFAULT);
-    final StorageHandler storageHandler;
 
     // Initialize Jersey container for object store web application.
     if (OzoneConsts.OZONE_HANDLER_DISTRIBUTED.equalsIgnoreCase(shType)) {
@@ -147,6 +147,15 @@ public final class ObjectStoreHandler implements Closeable {
     return this.objectStoreJerseyContainer;
   }
 
+  /**
+   * Returns the storage handler.
+   *
+   * @return returns the storage handler
+   */
+  public StorageHandler getStorageHandler() {
+    return this.storageHandler;
+  }
+
   @Override
   public void close() {
     LOG.info("Closing ObjectStoreHandler.");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ff857f6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 2ecd67d..a783249 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -94,9 +94,17 @@ public final class OzoneConsts {
   public static final String OZONE_HANDLER_LOCAL = "local";
 
   /**
-   * Ozone metadata key delimiter.
+   * KSM LevelDB prefixes.
    */
-  public static final String DB_KEY_DELIMITER = "/";
+  public static final String KSM_VOLUME_PREFIX = "/";
+  public static final String KSM_BUCKET_PREFIX = KSM_VOLUME_PREFIX;
+  public static final String KSM_KEY_PREFIX = KSM_VOLUME_PREFIX;
+  public static final String KSM_USER_PREFIX = "$";
+
+  /**
+   * Max KSM Quota size of 1024 PB.
+   */
+  public static final long MAX_QUOTA_IN_BYTES = 1024L * 1024 * TB;
 
   private OzoneConsts() {
     // Never Constructed

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ff857f6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/BucketManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/BucketManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/BucketManagerImpl.java
index b0368ea..5e975ee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/BucketManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/BucketManagerImpl.java
@@ -17,15 +17,12 @@
 package org.apache.hadoop.ozone.ksm;
 
 import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
 import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
 import org.iq80.leveldb.DBException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.hadoop.ozone.OzoneConsts.DB_KEY_DELIMITER;
-
 /**
  * KSM bucket manager.
  */
@@ -73,35 +70,31 @@ public class BucketManagerImpl implements BucketManager {
   public void createBucket(KsmBucketArgs args) throws KSMException {
     Preconditions.checkNotNull(args);
     metadataManager.writeLock().lock();
-    String volumeNameString = args.getVolumeName();
-    String bucketNameString = args.getBucketName();
+    String volumeName = args.getVolumeName();
+    String bucketName = args.getBucketName();
     try {
       //bucket key: {volume/bucket}
-      String bucketKeyString = volumeNameString +
-          DB_KEY_DELIMITER + bucketNameString;
-
-      byte[] volumeName = DFSUtil.string2Bytes(volumeNameString);
-      byte[] bucketKey = DFSUtil.string2Bytes(bucketKeyString);
+      byte[] volumeKey = metadataManager.getVolumeKey(volumeName);
+      byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
 
       //Check if the volume exists
-      if(metadataManager.get(volumeName) == null) {
-        LOG.error("volume: {} not found ", volumeNameString);
+      if(metadataManager.get(volumeKey) == null) {
+        LOG.error("volume: {} not found ", volumeName);
         throw new KSMException("Volume doesn't exist",
             KSMException.ResultCodes.FAILED_VOLUME_NOT_FOUND);
       }
       //Check if bucket already exists
       if(metadataManager.get(bucketKey) != null) {
-        LOG.error("bucket: {} already exists ", bucketNameString);
+        LOG.error("bucket: {} already exists ", bucketName);
         throw new KSMException("Bucket already exist",
             KSMException.ResultCodes.FAILED_BUCKET_ALREADY_EXISTS);
       }
       metadataManager.put(bucketKey, args.getProtobuf().toByteArray());
 
-      LOG.info("created bucket: {} in volume: {}", bucketNameString,
-          volumeNameString);
+      LOG.debug("created bucket: {} in volume: {}", bucketName, volumeName);
     } catch (DBException ex) {
       LOG.error("Bucket creation failed for bucket:{} in volume:{}",
-          volumeNameString, bucketNameString, ex);
+          bucketName, volumeName, ex);
       throw new KSMException(ex.getMessage(),
           KSMException.ResultCodes.FAILED_INTERNAL_ERROR);
     } finally {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ff857f6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java
index 359ddba..4ebc3c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java
@@ -29,10 +29,14 @@ import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 public class KSMMetrics {
   // KSM op metrics
   private @Metric MutableCounterLong numVolumeCreates;
+  private @Metric MutableCounterLong numVolumeModifies;
+  private @Metric MutableCounterLong numVolumeInfos;
   private @Metric MutableCounterLong numBucketCreates;
 
   // Failure Metrics
   private @Metric MutableCounterLong numVolumeCreateFails;
+  private @Metric MutableCounterLong numVolumeModifyFails;
+  private @Metric MutableCounterLong numVolumeInfoFails;
   private @Metric MutableCounterLong numBucketCreateFails;
 
   public KSMMetrics() {
@@ -49,6 +53,14 @@ public class KSMMetrics {
     numVolumeCreates.incr();
   }
 
+  public void incNumVolumeModifies() {
+    numVolumeModifies.incr();
+  }
+
+  public void incNumVolumeInfos() {
+    numVolumeInfos.incr();
+  }
+
   public void incNumBucketCreates() {
     numBucketCreates.incr();
   }
@@ -57,6 +69,14 @@ public class KSMMetrics {
     numVolumeCreates.incr();
   }
 
+  public void incNumVolumeModifyFails() {
+    numVolumeModifyFails.incr();
+  }
+
+  public void incNumVolumeInfoFails() {
+    numVolumeInfoFails.incr();
+  }
+
   public void incNumBucketCreateFails() {
     numBucketCreateFails.incr();
   }
@@ -67,6 +87,16 @@ public class KSMMetrics {
   }
 
   @VisibleForTesting
+  public long getNumVolumeModifies() {
+    return numVolumeModifies.value();
+  }
+
+  @VisibleForTesting
+  public long getNumVolumeInfos() {
+    return numVolumeInfos.value();
+  }
+
+  @VisibleForTesting
   public long getNumBucketCreates() {
     return numBucketCreates.value();
   }
@@ -77,6 +107,16 @@ public class KSMMetrics {
   }
 
   @VisibleForTesting
+  public long getNumVolumeModifyFails() {
+    return numVolumeModifyFails.value();
+  }
+
+  @VisibleForTesting
+  public long getNumVolumeInfoFails() {
+    return numVolumeInfoFails.value();
+  }
+
+  @VisibleForTesting
   public long getNumBucketCreateFails() {
     return numBucketCreateFails.value();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ff857f6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
index 2578a9a..aee1e0c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
@@ -17,7 +17,6 @@
 
 package org.apache.hadoop.ozone.ksm;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.BlockingService;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -122,15 +121,6 @@ public class KeySpaceManager implements KeySpaceManagerProtocol {
   }
 
   /**
-   * Returns listening address of Key Space Manager RPC server.
-   *
-   * @return listen address of Key Space Manager RPC server
-   */
-  @VisibleForTesting
-  public InetSocketAddress getClientRpcAddress() {
-    return ksmRpcAddress;
-  }
-  /**
    * Main entry point for starting KeySpaceManager.
    *
    * @param argv arguments
@@ -244,7 +234,13 @@ public class KeySpaceManager implements KeySpaceManagerProtocol {
    */
   @Override
   public void setOwner(String volume, String owner) throws IOException {
-
+    try {
+      metrics.incNumVolumeModifies();
+      volumeManager.setOwner(volume, owner);
+    } catch (Exception ex) {
+      metrics.incNumVolumeModifyFails();
+      throw ex;
+    }
   }
 
   /**
@@ -256,7 +252,13 @@ public class KeySpaceManager implements KeySpaceManagerProtocol {
    */
   @Override
   public void setQuota(String volume, long quota) throws IOException {
-
+    try {
+      metrics.incNumVolumeModifies();
+      volumeManager.setQuota(volume, quota);
+    } catch (Exception ex) {
+      metrics.incNumVolumeModifyFails();
+      throw ex;
+    }
   }
 
   /**
@@ -275,17 +277,23 @@ public class KeySpaceManager implements KeySpaceManagerProtocol {
   /**
    * Gets the volume information.
    *
-   * @param volume - Volume name.s
+   * @param volume - Volume name.
    * @return VolumeArgs or exception is thrown.
    * @throws IOException
    */
   @Override
   public KsmVolumeArgs getVolumeInfo(String volume) throws IOException {
-    return null;
+    try {
+      metrics.incNumVolumeInfos();
+      return volumeManager.getVolumeInfo(volume);
+    } catch (Exception ex) {
+      metrics.incNumVolumeInfoFails();
+      throw ex;
+    }
   }
 
   /**
-   * Deletes the an exisiting empty volume.
+   * Deletes an existing empty volume.
    *
    * @param volume - Name of the volume.
    * @throws IOException

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ff857f6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java
index 71269b2..407d46a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java
@@ -62,10 +62,37 @@ public interface MetadataManager {
   void put(byte[] key, byte[] value);
 
   /**
+   * Performs batch Put and Delete to Metadata DB.
+   * Can be used to do multiple puts and deletes atomically.
+   * @param putList - list of Key/Value to put into DB
+   * @param delList - list of Key to delete from DB
+   */
+  void batchPutDelete(List<Map.Entry<byte[], byte[]>> putList,
+                      List<byte[]> delList) throws IOException;
+
+  /**
    * Performs a batch Put to Metadata DB.
    * Can be used to do multiple puts atomically.
-   * @param list - list of Map.Entry
+   * @param putList - list of Key/Value to put into DB
+   */
+  void batchPut(List<Map.Entry<byte[], byte[]>> putList) throws IOException;
+
+  /**
+   * Given a volume return the corresponding DB key.
+   * @param volume - Volume name
    */
-  void batchPut(List<Map.Entry<byte[], byte[]>> list) throws IOException;
+  byte[] getVolumeKey(String volume);
 
+  /**
+   * Given a user return the corresponding DB key.
+   * @param user - User name
+   */
+  byte[] getUserKey(String user);
+
+  /**
+   * Given a volume and bucket, return the corresponding DB key.
+   * @param volume - User name
+   * @param bucket - Bucket name
+   */
+  byte[] getBucketKey(String volume, String bucket);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ff857f6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java
index f4b0440..0a91bc0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.hadoop.ozone.ksm;
 
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.ozone.OzoneConfiguration;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.web.utils.OzoneUtils;
@@ -76,6 +77,35 @@ public class MetadataManagerImpl implements  MetadataManager {
   }
 
   /**
+   * Given a volume return the corresponding DB key.
+   * @param volume - Volume name
+   */
+  public byte[] getVolumeKey(String volume) {
+    String dbVolumeName = OzoneConsts.KSM_VOLUME_PREFIX + volume;
+    return DFSUtil.string2Bytes(dbVolumeName);
+  }
+
+  /**
+   * Given a user return the corresponding DB key.
+   * @param user - User name
+   */
+  public byte[] getUserKey(String user) {
+    String dbUserName = OzoneConsts.KSM_USER_PREFIX + user;
+    return DFSUtil.string2Bytes(dbUserName);
+  }
+
+  /**
+   * Given a volume and bucket, return the corresponding DB key.
+   * @param volume - User name
+   * @param bucket - Bucket name
+   */
+  public byte[] getBucketKey(String volume, String bucket) {
+    String bucketKeyString = OzoneConsts.KSM_VOLUME_PREFIX + volume
+        + OzoneConsts.KSM_BUCKET_PREFIX + bucket;
+    return DFSUtil.string2Bytes(bucketKeyString);
+  }
+
+  /**
    * Returns the read lock used on Metadata DB.
    * @return readLock
    */
@@ -114,6 +144,26 @@ public class MetadataManagerImpl implements  MetadataManager {
   }
 
   /**
+   * Performs a batch Put and Delete from Metadata DB.
+   * Can be used to do multiple puts and deletes atomically.
+   * @param putList - list of key and value pairs to put to Metadata DB.
+   * @param delList - list of keys to delete from Metadata DB.
+   */
+  @Override
+  public void batchPutDelete(List<Map.Entry<byte[], byte[]>> putList,
+                             List<byte[]> delList)
+      throws IOException {
+    WriteBatch batch = store.createWriteBatch();
+    putList.forEach(entry -> batch.put(entry.getKey(), entry.getValue()));
+    delList.forEach(entry -> batch.delete(entry));
+    try {
+      store.commitWriteBatch(batch);
+    } finally {
+      store.closeWriteBatch(batch);
+    }
+  }
+
+  /**
    * Performs a batch Put to Metadata DB.
    * Can be used to do multiple puts atomically.
    * @param list - list of Map.Entry

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ff857f6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManager.java
index 6c2f0a3..489646d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManager.java
@@ -30,4 +30,30 @@ public interface VolumeManager {
    * @param args - Volume args to create a volume
    */
   void createVolume(KsmVolumeArgs args) throws IOException;
+
+  /**
+   * Changes the owner of a volume.
+   *
+   * @param volume - Name of the volume.
+   * @param owner - Name of the owner.
+   * @throws IOException
+   */
+  void setOwner(String volume, String owner) throws IOException;
+
+  /**
+   * Changes the Quota on a volume.
+   *
+   * @param volume - Name of the volume.
+   * @param quota - Quota in bytes.
+   * @throws IOException
+   */
+  void setQuota(String volume, long quota) throws IOException;
+
+  /**
+   * Gets the volume information.
+   * @param volume - Volume name.
+   * @return VolumeArgs or exception is thrown.
+   * @throws IOException
+   */
+  KsmVolumeArgs getVolumeInfo(String volume) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ff857f6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java
index ff0d087..7b96da7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java
@@ -17,7 +17,6 @@
 package org.apache.hadoop.ozone.ksm;
 
 import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
 import org.apache.hadoop.ozone.OzoneConfiguration;
 import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
@@ -64,6 +63,62 @@ public class VolumeManagerImpl implements VolumeManager {
         OZONE_KSM_USER_MAX_VOLUME_DEFAULT);
   }
 
+  // Helpers to add and delete volume from user list
+  private void addVolumeToOwnerList(String volume, String owner,
+                                    List<Map.Entry<byte[], byte[]>> putBatch)
+      throws IOException {
+    // Get the volume list
+    byte[] dbUserKey = metadataManager.getUserKey(owner);
+    byte[] volumeList  = metadataManager.get(dbUserKey);
+    List<String> prevVolList = new LinkedList<>();
+    if (volumeList != null) {
+      VolumeList vlist = VolumeList.parseFrom(volumeList);
+      prevVolList.addAll(vlist.getVolumeNamesList());
+    }
+
+    // Check the volume count
+    if (prevVolList.size() >= maxUserVolumeCount) {
+      LOG.error("Too many volumes for user:{}", owner);
+      throw new KSMException(ResultCodes.FAILED_TOO_MANY_USER_VOLUMES);
+    }
+
+    // Add the new volume to the list
+    prevVolList.add(volume);
+    VolumeList newVolList = VolumeList.newBuilder()
+        .addAllVolumeNames(prevVolList).build();
+    putBatch.add(batchEntry(dbUserKey, newVolList.toByteArray()));
+  }
+
+  private void delVolumeFromOwnerList(String volume, String owner,
+                                      List<Map.Entry<byte[], byte[]>> putBatch,
+                                      List<byte[]> deleteBatch)
+      throws IOException {
+    // Get the volume list
+    byte[] dbUserKey = metadataManager.getUserKey(owner);
+    byte[] volumeList  = metadataManager.get(dbUserKey);
+    List<String> prevVolList = new LinkedList<>();
+    if (volumeList != null) {
+      VolumeList vlist = VolumeList.parseFrom(volumeList);
+      prevVolList.addAll(vlist.getVolumeNamesList());
+    } else {
+      throw new KSMException(ResultCodes.FAILED_USER_NOT_FOUND);
+    }
+
+    // Remove the volume from the list
+    prevVolList.remove(volume);
+    if (prevVolList.size() == 0) {
+      deleteBatch.add(dbUserKey);
+    } else {
+      VolumeList newVolList = VolumeList.newBuilder()
+          .addAllVolumeNames(prevVolList).build();
+      putBatch.add(batchEntry(dbUserKey, newVolList.toByteArray()));
+    }
+  }
+
+  private Map.Entry<byte[], byte[]> batchEntry(byte[] key, byte[] value) {
+    return new AbstractMap.SimpleEntry<>(key, value);
+  }
+
   /**
    * Creates a volume.
    * @param args - KsmVolumeArgs.
@@ -74,42 +129,21 @@ public class VolumeManagerImpl implements VolumeManager {
     metadataManager.writeLock().lock();
     List<Map.Entry<byte[], byte[]>> batch = new LinkedList<>();
     try {
-      byte[] volumeName = metadataManager.
-          get(DFSUtil.string2Bytes(args.getVolume()));
+      byte[] dbVolumeKey = metadataManager.getVolumeKey(args.getVolume());
+      byte[] volumeInfo = metadataManager.get(dbVolumeKey);
 
       // Check of the volume already exists
-      if(volumeName != null) {
+      if (volumeInfo != null) {
         LOG.error("volume:{} already exists", args.getVolume());
         throw new KSMException(ResultCodes.FAILED_VOLUME_ALREADY_EXISTS);
       }
 
-      // Next count the number of volumes for the user
-      String dbUserName = "$" + args.getOwnerName();
-      byte[] volumeList  = metadataManager
-          .get(DFSUtil.string2Bytes(dbUserName));
-      List prevVolList;
-      if (volumeList != null) {
-        VolumeList vlist = VolumeList.parseFrom(volumeList);
-        prevVolList = vlist.getVolumeNamesList();
-      } else {
-        prevVolList = new LinkedList();
-      }
-
-      if (prevVolList.size() >= maxUserVolumeCount) {
-        LOG.error("Too many volumes for user:{}", args.getOwnerName());
-        throw new KSMException(ResultCodes.FAILED_TOO_MANY_USER_VOLUMES);
-      }
-
-      // Commit the volume information to metadataManager
-      VolumeInfo volumeInfo = args.getProtobuf();
-      batch.add(new AbstractMap.SimpleEntry<>(
-          DFSUtil.string2Bytes(args.getVolume()), volumeInfo.toByteArray()));
+      // Write the vol info
+      VolumeInfo newVolumeInfo = args.getProtobuf();
+      batch.add(batchEntry(dbVolumeKey, newVolumeInfo.toByteArray()));
 
-      prevVolList.add(args.getVolume());
-      VolumeList newVolList = VolumeList.newBuilder()
-              .addAllVolumeNames(prevVolList).build();
-      batch.add(new AbstractMap.SimpleEntry<>(
-          DFSUtil.string2Bytes(dbUserName), newVolList.toByteArray()));
+      // Add volume to user list
+      addVolumeToOwnerList(args.getVolume(), args.getOwnerName(), batch);
       metadataManager.batchPut(batch);
       LOG.info("created volume:{} user:{}",
                                   args.getVolume(), args.getOwnerName());
@@ -121,4 +155,120 @@ public class VolumeManagerImpl implements VolumeManager {
       metadataManager.writeLock().unlock();
     }
   }
+
+  /**
+   * Changes the owner of a volume.
+   *
+   * @param volume - Name of the volume.
+   * @param owner - Name of the owner.
+   * @throws IOException
+   */
+  @Override
+  public void setOwner(String volume, String owner) throws IOException {
+    Preconditions.checkNotNull(volume);
+    Preconditions.checkNotNull(owner);
+    List<Map.Entry<byte[], byte[]>> putbatch = new LinkedList<>();
+    List<byte[]> deletebatch = new LinkedList<>();
+    metadataManager.writeLock().lock();
+    try {
+      byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
+      byte[] volInfo = metadataManager.get(dbVolumeKey);
+      if (volInfo == null) {
+        throw  new KSMException(ResultCodes.FAILED_VOLUME_NOT_FOUND);
+      }
+
+      VolumeInfo volumeInfo = VolumeInfo.parseFrom(volInfo);
+      KsmVolumeArgs volumeArgs = KsmVolumeArgs.getFromProtobuf(volumeInfo);
+      Preconditions.checkState(volume.equalsIgnoreCase(volumeInfo.getVolume()));
+
+      delVolumeFromOwnerList(volume, volumeArgs.getOwnerName(),
+          putbatch, deletebatch);
+      addVolumeToOwnerList(volume, owner, putbatch);
+
+      KsmVolumeArgs newVolumeArgs =
+          KsmVolumeArgs.newBuilder().setVolume(volumeArgs.getVolume())
+              .setAdminName(volumeArgs.getAdminName())
+              .setOwnerName(owner)
+              .setQuotaInBytes(volumeArgs.getQuotaInBytes())
+              .build();
+
+      VolumeInfo newVolumeInfo = newVolumeArgs.getProtobuf();
+      putbatch.add(batchEntry(dbVolumeKey, newVolumeInfo.toByteArray()));
+
+      metadataManager.batchPutDelete(putbatch, deletebatch);
+    } catch (IOException ex) {
+      LOG.error("Changing volume ownership failed for user:{} volume:{}",
+          owner, volume, ex);
+      throw ex;
+    } finally {
+      metadataManager.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Changes the Quota on a volume.
+   *
+   * @param volume - Name of the volume.
+   * @param quota - Quota in bytes.
+   * @throws IOException
+   */
+  public void setQuota(String volume, long quota) throws IOException {
+    Preconditions.checkNotNull(volume);
+    metadataManager.writeLock().lock();
+    try {
+      byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
+      byte[] volInfo = metadataManager.get(dbVolumeKey);
+      if (volInfo == null) {
+        throw  new KSMException(ResultCodes.FAILED_VOLUME_NOT_FOUND);
+      }
+
+      VolumeInfo volumeInfo = VolumeInfo.parseFrom(volInfo);
+      KsmVolumeArgs volumeArgs = KsmVolumeArgs.getFromProtobuf(volumeInfo);
+      Preconditions.checkState(volume.equalsIgnoreCase(volumeInfo.getVolume()));
+
+      KsmVolumeArgs newVolumeArgs =
+          KsmVolumeArgs.newBuilder().setVolume(volumeArgs.getVolume())
+              .setAdminName(volumeArgs.getAdminName())
+              .setOwnerName(volumeArgs.getOwnerName())
+              .setQuotaInBytes(quota)
+              .build();
+
+      VolumeInfo newVolumeInfo = newVolumeArgs.getProtobuf();
+      metadataManager.put(dbVolumeKey, newVolumeInfo.toByteArray());
+    } catch (IOException ex) {
+      LOG.error("Changing volume quota failed for volume:{} quota:{}",
+          volume, quota, ex);
+      throw ex;
+    } finally {
+      metadataManager.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Gets the volume information.
+   * @param volume - Volume name.
+   * @return VolumeArgs or exception is thrown.
+   * @throws IOException
+   */
+  public KsmVolumeArgs getVolumeInfo(String volume) throws IOException {
+    Preconditions.checkNotNull(volume);
+    metadataManager.readLock().lock();
+    try {
+      byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
+      byte[] volInfo = metadataManager.get(dbVolumeKey);
+      if (volInfo == null) {
+        throw  new KSMException(ResultCodes.FAILED_VOLUME_NOT_FOUND);
+      }
+
+      VolumeInfo volumeInfo = VolumeInfo.parseFrom(volInfo);
+      KsmVolumeArgs volumeArgs = KsmVolumeArgs.getFromProtobuf(volumeInfo);
+      Preconditions.checkState(volume.equalsIgnoreCase(volumeInfo.getVolume()));
+      return volumeArgs;
+    } catch (IOException ex) {
+      LOG.error("Info volume failed for volume:{}", volume, ex);
+      throw ex;
+    } finally {
+      metadataManager.readLock().unlock();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ff857f6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/KSMException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/KSMException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/KSMException.java
index e1b90c3..7d9da03 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/KSMException.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/KSMException.java
@@ -99,6 +99,7 @@ public class KSMException extends IOException {
     FAILED_TOO_MANY_USER_VOLUMES,
     FAILED_VOLUME_ALREADY_EXISTS,
     FAILED_VOLUME_NOT_FOUND,
+    FAILED_USER_NOT_FOUND,
     FAILED_BUCKET_ALREADY_EXISTS,
     FAILED_INTERNAL_ERROR
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ff857f6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeySpaceManagerProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeySpaceManagerProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeySpaceManagerProtocolServerSideTranslatorPB.java
index 85eca04..32f9a70 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeySpaceManagerProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeySpaceManagerProtocolServerSideTranslatorPB.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
 import org.apache.hadoop.ksm.protocol.KeySpaceManagerProtocol;
 import org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolPB;
 import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
-import org.apache.hadoop.ozone.ksm.exceptions.KSMException.ResultCodes;
 import org.apache.hadoop.ozone.protocol.proto
     .KeySpaceManagerProtocolProtos.CreateBucketRequest;
 import org.apache.hadoop.ozone.protocol.proto
@@ -77,6 +76,29 @@ public class KeySpaceManagerProtocolServerSideTranslatorPB implements
     this.impl = impl;
   }
 
+  // Convert and exception to corresponding status code
+  private Status exceptionToResponseStatus(IOException ex) {
+    if (ex instanceof KSMException) {
+      KSMException ksmException = (KSMException)ex;
+      switch (ksmException.getResult()) {
+      case FAILED_VOLUME_ALREADY_EXISTS:
+        return Status.VOLUME_ALREADY_EXISTS;
+      case FAILED_TOO_MANY_USER_VOLUMES:
+        return Status.USER_TOO_MANY_VOLUMES;
+      case FAILED_VOLUME_NOT_FOUND:
+        return Status.VOLUME_NOT_FOUND;
+      case FAILED_USER_NOT_FOUND:
+        return Status.USER_NOT_FOUND;
+      case FAILED_BUCKET_ALREADY_EXISTS:
+        return Status.BUCKET_ALREADY_EXISTS;
+      default:
+        return Status.INTERNAL_ERROR;
+      }
+    } else {
+      return Status.INTERNAL_ERROR;
+    }
+  }
+
   @Override
   public CreateVolumeResponse createVolume(
       RpcController controller, CreateVolumeRequest request)
@@ -86,18 +108,7 @@ public class KeySpaceManagerProtocolServerSideTranslatorPB implements
     try {
       impl.createVolume(KsmVolumeArgs.getFromProtobuf(request.getVolumeInfo()));
     } catch (IOException e) {
-      if (e instanceof KSMException) {
-        KSMException ksmException = (KSMException)e;
-        if (ksmException.getResult() ==
-            ResultCodes.FAILED_VOLUME_ALREADY_EXISTS) {
-          resp.setStatus(Status.VOLUME_ALREADY_EXISTS);
-        } else if (ksmException.getResult() ==
-            ResultCodes.FAILED_TOO_MANY_USER_VOLUMES) {
-          resp.setStatus(Status.USER_TOO_MANY_VOLUMES);
-        }
-      } else {
-        resp.setStatus(Status.INTERNAL_ERROR);
-      }
+      resp.setStatus(exceptionToResponseStatus(e));
     }
     return resp.build();
   }
@@ -106,7 +117,23 @@ public class KeySpaceManagerProtocolServerSideTranslatorPB implements
   public SetVolumePropertyResponse setVolumeProperty(
       RpcController controller, SetVolumePropertyRequest request)
       throws ServiceException {
-    return null;
+    SetVolumePropertyResponse.Builder resp =
+        SetVolumePropertyResponse.newBuilder();
+    resp.setStatus(Status.OK);
+    String volume = request.getVolumeName();
+
+    try {
+      if (request.hasQuotaInBytes()) {
+        long quota = request.getQuotaInBytes();
+        impl.setQuota(volume, quota);
+      } else {
+        String owner = request.getOwnerName();
+        impl.setOwner(volume, owner);
+      }
+    } catch (IOException e) {
+      resp.setStatus(exceptionToResponseStatus(e));
+    }
+    return resp.build();
   }
 
   @Override
@@ -120,7 +147,16 @@ public class KeySpaceManagerProtocolServerSideTranslatorPB implements
   public InfoVolumeResponse infoVolume(
       RpcController controller, InfoVolumeRequest request)
       throws ServiceException {
-    return null;
+    InfoVolumeResponse.Builder resp = InfoVolumeResponse.newBuilder();
+    resp.setStatus(Status.OK);
+    String volume = request.getVolumeName();
+    try {
+      KsmVolumeArgs ret = impl.getVolumeInfo(volume);
+      resp.setVolumeInfo(ret.getProtobuf());
+    } catch (IOException e) {
+      resp.setStatus(exceptionToResponseStatus(e));
+    }
+    return resp.build();
   }
 
   @Override
@@ -147,16 +183,8 @@ public class KeySpaceManagerProtocolServerSideTranslatorPB implements
       impl.createBucket(KsmBucketArgs.getFromProtobuf(
           request.getBucketInfo()));
       resp.setStatus(Status.OK);
-    } catch (KSMException ksmEx) {
-      if (ksmEx.getResult() ==
-          ResultCodes.FAILED_VOLUME_NOT_FOUND) {
-        resp.setStatus(Status.VOLUME_NOT_FOUND);
-      } else if (ksmEx.getResult() ==
-          ResultCodes.FAILED_BUCKET_ALREADY_EXISTS) {
-        resp.setStatus(Status.BUCKET_ALREADY_EXISTS);
-      }
-    } catch(IOException ex) {
-      resp.setStatus(Status.INTERNAL_ERROR);
+    } catch (IOException e) {
+      resp.setStatus(exceptionToResponseStatus(e));
     }
     return resp.build();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ff857f6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/request/OzoneQuota.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/request/OzoneQuota.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/request/OzoneQuota.java
index 501b239..c73bd6d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/request/OzoneQuota.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/request/OzoneQuota.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.web.request;
 
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.web.headers.Header;
 import org.codehaus.jackson.annotate.JsonIgnore;
 
@@ -29,10 +30,6 @@ import org.codehaus.jackson.annotate.JsonIgnore;
  */
 @InterfaceAudience.Private
 public class OzoneQuota {
-  private static final long MB_IN_BYTES = 1048576L;
-  private static final long GB_IN_BYTES = 1073741824L;
-  private static final long TB_IN_BYTES = 1099511627776L;
-
 
   private Units unit;
   private int size;
@@ -179,14 +176,40 @@ public class OzoneQuota {
     case BYTES:
       return this.getSize();
     case MB:
-      return this.getSize() * MB_IN_BYTES;
+      return this.getSize() * OzoneConsts.MB;
     case GB:
-      return this.getSize() * GB_IN_BYTES;
+      return this.getSize() * OzoneConsts.GB;
     case TB:
-      return this.getSize() * TB_IN_BYTES;
+      return this.getSize() * OzoneConsts.TB;
     case UNDEFINED:
     default:
       return -1;
     }
   }
+
+  /**
+   * Returns OzoneQuota corresponding to size in bytes.
+   *
+   * @param sizeInBytes size in bytes to be converted
+   *
+   * @return OzoneQuota object
+   */
+  public static OzoneQuota getOzoneQuota(long sizeInBytes) {
+    long size;
+    Units unit;
+    if (sizeInBytes % OzoneConsts.TB == 0) {
+      size = sizeInBytes / OzoneConsts.TB;
+      unit = Units.TB;
+    } else if (sizeInBytes % OzoneConsts.GB == 0) {
+      size = sizeInBytes / OzoneConsts.GB;
+      unit = Units.GB;
+    } else if (sizeInBytes % OzoneConsts.MB == 0) {
+      size = sizeInBytes / OzoneConsts.MB;
+      unit = Units.MB;
+    } else {
+      size = sizeInBytes;
+      unit = Units.BYTES;
+    }
+    return new OzoneQuota((int)size, unit);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ff857f6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
index 7a9bd4e..f09efb4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.ozone.OzoneConfiguration;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.OzoneConsts.Versioning;
 import org.apache.hadoop.ozone.protocolPB.KSMPBHelper;
+import org.apache.hadoop.ozone.web.request.OzoneQuota;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.scm.ScmConfigKeys;
 import org.apache.hadoop.scm.XceiverClientManager;
@@ -115,7 +116,7 @@ public final class DistributedStorageHandler implements StorageHandler {
   @Override
   public void createVolume(VolumeArgs args) throws IOException, OzoneException {
     long quota = args.getQuota() == null ?
-        Long.MAX_VALUE : args.getQuota().sizeInBytes();
+        OzoneConsts.MAX_QUOTA_IN_BYTES : args.getQuota().sizeInBytes();
     KsmVolumeArgs volumeArgs = KsmVolumeArgs.newBuilder()
         .setAdminName(args.getAdminName())
         .setOwnerName(args.getUserName())
@@ -128,13 +129,15 @@ public final class DistributedStorageHandler implements StorageHandler {
   @Override
   public void setVolumeOwner(VolumeArgs args) throws
       IOException, OzoneException {
-    throw new UnsupportedOperationException("setVolumeOwner not implemented");
+    keySpaceManagerClient.setOwner(args.getVolumeName(), args.getUserName());
   }
 
   @Override
   public void setVolumeQuota(VolumeArgs args, boolean remove)
       throws IOException, OzoneException {
-    throw new UnsupportedOperationException("setVolumeQuota not implemented");
+    long quota = remove ? OzoneConsts.MAX_QUOTA_IN_BYTES :
+                                   args.getQuota().sizeInBytes();
+    keySpaceManagerClient.setQuota(args.getVolumeName(), quota);
   }
 
   @Override
@@ -158,18 +161,15 @@ public final class DistributedStorageHandler implements StorageHandler {
   @Override
   public VolumeInfo getVolumeInfo(VolumeArgs args)
       throws IOException, OzoneException {
-    String containerKey = buildContainerKey(args.getVolumeName());
-    XceiverClientSpi xceiverClient = acquireXceiverClient(containerKey);
-    try {
-      KeyData containerKeyData = containerKeyDataForRead(
-          xceiverClient.getPipeline().getContainerName(), containerKey);
-      GetKeyResponseProto response = getKey(xceiverClient, containerKeyData,
-          args.getRequestID());
-      return fromContainerKeyValueListToVolume(
-          response.getKeyData().getMetadataList());
-    } finally {
-      xceiverClientManager.releaseClient(xceiverClient);
-    }
+    KsmVolumeArgs volumeArgs =
+        keySpaceManagerClient.getVolumeInfo(args.getVolumeName());
+    //TODO: add support for createdOn and other fields in getVolumeInfo
+    VolumeInfo volInfo =
+        new VolumeInfo(volumeArgs.getVolume(), null,
+            volumeArgs.getAdminName());
+    volInfo.setOwner(new VolumeOwner(volumeArgs.getOwnerName()));
+    volInfo.setQuota(OzoneQuota.getOzoneQuota(volumeArgs.getQuotaInBytes()));
+    return volInfo;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ff857f6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestBucketManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestBucketManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestBucketManagerImpl.java
index 5bd9ec6..c67b22b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestBucketManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestBucketManagerImpl.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.ksm;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
+import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
 import org.apache.hadoop.ozone.ksm.exceptions
     .KSMException.ResultCodes;
@@ -56,6 +57,19 @@ public class TestBucketManagerImpl {
 
     Mockito.when(metadataManager.writeLock()).thenReturn(lock.writeLock());
     Mockito.when(metadataManager.readLock()).thenReturn(lock.readLock());
+    Mockito.when(metadataManager.getVolumeKey(any(String.class))).thenAnswer(
+        (InvocationOnMock invocation) ->
+            DFSUtil.string2Bytes(
+                OzoneConsts.KSM_VOLUME_PREFIX + invocation.getArguments()[0]));
+    Mockito.when(metadataManager
+        .getBucketKey(any(String.class), any(String.class))).thenAnswer(
+            (InvocationOnMock invocation) ->
+                DFSUtil.string2Bytes(
+                    OzoneConsts.KSM_VOLUME_PREFIX
+                        + invocation.getArguments()[0]
+                        + OzoneConsts.KSM_BUCKET_PREFIX
+                        + invocation.getArguments()[1]));
+
     Mockito.doAnswer(
         new Answer<Void>() {
           @Override
@@ -74,7 +88,8 @@ public class TestBucketManagerImpl {
     );
     for(String volumeName : volumesToCreate) {
       byte[] dummyVolumeInfo = DFSUtil.string2Bytes(volumeName);
-      metadataDB.put(volumeName, dummyVolumeInfo);
+      metadataDB.put(OzoneConsts.KSM_VOLUME_PREFIX + volumeName,
+                     dummyVolumeInfo);
     }
     return metadataManager;
   }
@@ -112,7 +127,7 @@ public class TestBucketManagerImpl {
     bucketManager.createBucket(bucketArgs);
     //TODO: Use BucketManagerImpl#getBucketInfo to verify creation of bucket.
     Assert.assertNotNull(metaMgr
-        .get(DFSUtil.string2Bytes("sampleVol/bucketOne")));
+        .get(DFSUtil.string2Bytes("/sampleVol/bucketOne")));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ff857f6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java
new file mode 100644
index 0000000..5e35347
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.ksm;
+
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.web.exceptions.OzoneException;
+import org.apache.hadoop.ozone.web.handlers.UserArgs;
+import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
+import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
+import org.apache.hadoop.ozone.web.request.OzoneQuota;
+import org.apache.hadoop.ozone.web.response.VolumeInfo;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+import java.io.IOException;
+import java.util.Random;
+
+/**
+ * Test Key Space Manager operation in distributed handler scenario.
+ */
+public class TestKeySpaceManager {
+  private static MiniOzoneCluster cluster = null;
+  private static StorageHandler storageHandler;
+  private static UserArgs volUserArgs;
+  private static KSMMetrics ksmMetrics;
+
+  /**
+   * Create a MiniDFSCluster for testing.
+   * <p>
+   * Ozone is made active by setting OZONE_ENABLED = true and
+   * OZONE_HANDLER_TYPE_KEY = "distributed"
+   *
+   * @throws IOException
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
+        OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
+    cluster = new MiniOzoneCluster.Builder(conf)
+        .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
+    storageHandler = new ObjectStoreHandler(conf).getStorageHandler();
+    volUserArgs = new UserArgs(null, null, null, null, null, null);
+    ksmMetrics = cluster.getKeySpaceManager().getMetrics();
+  }
+
+  /**
+   * Shutdown MiniDFSCluster.
+   */
+  @AfterClass
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  // Create a volume and test its attribute after creating them
+  @Test(timeout = 60000)
+  public void testCreateVolume() throws IOException, OzoneException {
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+
+    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, volUserArgs);
+    createVolumeArgs.setUserName(userName);
+    createVolumeArgs.setAdminName(adminName);
+    storageHandler.createVolume(createVolumeArgs);
+
+    VolumeArgs getVolumeArgs = new VolumeArgs(volumeName, volUserArgs);
+    VolumeInfo retVolumeinfo = storageHandler.getVolumeInfo(getVolumeArgs);
+    Assert.assertTrue(retVolumeinfo.getVolumeName().equals(volumeName));
+    Assert.assertTrue(retVolumeinfo.getOwner().getName().equals(userName));
+    Assert.assertEquals(0, ksmMetrics.getNumVolumeCreateFails());
+  }
+
+  // Create a volume and modify the volume owner and then test its attributes
+  @Test(timeout = 60000)
+  public void testChangeVolumeOwner() throws IOException, OzoneException {
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+
+    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, volUserArgs);
+    createVolumeArgs.setUserName(userName);
+    createVolumeArgs.setAdminName(adminName);
+    storageHandler.createVolume(createVolumeArgs);
+
+    String newUserName = "user" + RandomStringUtils.randomNumeric(5);
+    createVolumeArgs.setUserName(newUserName);
+    storageHandler.setVolumeOwner(createVolumeArgs);
+
+    VolumeArgs getVolumeArgs = new VolumeArgs(volumeName, volUserArgs);
+    VolumeInfo retVolumeInfo = storageHandler.getVolumeInfo(getVolumeArgs);
+
+    Assert.assertTrue(retVolumeInfo.getVolumeName().equals(volumeName));
+    Assert.assertFalse(retVolumeInfo.getOwner().getName().equals(userName));
+    Assert.assertTrue(retVolumeInfo.getOwner().getName().equals(newUserName));
+    Assert.assertEquals(0, ksmMetrics.getNumVolumeCreateFails());
+    Assert.assertEquals(0, ksmMetrics.getNumVolumeInfoFails());
+  }
+
+  // Create a volume and modify the volume owner and then test its attributes
+  @Test(timeout = 60000)
+  public void testChangeVolumeQuota() throws IOException, OzoneException {
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    Random rand = new Random();
+
+    // Create a new volume with a quota
+    OzoneQuota createQuota =
+        new OzoneQuota(rand.nextInt(100), OzoneQuota.Units.GB);
+    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, volUserArgs);
+    createVolumeArgs.setUserName(userName);
+    createVolumeArgs.setAdminName(adminName);
+    createVolumeArgs.setQuota(createQuota);
+    storageHandler.createVolume(createVolumeArgs);
+
+    VolumeArgs getVolumeArgs = new VolumeArgs(volumeName, volUserArgs);
+    VolumeInfo retVolumeInfo = storageHandler.getVolumeInfo(getVolumeArgs);
+    Assert.assertEquals(retVolumeInfo.getQuota().sizeInBytes(),
+                                              createQuota.sizeInBytes());
+
+    // Set a new quota and test it
+    OzoneQuota setQuota =
+        new OzoneQuota(rand.nextInt(100), OzoneQuota.Units.GB);
+    createVolumeArgs.setQuota(setQuota);
+    storageHandler.setVolumeQuota(createVolumeArgs, false);
+    getVolumeArgs = new VolumeArgs(volumeName, volUserArgs);
+    retVolumeInfo = storageHandler.getVolumeInfo(getVolumeArgs);
+    Assert.assertEquals(retVolumeInfo.getQuota().sizeInBytes(),
+                                                setQuota.sizeInBytes());
+
+    // Remove the quota and test it again
+    storageHandler.setVolumeQuota(createVolumeArgs, true);
+    getVolumeArgs = new VolumeArgs(volumeName, volUserArgs);
+    retVolumeInfo = storageHandler.getVolumeInfo(getVolumeArgs);
+    Assert.assertEquals(retVolumeInfo.getQuota().sizeInBytes(),
+        OzoneConsts.MAX_QUOTA_IN_BYTES);
+    Assert.assertEquals(0, ksmMetrics.getNumVolumeCreateFails());
+    Assert.assertEquals(0, ksmMetrics.getNumVolumeInfoFails());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ff857f6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/TestDistributedOzoneVolumes.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/TestDistributedOzoneVolumes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/TestDistributedOzoneVolumes.java
index 43175a9..afc9ee3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/TestDistributedOzoneVolumes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/TestDistributedOzoneVolumes.java
@@ -89,9 +89,7 @@ public class TestDistributedOzoneVolumes extends TestOzoneHelper {
   public void testCreateVolumes() throws IOException {
     super.testCreateVolumes(port);
     Assert.assertEquals(cluster.getKeySpaceManager()
-                          .getMetrics().getNumVolumeCreates(), 1);
-    Assert.assertEquals(cluster.getKeySpaceManager()
-                          .getMetrics().getNumVolumeCreateFails(), 0);
+        .getMetrics().getNumVolumeCreateFails(), 0);
   }
 
   /**
@@ -99,8 +97,11 @@ public class TestDistributedOzoneVolumes extends TestOzoneHelper {
    *
    * @throws IOException
    */
+  @Test
   public void testCreateVolumesWithQuota() throws IOException {
     super.testCreateVolumesWithQuota(port);
+    Assert.assertEquals(cluster.getKeySpaceManager()
+        .getMetrics().getNumVolumeCreateFails(), 0);
   }
 
   /**
@@ -108,8 +109,11 @@ public class TestDistributedOzoneVolumes extends TestOzoneHelper {
    *
    * @throws IOException
    */
+  @Test
   public void testCreateVolumesWithInvalidQuota() throws IOException {
     super.testCreateVolumesWithInvalidQuota(port);
+    Assert.assertEquals(cluster.getKeySpaceManager()
+        .getMetrics().getNumVolumeCreateFails(), 0);
   }
 
   /**
@@ -119,8 +123,11 @@ public class TestDistributedOzoneVolumes extends TestOzoneHelper {
    *
    * @throws IOException
    */
+  @Test
   public void testCreateVolumesWithInvalidUser() throws IOException {
     super.testCreateVolumesWithInvalidUser(port);
+    Assert.assertEquals(cluster.getKeySpaceManager()
+        .getMetrics().getNumVolumeCreateFails(), 0);
   }
 
   /**
@@ -131,8 +138,11 @@ public class TestDistributedOzoneVolumes extends TestOzoneHelper {
    *
    * @throws IOException
    */
+  @Test
   public void testCreateVolumesWithOutAdminRights() throws IOException {
     super.testCreateVolumesWithOutAdminRights(port);
+    Assert.assertEquals(cluster.getKeySpaceManager()
+        .getMetrics().getNumVolumeCreateFails(), 0);
   }
 
   /**
@@ -140,8 +150,11 @@ public class TestDistributedOzoneVolumes extends TestOzoneHelper {
    *
    * @throws IOException
    */
+  @Test
   public void testCreateVolumesInLoop() throws IOException {
     super.testCreateVolumesInLoop(port);
+    Assert.assertEquals(cluster.getKeySpaceManager()
+        .getMetrics().getNumVolumeCreateFails(), 0);
   }
   /**
    * Get volumes owned by the user.


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org