You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2017/06/03 00:16:15 UTC

hadoop git commit: HDFS-11771. Ozone: KSM: Add checkVolumeAccess. Contributed by Mukul Kumar Singh.

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7240 776d2d4de -> c7dd72e2f


HDFS-11771. Ozone: KSM: Add checkVolumeAccess. Contributed by Mukul Kumar Singh.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c7dd72e2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c7dd72e2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c7dd72e2

Branch: refs/heads/HDFS-7240
Commit: c7dd72e2fed532d8ffc4645c2dd569b05b2a72ed
Parents: 776d2d4
Author: Anu Engineer <ae...@apache.org>
Authored: Fri Jun 2 17:04:10 2017 -0700
Committer: Anu Engineer <ae...@apache.org>
Committed: Fri Jun 2 17:04:10 2017 -0700

----------------------------------------------------------------------
 .../hadoop/ksm/helpers/KsmOzoneAclMap.java      | 110 +++++++++++++++++++
 .../hadoop/ksm/helpers/KsmVolumeArgs.java       |  42 +++++--
 .../ksm/protocol/KeySpaceManagerProtocol.java   |   9 +-
 ...ceManagerProtocolClientSideTranslatorPB.java |  29 ++++-
 .../main/proto/KeySpaceManagerProtocol.proto    |   7 +-
 .../apache/hadoop/ozone/ksm/KSMConfigKeys.java  |  12 ++
 .../org/apache/hadoop/ozone/ksm/KSMMetrics.java |  20 ++++
 .../hadoop/ozone/ksm/KeySpaceManager.java       |  18 ++-
 .../apache/hadoop/ozone/ksm/VolumeManager.java  |  13 +++
 .../hadoop/ozone/ksm/VolumeManagerImpl.java     |  43 +++++++-
 ...ceManagerProtocolServerSideTranslatorPB.java |  16 ++-
 .../hadoop/ozone/web/exceptions/ErrorTable.java |   8 +-
 .../hadoop/ozone/web/handlers/UserArgs.java     |  19 ++++
 .../hadoop/ozone/web/handlers/VolumeArgs.java   |   6 +-
 .../ozone/web/handlers/VolumeHandler.java       |   1 +
 .../ozone/web/interfaces/StorageHandler.java    |  13 ++-
 .../web/localstorage/LocalStorageHandler.java   |   8 +-
 .../web/localstorage/OzoneMetadataManager.java  |  14 ++-
 .../hadoop/ozone/web/request/OzoneAcl.java      |  15 ++-
 .../web/storage/DistributedStorageHandler.java  |  31 ++++--
 .../hadoop/ozone/ksm/TestKeySpaceManager.java   |  45 ++++++++
 21 files changed, 419 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7dd72e2/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmOzoneAclMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmOzoneAclMap.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmOzoneAclMap.java
