You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2019/06/12 23:25:39 UTC

[hadoop] branch trunk updated: HDDS-1543. Implement addAcl,removeAcl,setAcl,getAcl for Prefix. Contr… (#927)

This is an automated email from the ASF dual-hosted git repository.

xyao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a43f444  HDDS-1543. Implement addAcl,removeAcl,setAcl,getAcl for Prefix. Contr… (#927)
a43f444 is described below

commit a43f4440f77ab7a8ad7adcb67c7ab6222458a692
Author: Xiaoyu Yao <xy...@apache.org>
AuthorDate: Wed Jun 12 16:25:31 2019 -0700

    HDDS-1543. Implement addAcl,removeAcl,setAcl,getAcl for Prefix. Contr… (#927)
---
 .../apache/hadoop/ozone/om/OzoneManagerLock.java   |  25 ++
 .../hadoop/ozone/om/exceptions/OMException.java    |   5 +-
 .../apache/hadoop/ozone/security/acl/OzoneObj.java |  16 +-
 .../hadoop/ozone/security/acl/OzoneObjInfo.java    |  65 ++++-
 .../org/apache/hadoop/ozone/util/RadixTree.java    |  12 +-
 .../src/main/proto/OzoneManagerProtocol.proto      |  19 +-
 .../apache/hadoop/ozone/util/TestRadixTree.java    |   2 -
 .../client/rpc/TestOzoneRpcClientAbstract.java     |  64 ++++-
 .../org/apache/hadoop/ozone/om/KeyManager.java     |  41 ---
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  14 +-
 .../org/apache/hadoop/ozone/om/PrefixManager.java  |  45 +++
 .../apache/hadoop/ozone/om/PrefixManagerImpl.java  | 316 +++++++++++++++++++++
 .../apache/hadoop/ozone/om/fs/OzoneManagerFS.java  |   3 +-
 .../apache/hadoop/ozone/om/TestKeyManagerImpl.java | 237 +++++++++++++++-
 14 files changed, 786 insertions(+), 78 deletions(-)

diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerLock.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerLock.java
index 0e36898..c569c09 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerLock.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerLock.java
@@ -46,6 +46,9 @@ import static org.apache.hadoop.ozone.OzoneConsts.OM_USER_PREFIX;
  *   <tr>
  *     <td> 2 </td> <td> Bucket Lock </td>
  *   </tr>
+ *   <tr>
+ *     <td> 3 </td> <td> Prefix Lock </td>
+ *    </tr>
  * </table>
  *
  * One cannot obtain a lower weight lock while holding a lock with higher
@@ -66,6 +69,7 @@ public final class OzoneManagerLock {
 
   private static final String VOLUME_LOCK = "volumeLock";
   private static final String BUCKET_LOCK = "bucketLock";
+  private static final String PREFIX_LOCK = "prefixLock";
   private static final String S3_BUCKET_LOCK = "s3BucketLock";
   private static final String S3_SECRET_LOCK = "s3SecretetLock";
 
@@ -77,6 +81,7 @@ public final class OzoneManagerLock {
           () -> ImmutableMap.of(
               VOLUME_LOCK, new AtomicInteger(0),
               BUCKET_LOCK, new AtomicInteger(0),
+              PREFIX_LOCK, new AtomicInteger(0),
               S3_BUCKET_LOCK, new AtomicInteger(0),
               S3_SECRET_LOCK, new AtomicInteger(0)
           )
@@ -241,4 +246,24 @@ public final class OzoneManagerLock {
     manager.unlock(awsAccessId);
     myLocks.get().get(S3_SECRET_LOCK).decrementAndGet();
   }
+
+  public void acquirePrefixLock(String prefixPath) {
+    if (hasAnyPrefixLock()) {
+      throw new RuntimeException(
+          "Thread '" + Thread.currentThread().getName() +
+              "' cannot acquire prefix path lock while holding prefix " +
+              "path lock(s) for path: " + prefixPath + ".");
+    }
+    manager.lock(prefixPath);
+    myLocks.get().get(PREFIX_LOCK).incrementAndGet();
+  }
+
+  private boolean hasAnyPrefixLock() {
+    return myLocks.get().get(PREFIX_LOCK).get() != 0;
+  }
+
+  public void releasePrefixLock(String prefixPath) {
+    manager.unlock(prefixPath);
+    myLocks.get().get(PREFIX_LOCK).decrementAndGet();
+  }
 }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
index d3925f3..2ee88d8 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
@@ -199,6 +199,9 @@ public class OMException extends IOException {
 
     PERMISSION_DENIED, // Error codes used during acl validation
 
-    TIMEOUT // Error codes used during acl validation
+    TIMEOUT, // Error codes used during acl validation
+
+    PREFIX_NOT_FOUND,
+
   }
 }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneObj.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneObj.java
index 74d0aa5..6e9ac25 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneObj.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneObj.java
@@ -71,6 +71,19 @@ public abstract class OzoneObj implements IOzoneObj {
 
   public abstract String getKeyName();
 
+  /**
+   * Get PrefixName.
+   * A prefix name is like a key name under the bucket but
+   * are mainly used for ACL for now and persisted into a separate prefix table.
+   *
+   * @return prefix name.
+   */
+  public abstract String getPrefixName();
+
+  /**
+   * Get full path of a key or prefix including volume and bucket.
+   * @return full path of a key or prefix.
+   */
   public abstract String getPath();
 
   /**
@@ -79,7 +92,8 @@ public abstract class OzoneObj implements IOzoneObj {
   public enum ResourceType {
     VOLUME(OzoneConsts.VOLUME),
     BUCKET(OzoneConsts.BUCKET),
-    KEY(OzoneConsts.KEY);
+    KEY(OzoneConsts.KEY),
+    PREFIX(OzoneConsts.PREFIX);
 
     /**
      * String value for this Enum.
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneObjInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneObjInfo.java
index 537134a..a45a156 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneObjInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneObjInfo.java
@@ -23,32 +23,51 @@ import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
 
 /**
  * Class representing an ozone object.
+ * It can be a volume with non-null volumeName (bucketName=null & name=null)
+ * or a bucket with non-null volumeName and bucketName (name=null)
+ * or a key with non-null volumeName, bucketName and key name
+ * (via getKeyName)
+ * or a prefix with non-null volumeName, bucketName and prefix name
+ * (via getPrefixName)
  */
 public final class OzoneObjInfo extends OzoneObj {
 
   private final String volumeName;
   private final String bucketName;
-  private final String keyName;
-
+  private final String name;
 
+  /**
+   *
+   * @param resType
+   * @param storeType
+   * @param volumeName
+   * @param bucketName
+   * @param name - keyName/PrefixName
+   */
   private OzoneObjInfo(ResourceType resType, StoreType storeType,
-      String volumeName, String bucketName, String keyName) {
+      String volumeName, String bucketName, String name) {
     super(resType, storeType);
     this.volumeName = volumeName;
     this.bucketName = bucketName;
-    this.keyName = keyName;
+    this.name = name;
   }
 
   @Override
   public String getPath() {
     switch (getResourceType()) {
     case VOLUME:
-      return getVolumeName();
+      return OZONE_URI_DELIMITER + getVolumeName();
     case BUCKET:
-      return getVolumeName() + OZONE_URI_DELIMITER + getBucketName();
+      return OZONE_URI_DELIMITER + getVolumeName()
+          + OZONE_URI_DELIMITER + getBucketName();
     case KEY:
-      return getVolumeName() + OZONE_URI_DELIMITER + getBucketName()
+      return OZONE_URI_DELIMITER + getVolumeName()
+          + OZONE_URI_DELIMITER + getBucketName()
           + OZONE_URI_DELIMITER + getKeyName();
+    case PREFIX:
+      return OZONE_URI_DELIMITER + getVolumeName()
+          + OZONE_URI_DELIMITER + getBucketName()
+          + OZONE_URI_DELIMITER + getPrefixName();
     default:
       throw new IllegalArgumentException("Unknown resource " +
         "type" + getResourceType());
@@ -67,9 +86,15 @@ public final class OzoneObjInfo extends OzoneObj {
 
   @Override
   public String getKeyName() {
-    return keyName;
+    return name;
   }
 
+  @Override
+  public String getPrefixName() {
+    return name;
+  }
+
+
   public static OzoneObjInfo fromProtobuf(OzoneManagerProtocolProtos.OzoneObj
       proto) {
     Builder builder = new Builder()
@@ -88,7 +113,7 @@ public final class OzoneObjInfo extends OzoneObj {
     case BUCKET:
       if (tokens.length < 2) {
         throw new IllegalArgumentException("Unexpected argument for " +
-            "Ozone key. Path:" + proto.getPath());
+            "Ozone bucket. Path:" + proto.getPath());
       }
       builder.setVolumeName(tokens[0]);
       builder.setBucketName(tokens[1]);
@@ -102,6 +127,15 @@ public final class OzoneObjInfo extends OzoneObj {
       builder.setBucketName(tokens[1]);
       builder.setKeyName(tokens[2]);
       break;
+    case PREFIX:
+      if (tokens.length < 3) {
+        throw new IllegalArgumentException("Unexpected argument for " +
+            "Ozone Prefix. Path:" + proto.getPath());
+      }
+      builder.setVolumeName(tokens[0]);
+      builder.setBucketName(tokens[1]);
+      builder.setPrefixName(tokens[2]);
+      break;
     default:
       throw new IllegalArgumentException("Unexpected type for " +
           "Ozone key. Type:" + proto.getResType());
@@ -118,7 +152,7 @@ public final class OzoneObjInfo extends OzoneObj {
     private OzoneObj.StoreType storeType;
     private String volumeName;
     private String bucketName;
-    private String keyName;
+    private String name;
 
     public static Builder newBuilder() {
       return new Builder();
@@ -145,14 +179,17 @@ public final class OzoneObjInfo extends OzoneObj {
     }
 
     public Builder setKeyName(String key) {
-      this.keyName = key;
+      this.name = key;
+      return this;
+    }
+
+    public Builder setPrefixName(String prefix) {
+      this.name = prefix;
       return this;
     }
 
     public OzoneObjInfo build() {
-      return new OzoneObjInfo(resType, storeType, volumeName, bucketName,
-          keyName);
+      return new OzoneObjInfo(resType, storeType, volumeName, bucketName, name);
     }
   }
-
 }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/util/RadixTree.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/util/RadixTree.java
index 72e9ab3..597f58d 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/util/RadixTree.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/util/RadixTree.java
@@ -202,9 +202,15 @@ public class RadixTree<T> {
         break;
       }
     }
-    return level >= 1 ?
-        Paths.get(root.getName()).resolve(p.subpath(0, level)).toString() :
-        root.getName();
+
+    if (level >= 1) {
+      Path longestMatch =
+          Paths.get(root.getName()).resolve(p.subpath(0, level));
+      String ret = longestMatch.toString();
+      return path.endsWith("/") ?  ret + "/" : ret;
+    } else {
+      return root.getName();
+    }
   }
 
   // root of a radix tree has a name of "/" and may optionally has it value.
diff --git a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
index 21cacf6..2c4766a 100644
--- a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
+++ b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
@@ -276,6 +276,7 @@ enum Status {
     NOT_A_FILE = 47;
     PERMISSION_DENIED = 48;
     TIMEOUT = 49;
+    PREFIX_NOT_FOUND=50;
 }
 
 
@@ -507,15 +508,15 @@ message OzoneAclInfo {
     }
 
     enum OzoneAclRights {
-      READ = 1;
-      WRITE = 2;
-      CREATE = 3;
-      LIST = 4;
-      DELETE = 5;
-      READ_ACL = 6;
-      WRITE_ACL  = 7;
-      ALL = 8;
-      NONE = 9;
+        READ = 1;
+        WRITE = 2;
+        CREATE = 3;
+        LIST = 4;
+        DELETE = 5;
+        READ_ACL = 6;
+        WRITE_ACL = 7;
+        ALL = 8;
+        NONE = 9;
     }
     required OzoneAclType type = 1;
     required string name = 2;
diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/util/TestRadixTree.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/util/TestRadixTree.java
index ceed534..57b0268 100644
--- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/util/TestRadixTree.java
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/util/TestRadixTree.java
@@ -84,7 +84,6 @@ public class TestRadixTree {
     assertEquals("g", lpn.getName());
     lpn.setValue(100);
 
-
     List<RadixNode<Integer>> lpq =
         ROOT.getLongestPrefixPath("/a/b/c/d/g/q");
     RadixNode<Integer> lqn = lpp.get(lpq.size()-1);
@@ -93,7 +92,6 @@ public class TestRadixTree {
     assertEquals("g", lqn.getName());
     assertEquals(100, (int)lqn.getValue());
 
-
     assertEquals("/a/", RadixTree.radixPathToString(
         ROOT.getLongestPrefixPath("/a/g")));
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
index b0f7888..d24b3da 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.BitSet;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -2200,6 +2201,66 @@ public abstract class TestOzoneRpcClientAbstract {
     validateOzoneAcl(ozObj);
   }
 
+  @Test
+  public void testNativeAclsForPrefix() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+
+    String prefix1 = "PF" + UUID.randomUUID().toString() + "/";
+    String key1 = prefix1 + "KEY" + UUID.randomUUID().toString();
+
+    String prefix2 = "PF" + UUID.randomUUID().toString() + "/";
+    String key2 = prefix2 + "KEY" + UUID.randomUUID().toString();
+
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+    assertNotNull("Bucket creation failed", bucket);
+
+    writeKey(key1, bucket);
+    writeKey(key2, bucket);
+
+    OzoneObj ozObj = new OzoneObjInfo.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setPrefixName(prefix1)
+        .setResType(OzoneObj.ResourceType.PREFIX)
+        .setStoreType(OzoneObj.StoreType.OZONE)
+        .build();
+
+    // add acl
+    BitSet aclRights1 = new BitSet();
+    aclRights1.set(ACLType.READ.ordinal());
+    OzoneAcl user1Acl = new OzoneAcl(ACLIdentityType.USER,
+        "user1", aclRights1);
+    assertTrue(store.addAcl(ozObj, user1Acl));
+
+    // get acl
+    List<OzoneAcl> aclsGet = store.getAcl(ozObj);
+    Assert.assertEquals(1, aclsGet.size());
+    Assert.assertEquals(user1Acl, aclsGet.get(0));
+
+    // remove acl
+    Assert.assertTrue(store.removeAcl(ozObj, user1Acl));
+    aclsGet = store.getAcl(ozObj);
+    Assert.assertEquals(0, aclsGet.size());
+
+    // set acl
+    BitSet aclRights2 = new BitSet();
+    aclRights2.set(ACLType.ALL.ordinal());
+    OzoneAcl group1Acl = new OzoneAcl(ACLIdentityType.GROUP,
+        "group1", aclRights2);
+    List<OzoneAcl> acls = new ArrayList<>();
+    acls.add(user1Acl);
+    acls.add(group1Acl);
+    Assert.assertTrue(store.setAcl(ozObj, acls));
+
+    // get acl
+    aclsGet = store.getAcl(ozObj);
+    Assert.assertEquals(2, aclsGet.size());
+  }
+
   /**
    * Helper function to get default acl list for current user.
    *
@@ -2218,8 +2279,7 @@ public abstract class TestOzoneRpcClientAbstract {
     listOfAcls.add(new OzoneAcl(ACLIdentityType.USER,
         ugi.getUserName(), userRights));
     //Group ACLs of the User
-    List<String> userGroups = Arrays.asList(UserGroupInformation
-        .createRemoteUser(ugi.getUserName()).getGroupNames());
+    List<String> userGroups = Arrays.asList(ugi.getGroupNames());
     userGroups.stream().forEach((group) -> listOfAcls.add(
         new OzoneAcl(ACLIdentityType.GROUP, group, groupRights)));
     return listOfAcls;
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
index 51c0cfa..1259f71 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
@@ -18,7 +18,6 @@ package org.apache.hadoop.ozone.om;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
-import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
@@ -36,7 +35,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .KeyInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .KeyLocation;
-import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.utils.BackgroundService;
 
 import java.io.IOException;
@@ -286,43 +284,4 @@ public interface KeyManager extends OzoneManagerFS {
   OmMultipartUploadListParts listParts(String volumeName, String bucketName,
       String keyName, String uploadID, int partNumberMarker,
       int maxParts)  throws IOException;
-
-  /**
-   * Add acl for Ozone object. Return true if acl is added successfully else
-   * false.
-   * @param obj Ozone object for which acl should be added.
-   * @param acl ozone acl top be added.
-   *
-   * @throws IOException if there is error.
-   * */
-  boolean addAcl(OzoneObj obj, OzoneAcl acl) throws IOException;
-
-  /**
-   * Remove acl for Ozone object. Return true if acl is removed successfully
-   * else false.
-   * @param obj Ozone object.
-   * @param acl Ozone acl to be removed.
-   *
-   * @throws IOException if there is error.
-   * */
-  boolean removeAcl(OzoneObj obj, OzoneAcl acl) throws IOException;
-
-  /**
-   * Acls to be set for given Ozone object. This operations reset ACL for
-   * given object to list of ACLs provided in argument.
-   * @param obj Ozone object.
-   * @param acls List of acls.
-   *
-   * @throws IOException if there is error.
-   * */
-  boolean setAcl(OzoneObj obj, List<OzoneAcl> acls) throws IOException;
-
-  /**
-   * Returns list of ACLs for given Ozone object.
-   * @param obj Ozone object.
-   *
-   * @throws IOException if there is error.
-   * */
-  List<OzoneAcl> getAcl(OzoneObj obj) throws IOException;
-
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 5985254..9ce581b 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -237,6 +237,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   private final VolumeManager volumeManager;
   private final BucketManager bucketManager;
   private final KeyManager keyManager;
+  private final PrefixManagerImpl prefixManager;
   private final OMMetrics metrics;
   private OzoneManagerHttpServer httpServer;
   private final OMStorage omStorage;
@@ -365,6 +366,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
         new ScmClient(scmBlockClient, scmContainerClient), metadataManager,
         configuration, omStorage.getOmId(), blockTokenMgr, getKmsProvider());
 
+    prefixManager = new PrefixManagerImpl(metadataManager);
+
     shutdownHook = () -> {
       saveOmMetrics();
     };
@@ -3033,6 +3036,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
       return bucketManager.addAcl(obj, acl);
     case KEY:
       return keyManager.addAcl(obj, acl);
+    case PREFIX:
+      return prefixManager.addAcl(obj, acl);
     default:
       throw new OMException("Unexpected resource type: " +
           obj.getResourceType(), INVALID_REQUEST);
@@ -3057,11 +3062,13 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     switch (obj.getResourceType()) {
     case VOLUME:
       return volumeManager.removeAcl(obj, acl);
-
     case BUCKET:
       return bucketManager.removeAcl(obj, acl);
     case KEY:
       return keyManager.removeAcl(obj, acl);
+    case PREFIX:
+      return prefixManager.removeAcl(obj, acl);
+
     default:
       throw new OMException("Unexpected resource type: " +
           obj.getResourceType(), INVALID_REQUEST);
@@ -3090,6 +3097,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
       return bucketManager.setAcl(obj, acls);
     case KEY:
       return keyManager.setAcl(obj, acls);
+    case PREFIX:
+      return prefixManager.setAcl(obj, acls);
     default:
       throw new OMException("Unexpected resource type: " +
           obj.getResourceType(), INVALID_REQUEST);
@@ -3116,6 +3125,9 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
       return bucketManager.getAcl(obj);
     case KEY:
       return keyManager.getAcl(obj);
+    case PREFIX:
+      return prefixManager.getAcl(obj);
+
     default:
       throw new OMException("Unexpected resource type: " +
           obj.getResourceType(), INVALID_REQUEST);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PrefixManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PrefixManager.java
new file mode 100644
index 0000000..a505b8d
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PrefixManager.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.ozone.om.helpers.OmPrefixInfo;
+
+import java.util.List;
+
+/**
+ * Handles prefix commands.
+ * //TODO: support OzoneManagerFS for ozfs optimization using prefix tree.
+ */
+public interface PrefixManager extends IOzoneAcl {
+
+  /**
+   * Returns the metadataManager.
+   * @return OMMetadataManager.
+   */
+  OMMetadataManager getMetadataManager();
+
+  /**
+   * Get the list of path components that match with obj's path.
+   * longest prefix.
+   * Note: the number of the entries include a root "/"
+   * so if you have a longtest prefix path /a/b/c/
+   * the returned list will be ["/", "a", "b", "c"]
+   * @param path ozone object path
+   * @return list of longest path components that matches obj's path.
+   */
+  List<OmPrefixInfo> getLongestPrefixPath(String path);
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PrefixManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PrefixManagerImpl.java
new file mode 100644
index 0000000..b9aff89
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PrefixManagerImpl.java
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import com.google.common.base.Strings;
+import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmPrefixInfo;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.ozone.util.RadixNode;
+import org.apache.hadoop.ozone.util.RadixTree;
+import org.apache.hadoop.utils.db.*;
+import org.apache.hadoop.utils.db.Table.KeyValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.PREFIX_NOT_FOUND;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_NOT_FOUND;
+import static org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType.PREFIX;
+
+/**
+ * Implementation of PreManager.
+ */
+public class PrefixManagerImpl implements PrefixManager {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PrefixManagerImpl.class);
+
+  private static final List<OzoneAcl> EMPTY_ACL_LIST = new ArrayList<>();
+  private final OMMetadataManager metadataManager;
+
+  // In-memory prefix tree to optimize ACL evaluation
+  private RadixTree<OmPrefixInfo> prefixTree;
+
+  public PrefixManagerImpl(OMMetadataManager metadataManager) {
+    this.metadataManager = metadataManager;
+    loadPrefixTree();
+  }
+
+  private void loadPrefixTree() {
+    prefixTree = new RadixTree<>();
+    try (TableIterator<String, ? extends
+        KeyValue<String, OmPrefixInfo>> iterator =
+             getMetadataManager().getPrefixTable().iterator()) {
+      iterator.seekToFirst();
+      while (iterator.hasNext()) {
+        KeyValue<String, OmPrefixInfo> kv = iterator.next();
+        prefixTree.insert(kv.getKey(), kv.getValue());
+      }
+    } catch (IOException ex) {
+      LOG.error("Fail to load prefix tree");
+    }
+  }
+
+
+  @Override
+  public OMMetadataManager getMetadataManager() {
+    return metadataManager;
+  }
+
+  /**
+   * Add acl for Ozone object. Return true if acl is added successfully else
+   * false.
+   *
+   * @param obj Ozone object for which acl should be added.
+   * @param acl ozone acl top be added.
+   * @throws IOException if there is error.
+   */
+  @Override
+  public boolean addAcl(OzoneObj obj, OzoneAcl acl) throws IOException {
+    validateOzoneObj(obj);
+
+    String prefixPath = obj.getPath();
+    metadataManager.getLock().acquirePrefixLock(prefixPath);
+    try {
+      OmPrefixInfo prefixInfo =
+          metadataManager.getPrefixTable().get(prefixPath);
+      List<OzoneAcl> list = null;
+      if (prefixInfo != null) {
+        list = prefixInfo.getAcls();
+      }
+
+      if (list == null) {
+        list = new ArrayList<>();
+        list.add(acl);
+      } else {
+        boolean found = false;
+        for (OzoneAcl a: list) {
+          if (a.getName().equals(acl.getName()) &&
+              a.getType() == acl.getType()) {
+            found = true;
+            a.getAclBitSet().or(acl.getAclBitSet());
+            break;
+          }
+        }
+        if (!found) {
+          list.add(acl);
+        }
+      }
+
+      OmPrefixInfo.Builder upiBuilder = OmPrefixInfo.newBuilder();
+      upiBuilder.setName(prefixPath).setAcls(list);
+      if (prefixInfo != null && prefixInfo.getMetadata() != null) {
+        upiBuilder.addAllMetadata(prefixInfo.getMetadata());
+      }
+      prefixInfo = upiBuilder.build();
+      // Persist into prefix table first
+      metadataManager.getPrefixTable().put(prefixPath, prefixInfo);
+      // update the in-memory prefix tree
+      prefixTree.insert(prefixPath, prefixInfo);
+    } catch (IOException ex) {
+      if (!(ex instanceof OMException)) {
+        LOG.error("Add acl operation failed for prefix path:{} acl:{}",
+            prefixPath, acl, ex);
+      }
+      throw ex;
+    } finally {
+      metadataManager.getLock().releasePrefixLock(prefixPath);
+    }
+    return true;
+  }
+
+  /**
+   * Remove acl for Ozone object. Return true if acl is removed successfully
+   * else false.
+   *
+   * @param obj Ozone object.
+   * @param acl Ozone acl to be removed.
+   * @throws IOException if there is error.
+   */
+  @Override
+  public boolean removeAcl(OzoneObj obj, OzoneAcl acl) throws IOException {
+    validateOzoneObj(obj);
+    String prefixPath = obj.getPath();
+    metadataManager.getLock().acquirePrefixLock(prefixPath);
+    try {
+      OmPrefixInfo prefixInfo =
+          metadataManager.getPrefixTable().get(prefixPath);
+      List<OzoneAcl> list = null;
+      if (prefixInfo != null) {
+        list = prefixInfo.getAcls();
+      }
+
+      if (list == null) {
+        LOG.debug("acl {} does not exist for prefix path {}", acl, prefixPath);
+        return false;
+      }
+
+      boolean found = false;
+      for (OzoneAcl a: list) {
+        if (a.getName().equals(acl.getName())
+            && a.getType() == acl.getType()) {
+          found = true;
+          a.getAclBitSet().andNot(acl.getAclBitSet());
+          if (a.getAclBitSet().isEmpty()) {
+            list.remove(a);
+          }
+          break;
+        }
+      }
+      if (!found) {
+        LOG.debug("acl {} does not exist for prefix path {}", acl, prefixPath);
+        return false;
+      }
+
+      if (!list.isEmpty()) {
+        OmPrefixInfo.Builder upiBuilder = OmPrefixInfo.newBuilder();
+        upiBuilder.setName(prefixPath).setAcls(list);
+        if (prefixInfo != null && prefixInfo.getMetadata() != null) {
+          upiBuilder.addAllMetadata(prefixInfo.getMetadata());
+        }
+        prefixInfo = upiBuilder.build();
+        metadataManager.getPrefixTable().put(prefixPath, prefixInfo);
+        prefixTree.insert(prefixPath, prefixInfo);
+      } else {
+        // Remove prefix entry in table and prefix tree if the # of acls is 0
+        metadataManager.getPrefixTable().delete(prefixPath);
+        prefixTree.removePrefixPath(prefixPath);
+      }
+
+    } catch (IOException ex) {
+      if (!(ex instanceof OMException)) {
+        LOG.error("Remove prefix acl operation failed for prefix path:{}" +
+            " acl:{}", prefixPath, acl, ex);
+      }
+      throw ex;
+    } finally {
+      metadataManager.getLock().releasePrefixLock(prefixPath);
+    }
+    return true;
+  }
+
+  /**
+   * Acls to be set for given Ozone object. This operations reset ACL for given
+   * object to list of ACLs provided in argument.
+   *
+   * @param obj Ozone object.
+   * @param acls List of acls.
+   * @throws IOException if there is error.
+   */
+  @Override
+  public boolean setAcl(OzoneObj obj, List<OzoneAcl> acls) throws IOException {
+    validateOzoneObj(obj);
+    String prefixPath = obj.getPath();
+    metadataManager.getLock().acquirePrefixLock(prefixPath);
+    try {
+      OmPrefixInfo prefixInfo =
+          metadataManager.getPrefixTable().get(prefixPath);
+      OmPrefixInfo.Builder upiBuilder = OmPrefixInfo.newBuilder();
+      upiBuilder.setName(prefixPath).setAcls(acls);
+      if (prefixInfo != null && prefixInfo.getMetadata() != null) {
+        upiBuilder.addAllMetadata(prefixInfo.getMetadata());
+      }
+      prefixInfo = upiBuilder.build();
+      prefixTree.insert(prefixPath, prefixInfo);
+      metadataManager.getPrefixTable().put(prefixPath, prefixInfo);
+    } catch (IOException ex) {
+      if (!(ex instanceof OMException)) {
+        LOG.error("Set prefix acl operation failed for prefix path:{} acls:{}",
+            prefixPath, acls, ex);
+      }
+      throw ex;
+    } finally {
+      metadataManager.getLock().releasePrefixLock(prefixPath);
+    }
+    return true;
+  }
+
+  /**
+   * Returns list of ACLs for given Ozone object.
+   *
+   * @param obj Ozone object.
+   * @throws IOException if there is error.
+   */
+  @Override
+  public List<OzoneAcl> getAcl(OzoneObj obj) throws IOException {
+    validateOzoneObj(obj);
+    String prefixPath = obj.getPath();
+    metadataManager.getLock().acquirePrefixLock(prefixPath);
+    try {
+      String longestPrefix = prefixTree.getLongestPrefix(prefixPath);
+      if (prefixPath.equals(longestPrefix)) {
+        RadixNode<OmPrefixInfo> lastNode =
+            prefixTree.getLastNodeInPrefixPath(prefixPath);
+        if (lastNode != null && lastNode.getValue() != null) {
+          return lastNode.getValue().getAcls();
+        }
+      }
+    } finally {
+      metadataManager.getLock().releasePrefixLock(prefixPath);
+    }
+    return EMPTY_ACL_LIST;
+  }
+
+  @Override
+  public List<OmPrefixInfo> getLongestPrefixPath(String path) {
+    String prefixPath = prefixTree.getLongestPrefix(path);
+    metadataManager.getLock().acquirePrefixLock(prefixPath);
+    try {
+      return prefixTree.getLongestPrefixPath(prefixPath).stream()
+          .map(c -> c.getValue()).collect(Collectors.toList());
+    } finally {
+      metadataManager.getLock().releasePrefixLock(prefixPath);
+    }
+  }
+
+  /**
+   * Helper method to validate ozone object.
+   * @param obj
+   * */
+  private void validateOzoneObj(OzoneObj obj) throws OMException {
+    Objects.requireNonNull(obj);
+
+    if (!obj.getResourceType().equals(PREFIX)) {
+      throw new IllegalArgumentException("Unexpected argument passed to " +
+          "PrefixManager. OzoneObj type:" + obj.getResourceType());
+    }
+    String volume = obj.getVolumeName();
+    String bucket = obj.getBucketName();
+    String prefixName = obj.getPrefixName();
+
+    if (Strings.isNullOrEmpty(volume)) {
+      throw new OMException("Volume name is required.", VOLUME_NOT_FOUND);
+    }
+    if (Strings.isNullOrEmpty(bucket)) {
+      throw new OMException("Bucket name is required.", BUCKET_NOT_FOUND);
+    }
+    if (Strings.isNullOrEmpty(prefixName)) {
+      throw new OMException("Prefix name is required.", PREFIX_NOT_FOUND);
+    }
+    if (!prefixName.endsWith("/")) {
+      throw new OMException("Invalid prefix name: " + prefixName,
+          PREFIX_NOT_FOUND);
+    }
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/fs/OzoneManagerFS.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/fs/OzoneManagerFS.java
index 46ba58d..bff883d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/fs/OzoneManagerFS.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/fs/OzoneManagerFS.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ozone.om.fs;
 
+import org.apache.hadoop.ozone.om.IOzoneAcl;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
@@ -29,7 +30,7 @@ import java.util.List;
 /**
  * Ozone Manager FileSystem interface.
  */
-public interface OzoneManagerFS {
+public interface OzoneManagerFS extends IOzoneAcl {
   OzoneFileStatus getFileStatus(OmKeyArgs args) throws IOException;
 
   void createDirectory(OmKeyArgs args) throws IOException;
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
index fb323fe..e9e6b25 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -47,21 +48,38 @@ import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
-import org.apache.hadoop.ozone.om.helpers.*;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmPrefixInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
+import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
 import org.apache.hadoop.ozone.web.utils.OzoneUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.LambdaTestUtils;
 
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
-import org.junit.AfterClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.mockito.Mockito;
 
-import static org.apache.hadoop.ozone.OzoneConfigKeys.*;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_BLOCKS_MAX;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
 import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.ALL;
 
 /**
@@ -69,6 +87,7 @@ import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.ALL
  */
 public class TestKeyManagerImpl {
 
+  private static PrefixManager prefixManager;
   private static KeyManagerImpl keyManager;
   private static VolumeManagerImpl volumeManager;
   private static BucketManagerImpl bucketManager;
@@ -82,6 +101,9 @@ public class TestKeyManagerImpl {
   private static final String BUCKET_NAME = "bucket1";
   private static final String VOLUME_NAME = "vol1";
 
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
   @BeforeClass
   public static void setUp() throws Exception {
     conf = new OzoneConfiguration();
@@ -105,6 +127,8 @@ public class TestKeyManagerImpl {
     keyManager =
         new KeyManagerImpl(scm.getBlockProtocolServer(), metadataManager, conf,
             "om1", null);
+    prefixManager = new PrefixManagerImpl(metadataManager);
+
     Mockito.when(mockScmBlockLocationProtocol
         .allocateBlock(Mockito.anyLong(), Mockito.anyInt(),
             Mockito.any(ReplicationType.class),
@@ -323,6 +347,213 @@ public class TestKeyManagerImpl {
     }
   }
 
+
+  @Test
+  public void testPrefixAclOps() throws IOException {
+    String volumeName = "vol1";
+    String bucketName = "bucket1";
+    String prefix1 = "pf1/";
+
+    OzoneObj ozPrefix1 = new OzoneObjInfo.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setPrefixName(prefix1)
+        .setResType(OzoneObj.ResourceType.PREFIX)
+        .setStoreType(OzoneObj.StoreType.OZONE)
+        .build();
+
+    OzoneAcl ozAcl1 = new OzoneAcl(ACLIdentityType.USER, "user1",
+        ACLType.READ);
+    prefixManager.addAcl(ozPrefix1, ozAcl1);
+
+    List<OzoneAcl> ozAclGet = prefixManager.getAcl(ozPrefix1);
+    Assert.assertEquals(1, ozAclGet.size());
+    Assert.assertEquals(ozAcl1, ozAclGet.get(0));
+
+    List<OzoneAcl> acls = new ArrayList<>();
+    OzoneAcl ozAcl2 = new OzoneAcl(ACLIdentityType.USER, "admin",
+        ACLType.ALL);
+
+    BitSet rwRights = new BitSet();
+    rwRights.set(IAccessAuthorizer.ACLType.WRITE.ordinal());
+    rwRights.set(IAccessAuthorizer.ACLType.READ.ordinal());
+    OzoneAcl ozAcl3 = new OzoneAcl(ACLIdentityType.GROUP, "dev",
+        rwRights);
+
+    BitSet wRights = new BitSet();
+    wRights.set(IAccessAuthorizer.ACLType.WRITE.ordinal());
+    OzoneAcl ozAcl4 = new OzoneAcl(ACLIdentityType.GROUP, "dev",
+        wRights);
+
+    BitSet rRights = new BitSet();
+    rRights.set(IAccessAuthorizer.ACLType.READ.ordinal());
+    OzoneAcl ozAcl5 = new OzoneAcl(ACLIdentityType.GROUP, "dev",
+        rRights);
+
+    acls.add(ozAcl2);
+    acls.add(ozAcl3);
+
+    prefixManager.setAcl(ozPrefix1, acls);
+    ozAclGet = prefixManager.getAcl(ozPrefix1);
+    Assert.assertEquals(2, ozAclGet.size());
+
+    int matchEntries = 0;
+    for (OzoneAcl acl : ozAclGet) {
+      if (acl.getType() == ACLIdentityType.GROUP) {
+        Assert.assertEquals(ozAcl3, acl);
+        matchEntries++;
+      }
+      if (acl.getType() == ACLIdentityType.USER) {
+        Assert.assertEquals(ozAcl2, acl);
+        matchEntries++;
+      }
+    }
+    Assert.assertEquals(2, matchEntries);
+
+    boolean result = prefixManager.removeAcl(ozPrefix1, ozAcl4);
+    Assert.assertEquals(true, result);
+
+    ozAclGet = prefixManager.getAcl(ozPrefix1);
+    Assert.assertEquals(2, ozAclGet.size());
+
+    result = prefixManager.removeAcl(ozPrefix1, ozAcl3);
+    Assert.assertEquals(true, result);
+    ozAclGet = prefixManager.getAcl(ozPrefix1);
+    Assert.assertEquals(1, ozAclGet.size());
+
+    Assert.assertEquals(ozAcl2, ozAclGet.get(0));
+
+    // add dev:w
+    prefixManager.addAcl(ozPrefix1, ozAcl4);
+    ozAclGet = prefixManager.getAcl(ozPrefix1);
+    Assert.assertEquals(2, ozAclGet.size());
+
+    // add dev:r and validate the acl bitset combined
+    prefixManager.addAcl(ozPrefix1, ozAcl5);
+    ozAclGet = prefixManager.getAcl(ozPrefix1);
+    Assert.assertEquals(2, ozAclGet.size());
+
+    matchEntries = 0;
+    for (OzoneAcl acl : ozAclGet) {
+      if (acl.getType() == ACLIdentityType.GROUP) {
+        Assert.assertEquals(ozAcl3, acl);
+        matchEntries++;
+      }
+      if (acl.getType() == ACLIdentityType.USER) {
+        Assert.assertEquals(ozAcl2, acl);
+        matchEntries++;
+      }
+    }
+    Assert.assertEquals(2, matchEntries);
+  }
+
+  @Test
+  public void testInvalidPrefixAcl() throws IOException {
+    String volumeName = "vol1";
+    String bucketName = "bucket1";
+    String prefix1 = "pf1/";
+
+    // Invalid prefix not ending with "/"
+    String invalidPrefix = "invalid/pf";
+    OzoneAcl ozAcl1 = new OzoneAcl(ACLIdentityType.USER, "user1",
+        ACLType.READ);
+
+    OzoneObj ozInvalidPrefix = new OzoneObjInfo.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setPrefixName(invalidPrefix)
+        .setResType(OzoneObj.ResourceType.PREFIX)
+        .setStoreType(OzoneObj.StoreType.OZONE)
+        .build();
+
+    // add acl with invalid prefix name
+    exception.expect(OMException.class);
+    exception.expectMessage("Invalid prefix name");
+    prefixManager.addAcl(ozInvalidPrefix, ozAcl1);
+
+    OzoneObj ozPrefix1 = new OzoneObjInfo.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setPrefixName(prefix1)
+        .setResType(OzoneObj.ResourceType.PREFIX)
+        .setStoreType(OzoneObj.StoreType.OZONE)
+        .build();
+
+
+    List<OzoneAcl> ozAclGet = prefixManager.getAcl(ozPrefix1);
+    Assert.assertEquals(1, ozAclGet.size());
+    Assert.assertEquals(ozAcl1, ozAclGet.get(0));
+
+    // get acl with invalid prefix name
+    exception.expect(OMException.class);
+    exception.expectMessage("Invalid prefix name");
+    ozAclGet = prefixManager.getAcl(ozInvalidPrefix);
+    Assert.assertEquals(null, ozAcl1);
+
+    // set acl with invalid prefix name
+    List<OzoneAcl> ozoneAcls = new ArrayList<OzoneAcl>();
+    ozoneAcls.add(ozAcl1);
+    exception.expect(OMException.class);
+    exception.expectMessage("Invalid prefix name");
+    prefixManager.setAcl(ozInvalidPrefix, ozoneAcls);
+
+    // remove acl with invalid prefix name
+    exception.expect(OMException.class);
+    exception.expectMessage("Invalid prefix name");
+    prefixManager.removeAcl(ozInvalidPrefix, ozAcl1);
+  }
+
+  @Test
+  public void testLongestPrefixPath() throws IOException {
+    String volumeName = "vol1";
+    String bucketName = "bucket1";
+    String prefix1 = "pf1/pf11/pf111/pf1111/";
+    String file1 = "pf1/pf11/file1";
+    String file2 = "pf1/pf11/pf111/pf1111/file2";
+
+    OzoneObj ozPrefix1 = new OzoneObjInfo.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setPrefixName(prefix1)
+        .setResType(OzoneObj.ResourceType.PREFIX)
+        .setStoreType(OzoneObj.StoreType.OZONE)
+        .build();
+
+    OzoneAcl ozAcl1 = new OzoneAcl(ACLIdentityType.USER, "user1",
+        ACLType.READ);
+    prefixManager.addAcl(ozPrefix1, ozAcl1);
+
+    OzoneObj ozFile1 = new OzoneObjInfo.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(file1)
+        .setResType(OzoneObj.ResourceType.KEY)
+        .setStoreType(OzoneObj.StoreType.OZONE)
+        .build();
+
+    List<OmPrefixInfo> prefixInfos =
+        prefixManager.getLongestPrefixPath(ozFile1.getPath());
+    Assert.assertEquals(5, prefixInfos.size());
+
+    OzoneObj ozFile2 = new OzoneObjInfo.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setPrefixName(file2)
+        .setResType(OzoneObj.ResourceType.KEY)
+        .setStoreType(OzoneObj.StoreType.OZONE)
+        .build();
+
+    prefixInfos =
+        prefixManager.getLongestPrefixPath(ozFile2.getPath());
+    Assert.assertEquals(7, prefixInfos.size());
+    // Only the last node has acl on it
+    Assert.assertEquals(ozAcl1, prefixInfos.get(6).getAcls().get(0));
+    // All other nodes don't have acl value associate with it
+    for (int i = 0; i < 6; i++) {
+      Assert.assertEquals(null, prefixInfos.get(i));
+    }
+  }
+
   @Test
   public void testLookupFile() throws IOException {
     String keyName = RandomStringUtils.randomAlphabetic(5);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org