new file mode 100644
index 0000000..1c4c9cb
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmOzoneAclMap.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ksm.helpers;
+
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.OzoneAclInfo;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.OzoneAclInfo.OzoneAclRights;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.OzoneAclInfo.OzoneAclType;
+
+import java.util.List;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+/**
+ * This helper class keeps a map of all user and their permissions.
+ */
+public class KsmOzoneAclMap {
+  // per Acl Type user:rights map
+  private ArrayList<Map<String, OzoneAclRights>> aclMaps;
+
+  KsmOzoneAclMap() {
+    aclMaps = new ArrayList<>();
+    for (OzoneAclType aclType : OzoneAclType.values()) {
+      aclMaps.add(aclType.ordinal(), new HashMap<>());
+    }
+  }
+
+  private Map<String, OzoneAclRights> getMap(OzoneAclType type) {
+    return aclMaps.get(type.ordinal());
+  }
+
+  // For a given acl type and user, get the stored acl
+  private OzoneAclRights getAcl(OzoneAclType type, String user) {
+    return getMap(type).get(user);
+  }
+
+  // Add a new acl to the map
+  public void addAcl(OzoneAclInfo acl) {
+    getMap(acl.getType()).put(acl.getName(), acl.getRights());
+  }
+
+  // for a given acl, check if the user has access rights
+  public boolean hasAccess(OzoneAclInfo acl) {
+    OzoneAclRights storedRights = getAcl(acl.getType(), acl.getName());
+    if (storedRights != null) {
+      switch (acl.getRights()) {
+      case READ:
+        return (storedRights == OzoneAclRights.READ)
+            || (storedRights == OzoneAclRights.READ_WRITE);
+      case WRITE:
+        return (storedRights == OzoneAclRights.WRITE)
+            || (storedRights == OzoneAclRights.READ_WRITE);
+      case READ_WRITE:
+        return (storedRights == OzoneAclRights.READ_WRITE);
+      default:
+        return false;
+      }
+    } else {
+      return false;
+    }
+  }
+
+  // Convert this map to OzoneAclInfo Protobuf List
+  public List<OzoneAclInfo> ozoneAclGetProtobuf() {
+    List<OzoneAclInfo> aclList = new LinkedList<>();
+    for (OzoneAclType type: OzoneAclType.values()) {
+      for (Map.Entry<String, OzoneAclRights> entry :
+          aclMaps.get(type.ordinal()).entrySet()) {
+        OzoneAclInfo aclInfo = OzoneAclInfo.newBuilder()
+            .setName(entry.getKey())
+            .setType(type)
+            .setRights(entry.getValue())
+            .build();
+        aclList.add(aclInfo);
+      }
+    }
+
+    return aclList;
+  }
+
+  // Create map from list of OzoneAclInfos
+  public static KsmOzoneAclMap ozoneAclGetFromProtobuf(
+      List<OzoneAclInfo> aclList) {
+    KsmOzoneAclMap aclMap = new KsmOzoneAclMap();
+    for (OzoneAclInfo acl : aclList) {
+      aclMap.addAcl(acl);
+    }
+    return aclMap;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7dd72e2/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmVolumeArgs.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmVolumeArgs.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmVolumeArgs.java
index 359c2d5..7831a87 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmVolumeArgs.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmVolumeArgs.java
@@ -19,9 +19,12 @@ package org.apache.hadoop.ksm.helpers;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.OzoneAclInfo;
+import org.apache.hadoop.ozone.protocol.proto
     .KeySpaceManagerProtocolProtos.VolumeInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.KeyValue;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -38,6 +41,7 @@ public final class KsmVolumeArgs {
   private final String volume;
   private final long quotaInBytes;
   private final Map<String, String> keyValueMap;
+  private final KsmOzoneAclMap aclMap;
 
   /**
    * Private constructor, constructed via builder.
@@ -45,15 +49,18 @@ public final class KsmVolumeArgs {
    * @param ownerName  - Volume owner's name
    * @param volume - volume name
    * @param quotaInBytes - Volume Quota in bytes.
-   * @param  keyValueMap - keyValue map.
+   * @param keyValueMap - keyValue map.
+   * @param aclMap - User to access rights map.
    */
   private KsmVolumeArgs(String adminName, String ownerName, String volume,
-                        long quotaInBytes, Map<String, String> keyValueMap) {
+                        long quotaInBytes, Map<String, String> keyValueMap,
+                        KsmOzoneAclMap aclMap) {
     this.adminName = adminName;
     this.ownerName = ownerName;
     this.volume = volume;
     this.quotaInBytes = quotaInBytes;
     this.keyValueMap = keyValueMap;
+    this.aclMap = aclMap;
   }
 
   /**
@@ -92,6 +99,9 @@ public final class KsmVolumeArgs {
     return keyValueMap;
   }
 
+  public KsmOzoneAclMap getAclMap() {
+    return aclMap;
+  }
   /**
    * Returns new builder class that builds a KsmVolumeArgs.
    *
@@ -110,12 +120,14 @@ public final class KsmVolumeArgs {
     private String volume;
     private long quotaInBytes;
     private Map<String, String> keyValueMap;
+    private KsmOzoneAclMap aclMap;
 
     /**
      * Constructs a builder.
      */
     Builder() {
       keyValueMap = new HashMap<>();
+      aclMap = new KsmOzoneAclMap();
     }
 
     public Builder setAdminName(String adminName) {
@@ -143,6 +155,11 @@ public final class KsmVolumeArgs {
       return this;
     }
 
+    public Builder addOzoneAcls(OzoneAclInfo acl) throws IOException {
+      aclMap.addAcl(acl);
+      return this;
+    }
+
     /**
      * Constructs a CreateVolumeArgument.
      * @return CreateVolumeArgs.
@@ -152,31 +169,36 @@ public final class KsmVolumeArgs {
       Preconditions.checkNotNull(ownerName);
       Preconditions.checkNotNull(volume);
       return new KsmVolumeArgs(adminName, ownerName, volume, quotaInBytes,
-          keyValueMap);
+          keyValueMap, aclMap);
     }
   }
 
   public VolumeInfo getProtobuf() {
-    List<KeyValue> list = new LinkedList<>();
+    List<KeyValue> metadataList = new LinkedList<>();
     for (Map.Entry<String, String> entry : keyValueMap.entrySet()) {
-      list.add(KeyValue.newBuilder().setKey(entry.getKey()).
+      metadataList.add(KeyValue.newBuilder().setKey(entry.getKey()).
           setValue(entry.getValue()).build());
     }
+    List<OzoneAclInfo> aclList = aclMap.ozoneAclGetProtobuf();
 
     return VolumeInfo.newBuilder()
         .setAdminName(adminName)
         .setOwnerName(ownerName)
         .setVolume(volume)
         .setQuotaInBytes(quotaInBytes)
-        .addAllMetadata(list)
+        .addAllMetadata(metadataList)
+        .addAllVolumeAcls(aclList)
         .build();
   }
 
   public static KsmVolumeArgs getFromProtobuf(VolumeInfo volInfo) {
+    Map<String, String> kvMap = volInfo.getMetadataList().stream()
+        .collect(Collectors.toMap(KeyValue::getKey,
+            KeyValue::getValue));
+    KsmOzoneAclMap aclMap =
+        KsmOzoneAclMap.ozoneAclGetFromProtobuf(volInfo.getVolumeAclsList());
+
     return new KsmVolumeArgs(volInfo.getAdminName(), volInfo.getOwnerName(),
-        volInfo.getVolume(), volInfo.getQuotaInBytes(),
-        volInfo.getMetadataList().stream()
-            .collect(Collectors.toMap(KeyValue::getKey,
-                KeyValue::getValue)));
+        volInfo.getVolume(), volInfo.getQuotaInBytes(), kvMap, aclMap);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7dd72e2/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocol/KeySpaceManagerProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocol/KeySpaceManagerProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocol/KeySpaceManagerProtocol.java
index 6efcb9e..6c743e2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocol/KeySpaceManagerProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocol/KeySpaceManagerProtocol.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
 import org.apache.hadoop.ksm.helpers.KsmKeyArgs;
 import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
 import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.OzoneAclInfo;
 import java.io.IOException;
 import java.util.List;
 
@@ -56,10 +58,13 @@ public interface KeySpaceManagerProtocol {
   /**
    * Checks if the specified user can access this volume.
    * @param volume - volume
-   * @param userName - user name
+   * @param userAcl - user acls which needs to be checked for access
+   * @return true if the user has required access for the volume,
+   *         false otherwise
    * @throws IOException
    */
-  void checkVolumeAccess(String volume, String userName) throws IOException;
+  boolean checkVolumeAccess(String volume, OzoneAclInfo userAcl)
+      throws IOException;
 
   /**
    * Gets the volume information.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7dd72e2/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java
index edc9101..bf33b2b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java
@@ -67,9 +67,15 @@ import org.apache.hadoop.ozone.protocol.proto
 import org.apache.hadoop.ozone.protocol.proto
     .KeySpaceManagerProtocolProtos.InfoVolumeResponse;
 import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.CheckVolumeAccessRequest;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.CheckVolumeAccessResponse;
+import org.apache.hadoop.ozone.protocol.proto
     .KeySpaceManagerProtocolProtos.VolumeInfo;
 import org.apache.hadoop.ozone.protocol.proto
     .KeySpaceManagerProtocolProtos.Status;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.OzoneAclInfo;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -196,13 +202,32 @@ public final class KeySpaceManagerProtocolClientSideTranslatorPB
    * Checks if the specified user can access this volume.
    *
    * @param volume - volume
-   * @param userName - user name
+   * @param userAcl - user acls which needs to be checked for access
+   * @return true if the user has required access for the volume,
+   *         false otherwise
    * @throws IOException
    */
   @Override
-  public void checkVolumeAccess(String volume, String userName) throws
+  public boolean checkVolumeAccess(String volume, OzoneAclInfo userAcl) throws
       IOException {
+    CheckVolumeAccessRequest.Builder req =
+        CheckVolumeAccessRequest.newBuilder();
+    req.setVolumeName(volume).setUserAcl(userAcl);
+    final CheckVolumeAccessResponse resp;
+    try {
+      resp = rpcProxy.checkVolumeAccess(NULL_RPC_CONTROLLER, req.build());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
 
+    if (resp.getStatus() == Status.ACCESS_DENIED) {
+      return false;
+    } else if (resp.getStatus() == Status.OK) {
+      return true;
+    } else {
+      throw new
+          IOException("Check Volume Access failed, error:" + resp.getStatus());
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7dd72e2/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto
index 8346518..0a6f7bc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto
@@ -61,6 +61,7 @@ message VolumeInfo {
     required string volume = 3;
     optional uint64 quotaInBytes = 4;
     repeated KeyValue metadata = 5;
+    repeated OzoneAclInfo volumeAcls = 6;
 }
 
 /**
@@ -93,11 +94,11 @@ message SetVolumePropertyResponse {
 }
 
 /**
-    Checks if a specified user has access to the volume.
-*/
+ * Checks if the user has specified permissions for the volume
+ */
 message CheckVolumeAccessRequest {
     required string volumeName = 1;
-    required string userName = 2;
+    required OzoneAclInfo userAcl = 2;
 }
 
 message CheckVolumeAccessResponse {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7dd72e2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMConfigKeys.java
index a773b17..3eb8a56 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMConfigKeys.java
@@ -17,6 +17,7 @@
 
 package org.apache.hadoop.ozone.ksm;
 
+import org.apache.hadoop.ozone.web.request.OzoneAcl;
 /**
  * KSM Constants.
  */
@@ -46,4 +47,15 @@ public final class KSMConfigKeys {
   public static final String OZONE_KSM_USER_MAX_VOLUME =
       "ozone.ksm.user.max.volume";
   public static final int OZONE_KSM_USER_MAX_VOLUME_DEFAULT = 1024;
+
+  // KSM Default user/group permissions
+  public static final String OZONE_KSM_USER_RIGHTS =
+      "ozone.ksm.user.rights";
+  public static final OzoneAcl.OzoneACLRights OZONE_KSM_USER_RIGHTS_DEFAULT =
+      OzoneAcl.OzoneACLRights.READ_WRITE;
+
+  public static final String OZONE_KSM_GROUP_RIGHTS =
+      "ozone.ksm.group.rights";
+  public static final OzoneAcl.OzoneACLRights OZONE_KSM_GROUP_RIGHTS_DEFAULT =
+      OzoneAcl.OzoneACLRights.READ_WRITE;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7dd72e2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java
index 88ffb1a..27a14ce 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java
@@ -31,6 +31,7 @@ public class KSMMetrics {
   private @Metric MutableCounterLong numVolumeCreates;
   private @Metric MutableCounterLong numVolumeModifies;
   private @Metric MutableCounterLong numVolumeInfos;
+  private @Metric MutableCounterLong numVolumeCheckAccesses;
   private @Metric MutableCounterLong numBucketCreates;
   private @Metric MutableCounterLong numVolumeDeletes;
   private @Metric MutableCounterLong numBucketInfos;
@@ -44,6 +45,7 @@ public class KSMMetrics {
   private @Metric MutableCounterLong numVolumeInfoFails;
   private @Metric MutableCounterLong numVolumeDeleteFails;
   private @Metric MutableCounterLong numBucketCreateFails;
+  private @Metric MutableCounterLong numVolumeCheckAccessFails;
   private @Metric MutableCounterLong numBucketInfoFails;
   private @Metric MutableCounterLong numBucketModifyFails;
   private @Metric MutableCounterLong numKeyAllocateFails;
@@ -75,6 +77,10 @@ public class KSMMetrics {
     numVolumeDeletes.incr();
   }
 
+  public void incNumVolumeCheckAccesses() {
+    numVolumeCheckAccesses.incr();
+  }
+
   public void incNumBucketCreates() {
     numBucketCreates.incr();
   }
@@ -103,6 +109,10 @@ public class KSMMetrics {
     numVolumeDeleteFails.incr();
   }
 
+  public void incNumVolumeCheckAccessFails() {
+    numVolumeCheckAccessFails.incr();
+  }
+
   public void incNumBucketCreateFails() {
     numBucketCreateFails.incr();
   }
@@ -152,6 +162,11 @@ public class KSMMetrics {
   }
 
   @VisibleForTesting
+  public long getNumVolumeCheckAccesses() {
+    return numVolumeCheckAccesses.value();
+  }
+
+  @VisibleForTesting
   public long getNumBucketCreates() {
     return numBucketCreates.value();
   }
@@ -187,6 +202,11 @@ public class KSMMetrics {
   }
 
   @VisibleForTesting
+  public long getNumVolumeCheckAccessFails() {
+    return numVolumeCheckAccessFails.value();
+  }
+
+  @VisibleForTesting
   public long getNumBucketCreateFails() {
     return numBucketCreateFails.value();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7dd72e2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
index 8589e4b..fb991bd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
@@ -33,6 +33,8 @@ import org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolPB;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.ozone.OzoneClientUtils;
 import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.OzoneAclInfo;
 import org.apache.hadoop.ozone.protocolPB
     .KeySpaceManagerProtocolServerSideTranslatorPB;
 import org.apache.hadoop.ozone.scm.StorageContainerManager;
@@ -300,13 +302,21 @@ public class KeySpaceManager implements KeySpaceManagerProtocol {
    * Checks if the specified user can access this volume.
    *
    * @param volume - volume
-   * @param userName - user name
+   * @param userAcl - user acls which needs to be checked for access
+   * @return true if the user has required access for the volume,
+   *         false otherwise
    * @throws IOException
    */
   @Override
-  public void checkVolumeAccess(String volume, String userName) throws
-      IOException {
-
+  public boolean checkVolumeAccess(String volume, OzoneAclInfo userAcl)
+      throws IOException {
+    try {
+      metrics.incNumVolumeCheckAccesses();
+      return volumeManager.checkVolumeAccess(volume, userAcl);
+    } catch (Exception ex) {
+      metrics.incNumVolumeCheckAccessFails();
+      throw ex;
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7dd72e2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManager.java
index 6f3339f..d5f4565 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManager.java
@@ -17,6 +17,8 @@
 package org.apache.hadoop.ozone.ksm;
 
 import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.OzoneAclInfo;
 
 import java.io.IOException;
 
@@ -64,4 +66,15 @@ public interface VolumeManager {
    * @throws IOException
    */
   void deleteVolume(String volume) throws IOException;
+
+  /**
+   * Checks if the specified user with a role can access this volume.
+   *
+   * @param volume - volume
+   * @param userAcl - user acl which needs to be checked for access
+   * @return true if the user has access for the volume, false otherwise
+   * @throws IOException
+   */
+  boolean checkVolumeAccess(String volume, OzoneAclInfo userAcl)
+      throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7dd72e2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java
index a7c080e..52a7523 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java
@@ -21,6 +21,8 @@ import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
 import org.apache.hadoop.ozone.OzoneConfiguration;
 import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
 import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.OzoneAclInfo;
+import org.apache.hadoop.ozone.protocol.proto
     .KeySpaceManagerProtocolProtos.VolumeList;
 import org.apache.hadoop.ozone.protocol.proto
     .KeySpaceManagerProtocolProtos.VolumeInfo;
@@ -179,7 +181,7 @@ public class VolumeManagerImpl implements VolumeManager {
 
       VolumeInfo volumeInfo = VolumeInfo.parseFrom(volInfo);
       KsmVolumeArgs volumeArgs = KsmVolumeArgs.getFromProtobuf(volumeInfo);
-      Preconditions.checkState(volume.equalsIgnoreCase(volumeInfo.getVolume()));
+      Preconditions.checkState(volume.equals(volumeInfo.getVolume()));
 
       delVolumeFromOwnerList(volume, volumeArgs.getOwnerName(),
           putBatch, deleteBatch);
@@ -224,7 +226,7 @@ public class VolumeManagerImpl implements VolumeManager {
 
       VolumeInfo volumeInfo = VolumeInfo.parseFrom(volInfo);
       KsmVolumeArgs volumeArgs = KsmVolumeArgs.getFromProtobuf(volumeInfo);
-      Preconditions.checkState(volume.equalsIgnoreCase(volumeInfo.getVolume()));
+      Preconditions.checkState(volume.equals(volumeInfo.getVolume()));
 
       KsmVolumeArgs newVolumeArgs =
           KsmVolumeArgs.newBuilder().setVolume(volumeArgs.getVolume())
@@ -262,7 +264,7 @@ public class VolumeManagerImpl implements VolumeManager {
 
       VolumeInfo volumeInfo = VolumeInfo.parseFrom(volInfo);
       KsmVolumeArgs volumeArgs = KsmVolumeArgs.getFromProtobuf(volumeInfo);
-      Preconditions.checkState(volume.equalsIgnoreCase(volumeInfo.getVolume()));
+      Preconditions.checkState(volume.equals(volumeInfo.getVolume()));
       return volumeArgs;
     } catch (IOException ex) {
       LOG.error("Info volume failed for volume:{}", volume, ex);
@@ -296,7 +298,7 @@ public class VolumeManagerImpl implements VolumeManager {
       }
 
       VolumeInfo volumeInfo = VolumeInfo.parseFrom(volInfo);
-      Preconditions.checkState(volume.equalsIgnoreCase(volumeInfo.getVolume()));
+      Preconditions.checkState(volume.equals(volumeInfo.getVolume()));
       // delete the volume from the owner list
       // as well as delete the volume entry
       delVolumeFromOwnerList(volume, volumeInfo.getOwnerName(),
@@ -310,4 +312,37 @@ public class VolumeManagerImpl implements VolumeManager {
       metadataManager.writeLock().unlock();
     }
   }
+
+  /**
+   * Checks if the specified user with a role can access this volume.
+   *
+   * @param volume - volume
+   * @param userAcl - user acl which needs to be checked for access
+   * @return true if the user has access for the volume, false otherwise
+   * @throws IOException
+   */
+  public boolean checkVolumeAccess(String volume, OzoneAclInfo userAcl)
+      throws IOException {
+    Preconditions.checkNotNull(volume);
+    Preconditions.checkNotNull(userAcl);
+    metadataManager.readLock().lock();
+    try {
+      byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
+      byte[] volInfo = metadataManager.get(dbVolumeKey);
+      if (volInfo == null) {
+        throw  new KSMException(ResultCodes.FAILED_VOLUME_NOT_FOUND);
+      }
+
+      VolumeInfo volumeInfo = VolumeInfo.parseFrom(volInfo);
+      KsmVolumeArgs volumeArgs = KsmVolumeArgs.getFromProtobuf(volumeInfo);
+      Preconditions.checkState(volume.equals(volumeInfo.getVolume()));
+      return volumeArgs.getAclMap().hasAccess(userAcl);
+    } catch (IOException ex) {
+      LOG.error("Check volume access failed for volume:{} user:{} rights:{}",
+          volume, userAcl.getName(), userAcl.getRights(), ex);
+      throw ex;
+    } finally {
+      metadataManager.readLock().unlock();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7dd72e2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeySpaceManagerProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeySpaceManagerProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeySpaceManagerProtocolServerSideTranslatorPB.java
index 57dd9fe..947378c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeySpaceManagerProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeySpaceManagerProtocolServerSideTranslatorPB.java
@@ -165,7 +165,21 @@ public class KeySpaceManagerProtocolServerSideTranslatorPB implements
   public CheckVolumeAccessResponse checkVolumeAccess(
       RpcController controller, CheckVolumeAccessRequest request)
       throws ServiceException {
-    return null;
+    CheckVolumeAccessResponse.Builder resp =
+        CheckVolumeAccessResponse.newBuilder();
+    resp.setStatus(Status.OK);
+    try {
+      boolean access = impl.checkVolumeAccess(request.getVolumeName(),
+          request.getUserAcl());
+      // if no access, set the response status as access denied
+      if (!access) {
+        resp.setStatus(Status.ACCESS_DENIED);
+      }
+    } catch (IOException e) {
+      resp.setStatus(exceptionToResponseStatus(e));
+    }
+
+    return resp.build();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7dd72e2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/exceptions/ErrorTable.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/exceptions/ErrorTable.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/exceptions/ErrorTable.java
index b6a5084..330b1c2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/exceptions/ErrorTable.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/exceptions/ErrorTable.java
@@ -183,9 +183,11 @@ public final class ErrorTable {
     OzoneException err =
         new OzoneException(e.getHttpCode(), e.getShortMessage(),
                            e.getMessage());
-    err.setRequestId(args.getRequestID());
-    err.setResource(args.getResourceName());
-    err.setHostID(args.getHostName());
+    if (args != null) {
+      err.setRequestId(args.getRequestID());
+      err.setResource(args.getResourceName());
+      err.setHostID(args.getHostName());
+    }
     return err;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7dd72e2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/handlers/UserArgs.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/handlers/UserArgs.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/handlers/UserArgs.java
index b83303e..07856d0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/handlers/UserArgs.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/handlers/UserArgs.java
@@ -35,6 +35,7 @@ public class UserArgs {
   private final UriInfo uri;
   private final Request request;
   private final HttpHeaders headers;
+  private String[] groups;
 
 
   /**
@@ -112,6 +113,24 @@ public class UserArgs {
   }
 
   /**
+   * Returns list of groups.
+   *
+   * @return String[]
+   */
+  public String[] getGroups() {
+    return this.groups;
+  }
+
+  /**
+   * Sets the group list.
+   *
+   * @param groups list of groups
+   */
+  public void setGroups(String[] groups) {
+    this.groups = groups;
+  }
+
+  /**
    * Returns the resource Name.
    *
    * @return String Resource.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7dd72e2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/handlers/VolumeArgs.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/handlers/VolumeArgs.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/handlers/VolumeArgs.java
index def1f0e..1d67c67 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/handlers/VolumeArgs.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/handlers/VolumeArgs.java
@@ -64,11 +64,13 @@ public class VolumeArgs extends UserArgs {
    * @param request  - Http Request
    * @param info - URI info
    * @param headers - http headers
+   * @param groups - list of groups allowed to access the volume
    */
   public VolumeArgs(String userName, String volumeName, String requestID,
                     String hostName, Request request, UriInfo info,
-                    HttpHeaders headers) {
+                    HttpHeaders headers, String[] groups) {
     super(userName, requestID, hostName, request, info, headers);
+    super.setGroups(groups);
     this.volumeName = volumeName;
   }
 
@@ -81,7 +83,7 @@ public class VolumeArgs extends UserArgs {
   public VolumeArgs(String volumeName, UserArgs userArgs) {
     this(userArgs.getUserName(), volumeName, userArgs.getRequestID(),
          userArgs.getHostName(), userArgs.getRequest(), userArgs.getUri(),
-         userArgs.getHeaders());
+         userArgs.getHeaders(), userArgs.getGroups());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7dd72e2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/handlers/VolumeHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/handlers/VolumeHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/handlers/VolumeHandler.java
index afbfddb..48d024c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/handlers/VolumeHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/handlers/VolumeHandler.java
@@ -92,6 +92,7 @@ public class VolumeHandler implements Volume {
           }
 
           args.setUserName(volumeOwner);
+          args.setGroups(auth.getGroups(args));
           if (!quota.equals(Header.OZONE_QUOTA_UNDEFINED)) {
             setQuotaArgs(args, quota);
           }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7dd72e2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/interfaces/StorageHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/interfaces/StorageHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/interfaces/StorageHandler.java
index 90bc251..3999f91 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/interfaces/StorageHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/interfaces/StorageHandler.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.ozone.web.handlers.BucketArgs;
 import org.apache.hadoop.ozone.web.handlers.KeyArgs;
 import org.apache.hadoop.ozone.web.handlers.ListArgs;
 import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
+import org.apache.hadoop.ozone.web.request.OzoneAcl;
 import org.apache.hadoop.ozone.web.response.BucketInfo;
 import org.apache.hadoop.ozone.web.response.ListBuckets;
 import org.apache.hadoop.ozone.web.response.ListKeys;
@@ -80,18 +81,20 @@ public interface StorageHandler {
       throws IOException, OzoneException;
 
   /**
-   * Checks if a Volume exists and the user specified has access to the
-   * Volume.
+   * Checks if a Volume exists and the user with a role specified has access
+   * to the Volume.
    *
-   * @param args - Volume Args
+   * @param volume - Volume Name whose access permissions needs to be checked
+   * @param acl - requested acls which needs to be checked for access
    *
-   * @return - Boolean - True if the user can modify the volume.
+   * @return - Boolean - True if the user with a role can access the volume.
    * This is possible for owners of the volume and admin users
    *
    * @throws IOException
    * @throws OzoneException
    */
-  boolean checkVolumeAccess(VolumeArgs args) throws IOException, OzoneException;
+  boolean checkVolumeAccess(String volume, OzoneAcl acl)
+      throws IOException, OzoneException;
 
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7dd72e2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/localstorage/LocalStorageHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/localstorage/LocalStorageHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/localstorage/LocalStorageHandler.java
index ae9aa69..36ad9b0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/localstorage/LocalStorageHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/localstorage/LocalStorageHandler.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.ozone.web.handlers.KeyArgs;
 import org.apache.hadoop.ozone.web.handlers.ListArgs;
 import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
 import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
+import org.apache.hadoop.ozone.web.request.OzoneAcl;
 import org.apache.hadoop.ozone.web.request.OzoneQuota;
 import org.apache.hadoop.ozone.web.response.BucketInfo;
 import org.apache.hadoop.ozone.web.response.ListBuckets;
@@ -106,17 +107,18 @@ public class LocalStorageHandler implements StorageHandler {
   /**
    * Checks if a Volume exists and the user specified has access to the volume.
    *
-   * @param args - volumeArgs
+   * @param volume - Volume Name
+   * @param acl - Ozone acl which needs to be compared for access
    * @return - Boolean - True if the user can modify the volume. This is
    * possible for owners of the volume and admin users
    * @throws IOException
    */
   @Override
-  public boolean checkVolumeAccess(VolumeArgs args)
+  public boolean checkVolumeAccess(String volume, OzoneAcl acl)
       throws IOException, OzoneException {
     OzoneMetadataManager oz =
         OzoneMetadataManager.getOzoneMetadataManager(conf);
-    return oz.checkVolumeAccess(args);
+    return oz.checkVolumeAccess(volume, acl);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7dd72e2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/localstorage/OzoneMetadataManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/localstorage/OzoneMetadataManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/localstorage/OzoneMetadataManager.java
index c63707c..fed77f1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/localstorage/OzoneMetadataManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/localstorage/OzoneMetadataManager.java
@@ -324,23 +324,25 @@ public final class OzoneMetadataManager {
   /**
    * Checks if you are the owner of a specific volume.
    *
-   * @param args - VolumeArgs
+   * @param volume - Volume Name whose access permissions needs to be checked
+   * @param acl - requested acls which needs to be checked for access
    * @return - True if you are the owner, false otherwise
    * @throws OzoneException
    */
-  public boolean checkVolumeAccess(VolumeArgs args) throws OzoneException {
+  public boolean checkVolumeAccess(String volume, OzoneAcl acl)
+      throws OzoneException {
     lock.readLock().lock();
     try {
       byte[] volumeInfo =
-          metadataDB.get(args.getVolumeName().getBytes(encoding));
+          metadataDB.get(volume.getBytes(encoding));
       if (volumeInfo == null) {
-        throw ErrorTable.newError(ErrorTable.VOLUME_NOT_FOUND, args);
+        throw ErrorTable.newError(ErrorTable.VOLUME_NOT_FOUND, null);
       }
 
       VolumeInfo info = VolumeInfo.parse(new String(volumeInfo, encoding));
-      return info.getOwner().getName().equals(args.getUserName());
+      return info.getOwner().getName().equals(acl.getName());
     } catch (IOException | DBException ex) {
-      throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, ex);
+      throw ErrorTable.newError(ErrorTable.SERVER_ERROR, null, ex);
     } finally {
       lock.readLock().unlock();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7dd72e2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/request/OzoneAcl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/request/OzoneAcl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/request/OzoneAcl.java
index 92c7f99..fe7532e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/request/OzoneAcl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/request/OzoneAcl.java
@@ -53,6 +53,13 @@ public class OzoneAcl {
     this.name = name;
     this.rights = rights;
     this.type = type;
+    if (type == OzoneACLType.WORLD && name.length() != 0) {
+      throw new IllegalArgumentException("Unexpected name part in world type");
+    }
+    if (((type == OzoneACLType.USER) || (type == OzoneACLType.GROUP))
+        && (name.length() == 0)) {
+      throw new IllegalArgumentException("User or group name is required");
+    }
   }
 
   /**
@@ -74,14 +81,6 @@ public class OzoneAcl {
     OzoneACLType aclType = OzoneACLType.valueOf(parts[0].toUpperCase());
     OzoneACLRights rights = OzoneACLRights.getACLRight(parts[2].toLowerCase());
 
-    if (((aclType == OzoneACLType.USER) || (aclType == OzoneACLType.GROUP))
-        && (parts[1].length() == 0)) {
-      throw new IllegalArgumentException("User or group name is required");
-    }
-
-    if ((aclType == OzoneACLType.WORLD) && (parts[1].length() != 0)) {
-      throw new IllegalArgumentException("Unexpected name part in world type");
-    }
     // TODO : Support sanitation of these user names by calling into
     // userAuth Interface.
     return new OzoneAcl(aclType, parts[1], rights);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7dd72e2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
index ab2363e..b52e22e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.ozone.OzoneConfiguration;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.OzoneConsts.Versioning;
 import org.apache.hadoop.ozone.protocolPB.KSMPBHelper;
+import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
 import org.apache.hadoop.ozone.web.request.OzoneAcl;
 import org.apache.hadoop.ozone.web.request.OzoneQuota;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
@@ -92,7 +93,8 @@ public final class DistributedStorageHandler implements StorageHandler {
   private final KeySpaceManagerProtocolClientSideTranslatorPB
       keySpaceManagerClient;
   private final XceiverClientManager xceiverClientManager;
-
+  private final OzoneAcl.OzoneACLRights userRights;
+  private final OzoneAcl.OzoneACLRights groupRights;
   private int chunkSize;
 
   /**
@@ -113,6 +115,10 @@ public final class DistributedStorageHandler implements StorageHandler {
 
     chunkSize = conf.getInt(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY,
         ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_DEFAULT);
+    userRights = conf.getEnum(KSMConfigKeys.OZONE_KSM_USER_RIGHTS,
+        KSMConfigKeys.OZONE_KSM_USER_RIGHTS_DEFAULT);
+    groupRights = conf.getEnum(KSMConfigKeys.OZONE_KSM_GROUP_RIGHTS,
+        KSMConfigKeys.OZONE_KSM_GROUP_RIGHTS_DEFAULT);
     if(chunkSize > ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE) {
       LOG.warn("The chunk size ({}) is not allowed to be more than"
           + " the maximum size ({}),"
@@ -126,13 +132,23 @@ public final class DistributedStorageHandler implements StorageHandler {
   public void createVolume(VolumeArgs args) throws IOException, OzoneException {
     long quota = args.getQuota() == null ?
         OzoneConsts.MAX_QUOTA_IN_BYTES : args.getQuota().sizeInBytes();
-    KsmVolumeArgs volumeArgs = KsmVolumeArgs.newBuilder()
-        .setAdminName(args.getAdminName())
+    OzoneAcl userAcl =
+        new OzoneAcl(OzoneAcl.OzoneACLType.USER,
+            args.getUserName(), userRights);
+    KsmVolumeArgs.Builder builder = KsmVolumeArgs.newBuilder();
+    builder.setAdminName(args.getAdminName())
         .setOwnerName(args.getUserName())
         .setVolume(args.getVolumeName())
         .setQuotaInBytes(quota)
-        .build();
-    keySpaceManagerClient.createVolume(volumeArgs);
+        .addOzoneAcls(KSMPBHelper.convertOzoneAcl(userAcl));
+    if (args.getGroups() != null) {
+      for (String group : args.getGroups()) {
+        OzoneAcl groupAcl =
+            new OzoneAcl(OzoneAcl.OzoneACLType.GROUP, group, groupRights);
+        builder.addOzoneAcls(KSMPBHelper.convertOzoneAcl(groupAcl));
+      }
+    }
+    keySpaceManagerClient.createVolume(builder.build());
   }
 
   @Override
@@ -150,9 +166,10 @@ public final class DistributedStorageHandler implements StorageHandler {
   }
 
   @Override
-  public boolean checkVolumeAccess(VolumeArgs args)
+  public boolean checkVolumeAccess(String volume, OzoneAcl acl)
       throws IOException, OzoneException {
-    throw new UnsupportedOperationException("checkVolumeAccessnot implemented");
+    return keySpaceManagerClient
+        .checkVolumeAccess(volume, KSMPBHelper.convertOzoneAcl(acl));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7dd72e2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java
index 8ef2b0e..b9cf355 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java
@@ -258,6 +258,51 @@ public class TestKeySpaceManager {
     Assert.assertTrue(retVolumeInfo.getOwner().getName().equals(userName));
   }
 
+  // Create a volume and test Volume access for a different user
+  @Test(timeout = 60000)
+  public void testAccessVolume() throws IOException, OzoneException {
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    String[] groupName =
+        {"group" + RandomStringUtils.randomNumeric(5)};
+
+    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
+    createVolumeArgs.setUserName(userName);
+    createVolumeArgs.setAdminName(adminName);
+    createVolumeArgs.setGroups(groupName);
+    storageHandler.createVolume(createVolumeArgs);
+
+    OzoneAcl userAcl = new OzoneAcl(OzoneAcl.OzoneACLType.USER, userName,
+        OzoneAcl.OzoneACLRights.READ_WRITE);
+    Assert.assertTrue(storageHandler.checkVolumeAccess(volumeName, userAcl));
+    OzoneAcl group = new OzoneAcl(OzoneAcl.OzoneACLType.GROUP, groupName[0],
+        OzoneAcl.OzoneACLRights.READ);
+    Assert.assertTrue(storageHandler.checkVolumeAccess(volumeName, group));
+
+    // Create a different user and access should fail
+    String falseUserName = "user" + RandomStringUtils.randomNumeric(5);
+    OzoneAcl falseUserAcl =
+        new OzoneAcl(OzoneAcl.OzoneACLType.USER, falseUserName,
+            OzoneAcl.OzoneACLRights.READ_WRITE);
+    Assert.assertFalse(storageHandler
+        .checkVolumeAccess(volumeName, falseUserAcl));
+    // Checking access with user name and Group Type should fail
+    OzoneAcl falseGroupAcl = new OzoneAcl(OzoneAcl.OzoneACLType.GROUP, userName,
+        OzoneAcl.OzoneACLRights.READ_WRITE);
+    Assert.assertFalse(storageHandler
+        .checkVolumeAccess(volumeName, falseGroupAcl));
+
+    // Access for acl type world should also fail
+    OzoneAcl worldAcl =
+        new OzoneAcl(OzoneAcl.OzoneACLType.WORLD, "",
+            OzoneAcl.OzoneACLRights.READ);
+    Assert.assertFalse(storageHandler.checkVolumeAccess(volumeName, worldAcl));
+
+    Assert.assertEquals(0, ksmMetrics.getNumVolumeCheckAccessFails());
+    Assert.assertEquals(0, ksmMetrics.getNumVolumeCreateFails());
+  }
+
   @Test(timeout = 60000)
   public void testCreateBucket() throws IOException, OzoneException {
     String userName = "user" + RandomStringUtils.randomNumeric(5);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org