You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2019/08/21 23:35:59 UTC

[hadoop] branch trunk updated: HDDS-1927. Consolidate add/remove Acl into OzoneAclUtil class. Contributed by Xiaoyu Yao.

This is an automated email from the ASF dual-hosted git repository.

aengineer pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d58eba8  HDDS-1927. Consolidate add/remove Acl into OzoneAclUtil class. Contributed by Xiaoyu Yao.
d58eba8 is described below

commit d58eba867234eaac0e229feb990e9dab3912e063
Author: Xiaoyu Yao <xy...@apache.org>
AuthorDate: Mon Aug 19 15:54:44 2019 -0700

    HDDS-1927. Consolidate add/remove Acl into OzoneAclUtil class. Contributed by Xiaoyu Yao.
    
    Signed-off-by: Anu Engineer <ae...@apache.org>
---
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  |   5 +-
 .../java/org/apache/hadoop/ozone/OzoneAcl.java     |   5 +
 .../hadoop/ozone/om/helpers/OmBucketInfo.java      |  76 +-----
 .../apache/hadoop/ozone/om/helpers/OmKeyInfo.java  | 147 ++---------
 .../hadoop/ozone/om/helpers/OmOzoneAclMap.java     |   7 +-
 .../hadoop/ozone/om/helpers/OmPrefixInfo.java      |  30 ++-
 .../hadoop/ozone/om/helpers/OzoneAclUtil.java      | 286 +++++++++++++++++++++
 .../apache/hadoop/ozone/web/utils/OzoneUtils.java  | 158 ------------
 .../hadoop/ozone/om/helpers/TestOzoneAclUtil.java  | 191 ++++++++++++++
 .../client/rpc/TestOzoneRpcClientAbstract.java     |  15 +-
 .../apache/hadoop/ozone/om/TestKeyManagerImpl.java |   8 +-
 .../apache/hadoop/ozone/om/TestOzoneManager.java   |   3 +-
 .../security/acl/TestOzoneNativeAuthorizer.java    |   4 +-
 .../web/storage/DistributedStorageHandler.java     |   3 +-
 .../apache/hadoop/ozone/om/BucketManagerImpl.java  |  91 +------
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java | 204 ++++-----------
 .../apache/hadoop/ozone/om/PrefixManagerImpl.java  | 186 ++++----------
 .../om/request/file/OMDirectoryCreateRequest.java  |  11 +-
 .../hadoop/ozone/om/request/key/OMKeyRequest.java  |  13 +-
 .../om/request/key/acl/OMKeyAddAclRequest.java     |   7 +-
 .../om/request/key/acl/OMKeyRemoveAclRequest.java  |   7 +-
 .../om/request/key/acl/OMKeySetAclRequest.java     |   9 +-
 .../S3InitiateMultipartUploadRequest.java          |   3 +-
 .../S3MultipartUploadCompleteRequest.java          |   4 +-
 .../protocolPB/OzoneManagerRequestHandler.java     |   5 +-
 25 files changed, 699 insertions(+), 779 deletions(-)

diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index fbb488e..003bcc4 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -64,9 +64,10 @@ import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
 import org.apache.hadoop.ozone.om.helpers.OmPartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
+import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
 import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
 import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
-import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
 import org.apache.hadoop.ozone.om.protocolPB
     .OzoneManagerProtocolClientSideTranslatorPB;
@@ -440,7 +441,7 @@ public class RpcClient implements ClientProtocol {
    * @return listOfAcls
    * */
   private List<OzoneAcl> getAclList() {
-    return OzoneUtils.getAclList(ugi.getUserName(), ugi.getGroups(),
+    return OzoneAclUtil.getAclList(ugi.getUserName(), ugi.getGroups(),
         userRights, groupRights);
   }
 
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OzoneAcl.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OzoneAcl.java
index 1730a4f..6a74342 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OzoneAcl.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OzoneAcl.java
@@ -328,6 +328,11 @@ public class OzoneAcl {
         otherAcl.getAclScope().equals(this.getAclScope());
   }
 
+  public OzoneAcl setAclScope(AclScope scope) {
+    this.aclScope = scope;
+    return this;
+  }
+
   /**
    * Scope of ozone acl.
    * */
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketInfo.java
index 4d764a5..4207583 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketInfo.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.ozone.om.helpers;
 
 
-import java.util.BitSet;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
@@ -37,8 +36,6 @@ import org.apache.hadoop.ozone.protocolPB.OMPBHelper;
 
 import com.google.common.base.Preconditions;
 
-import static org.apache.hadoop.ozone.OzoneAcl.ZERO_BITSET;
-
 /**
  * A class that encapsulates Bucket Info.
  */
@@ -135,36 +132,7 @@ public final class OmBucketInfo extends WithMetadata implements Auditable {
    * already existing in the acl list.
    */
   public boolean addAcl(OzoneAcl ozoneAcl) {
-    // Case 1: When we are adding more rights to existing user/group.
-    boolean addToExistingAcl = false;
-    for(OzoneAcl existingAcl: getAcls()) {
-      if(existingAcl.getName().equals(ozoneAcl.getName()) &&
-          existingAcl.getType().equals(ozoneAcl.getType())) {
-
-        BitSet bits = (BitSet) ozoneAcl.getAclBitSet().clone();
-
-        // We need to do "or" before comparision because think of a case like
-        // existing acl is 777 and newly added acl is 444, we have already
-        // that acl set. In this case if we do direct check they will not
-        // be equal, but if we do or and then check, we shall know it
-        // has acl's already set or not.
-        bits.or(existingAcl.getAclBitSet());
-
-        if (bits.equals(existingAcl.getAclBitSet())) {
-          return false;
-        } else {
-          existingAcl.getAclBitSet().or(ozoneAcl.getAclBitSet());
-          addToExistingAcl = true;
-          break;
-        }
-      }
-    }
-
-    // Case 2: When a completely new acl is added.
-    if(!addToExistingAcl) {
-      getAcls().add(ozoneAcl);
-    }
-    return true;
+    return OzoneAclUtil.addAcl(acls, ozoneAcl);
   }
 
   /**
@@ -174,36 +142,7 @@ public final class OmBucketInfo extends WithMetadata implements Auditable {
    * to that acl is not in the existing acl list.
    */
   public boolean removeAcl(OzoneAcl ozoneAcl) {
-    boolean removed = false;
-
-    // When we are removing subset of rights from existing acl.
-    for(OzoneAcl existingAcl: getAcls()) {
-      if (existingAcl.getName().equals(ozoneAcl.getName()) &&
-          existingAcl.getType().equals(ozoneAcl.getType())) {
-        BitSet bits = (BitSet) ozoneAcl.getAclBitSet().clone();
-        bits.and(existingAcl.getAclBitSet());
-
-        // This happens when the acl bitset is not existing for current name
-        // and type.
-        // Like a case we have 444 permission, 333 is asked to removed.
-        if (bits.equals(ZERO_BITSET)) {
-          return false;
-        }
-
-        // We have some matching. Remove them.
-        existingAcl.getAclBitSet().xor(bits);
-
-        // If existing acl has same bitset as passed acl bitset, remove that
-        // acl from the list
-        if (existingAcl.getAclBitSet().equals(ZERO_BITSET)) {
-          getAcls().remove(existingAcl);
-        }
-        removed = true;
-        break;
-      }
-    }
-
-    return removed;
+    return OzoneAclUtil.removeAcl(acls, ozoneAcl);
   }
 
   /**
@@ -212,9 +151,7 @@ public final class OmBucketInfo extends WithMetadata implements Auditable {
    * @return true - if successfully able to reset.
    */
   public boolean setAcls(List<OzoneAcl> ozoneAcls) {
-    this.acls.clear();
-    this.acls = ozoneAcls;
-    return true;
+    return OzoneAclUtil.setAcl(acls, ozoneAcls);
   }
 
   /**
@@ -307,7 +244,9 @@ public final class OmBucketInfo extends WithMetadata implements Auditable {
     }
 
     public Builder setAcls(List<OzoneAcl> listOfAcls) {
-      this.acls = listOfAcls;
+      if (listOfAcls != null) {
+        this.acls.addAll(listOfAcls);
+      }
       return this;
     }
 
@@ -367,8 +306,7 @@ public final class OmBucketInfo extends WithMetadata implements Auditable {
     BucketInfo.Builder bib =  BucketInfo.newBuilder()
         .setVolumeName(volumeName)
         .setBucketName(bucketName)
-        .addAllAcls(acls.stream().map(OzoneAcl::toProtobuf)
-            .collect(Collectors.toList()))
+        .addAllAcls(OzoneAclUtil.toProtobuf(acls))
         .setIsVersionEnabled(isVersionEnabled)
         .setStorageType(storageType.toProto())
         .setCreationTime(creationTime)
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
index 17aabd2..83adee9 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
@@ -19,25 +19,21 @@ package org.apache.hadoop.ozone.om.helpers;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.BitSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
-import com.google.protobuf.ByteString;
 import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
 import org.apache.hadoop.ozone.protocolPB.OMPBHelper;
 import org.apache.hadoop.util.Time;
 
 import com.google.common.base.Preconditions;
 
-import static org.apache.hadoop.ozone.OzoneAcl.ZERO_BITSET;
-
 /**
  * Args for key block. The block instance for the key requested in putKey.
  * This is returned from OM to client, and client use class to talk to
@@ -58,7 +54,7 @@ public final class OmKeyInfo extends WithMetadata {
   /**
    * ACL Information.
    */
-  private List<OzoneAclInfo> acls;
+  private List<OzoneAcl> acls;
 
   @SuppressWarnings("parameternumber")
   OmKeyInfo(String volumeName, String bucketName, String keyName,
@@ -67,7 +63,7 @@ public final class OmKeyInfo extends WithMetadata {
       HddsProtos.ReplicationType type,
       HddsProtos.ReplicationFactor factor,
       Map<String, String> metadata,
-      FileEncryptionInfo encInfo, List<OzoneAclInfo> acls) {
+      FileEncryptionInfo encInfo, List<OzoneAcl> acls) {
     this.volumeName = volumeName;
     this.bucketName = bucketName;
     this.keyName = keyName;
@@ -235,123 +231,22 @@ public final class OmKeyInfo extends WithMetadata {
     return encInfo;
   }
 
-  public List<OzoneAclInfo> getAcls() {
+  public List<OzoneAcl> getAcls() {
     return acls;
   }
 
-  /**
-   * Add an ozoneAcl to list of existing Acl set.
-   * @param ozoneAcl
-   * @return true - if successfully added, false if not added or acl is
-   * already existing in the acl list.
-   */
-  public boolean addAcl(OzoneAclInfo ozoneAcl) {
-    // Case 1: When we are adding more rights to existing user/group.
-    boolean addToExistingAcl = false;
-    for(OzoneAclInfo existingAcl: getAcls()) {
-      if(existingAcl.getName().equals(ozoneAcl.getName()) &&
-          existingAcl.getType().equals(ozoneAcl.getType())) {
-
-        // We need to do "or" before comparision because think of a case like
-        // existing acl is 777 and newly added acl is 444, we have already
-        // that acl set. In this case if we do direct check they will not
-        // be equal, but if we do or and then check, we shall know it
-        // has acl's already set or not.
-        BitSet newAclBits = BitSet.valueOf(
-            existingAcl.getRights().toByteArray());
-
-        newAclBits.or(BitSet.valueOf(ozoneAcl.getRights().toByteArray()));
-
-        if (newAclBits.equals(BitSet.valueOf(
-            existingAcl.getRights().toByteArray()))) {
-          return false;
-        } else {
-          OzoneAclInfo newAcl = OzoneAclInfo.newBuilder()
-              .setType(ozoneAcl.getType())
-              .setName(ozoneAcl.getName())
-              .setAclScope(ozoneAcl.getAclScope())
-              .setRights(ByteString.copyFrom(newAclBits.toByteArray()))
-              .build();
-          getAcls().remove(existingAcl);
-          getAcls().add(newAcl);
-          addToExistingAcl = true;
-          break;
-        }
-      }
-    }
-
-    // Case 2: When a completely new acl is added.
-    if(!addToExistingAcl) {
-      getAcls().add(ozoneAcl);
-    }
-    return true;
+  public boolean addAcl(OzoneAcl acl) {
+    return OzoneAclUtil.addAcl(acls, acl);
   }
 
-  /**
-   * Remove acl from existing acl list.
-   * @param ozoneAcl
-   * @return true - if successfully removed, false if not able to remove due
-   * to that acl is not in the existing acl list.
-   */
-  public boolean removeAcl(OzoneAclInfo ozoneAcl) {
-    boolean removed = false;
-
-    // When we are removing subset of rights from existing acl.
-    for(OzoneAclInfo existingAcl: getAcls()) {
-      if (existingAcl.getName().equals(ozoneAcl.getName()) &&
-          existingAcl.getType().equals(ozoneAcl.getType())) {
-
-        BitSet bits = BitSet.valueOf(ozoneAcl.getRights().toByteArray());
-        BitSet existingAclBits =
-            BitSet.valueOf(existingAcl.getRights().toByteArray());
-        bits.and(existingAclBits);
-
-        // This happens when the acl bitset asked to remove is not set for
-        // matched name and type.
-        // Like a case we have 444 permission, 333 is asked to removed.
-        if (bits.equals(ZERO_BITSET)) {
-          return false;
-        }
-
-        // We have some matching. Remove them.
-        bits.xor(existingAclBits);
-
-        // If existing acl has same bitset as passed acl bitset, remove that
-        // acl from the list
-        if (bits.equals(ZERO_BITSET)) {
-          getAcls().remove(existingAcl);
-        } else {
-          // Remove old acl and add new acl.
-          OzoneAclInfo newAcl = OzoneAclInfo.newBuilder()
-              .setType(ozoneAcl.getType())
-              .setName(ozoneAcl.getName())
-              .setAclScope(ozoneAcl.getAclScope())
-              .setRights(ByteString.copyFrom(bits.toByteArray()))
-              .build();
-          getAcls().remove(existingAcl);
-          getAcls().add(newAcl);
-        }
-        removed = true;
-        break;
-      }
-    }
-
-    return removed;
+  public boolean removeAcl(OzoneAcl acl) {
+    return OzoneAclUtil.removeAcl(acls, acl);
   }
 
-  /**
-   * Reset the existing acl list.
-   * @param ozoneAcls
-   * @return true - if successfully able to reset.
-   */
-  public boolean setAcls(List<OzoneAclInfo> ozoneAcls) {
-    this.acls.clear();
-    this.acls = ozoneAcls;
-    return true;
+  public boolean setAcls(List<OzoneAcl> newAcls) {
+    return OzoneAclUtil.setAcl(acls, newAcls);
   }
 
-
-
   /**
    * Builder of OmKeyInfo.
    */
@@ -368,11 +263,12 @@ public final class OmKeyInfo extends WithMetadata {
     private HddsProtos.ReplicationFactor factor;
     private Map<String, String> metadata;
     private FileEncryptionInfo encInfo;
-    private List<OzoneAclInfo> acls;
+    private List<OzoneAcl> acls;
 
     public Builder() {
       this.metadata = new HashMap<>();
       omKeyLocationInfoGroups = new ArrayList<>();
+      acls = new ArrayList<>();
     }
 
     public Builder setVolumeName(String volume) {
@@ -436,9 +332,10 @@ public final class OmKeyInfo extends WithMetadata {
       return this;
     }
 
-    public Builder setAcls(List<OzoneAclInfo> listOfAcls) {
-      this.acls = new ArrayList<>();
-      this.acls.addAll(listOfAcls);
+    public Builder setAcls(List<OzoneAcl> listOfAcls) {
+      if (listOfAcls != null) {
+        this.acls.addAll(listOfAcls);
+      }
       return this;
     }
 
@@ -466,13 +363,11 @@ public final class OmKeyInfo extends WithMetadata {
         .setLatestVersion(latestVersion)
         .setCreationTime(creationTime)
         .setModificationTime(modificationTime)
-        .addAllMetadata(KeyValueUtil.toProtobuf(metadata));
+        .addAllMetadata(KeyValueUtil.toProtobuf(metadata))
+        .addAllAcls(OzoneAclUtil.toProtobuf(acls));
     if (encInfo != null) {
       kb.setFileEncryptionInfo(OMPBHelper.convert(encInfo));
     }
-    if(acls != null) {
-      kb.addAllAcls(acls);
-    }
     return kb.build();
   }
 
@@ -492,7 +387,8 @@ public final class OmKeyInfo extends WithMetadata {
         .addAllMetadata(KeyValueUtil.getFromProtobuf(keyInfo.getMetadataList()))
         .setFileEncryptionInfo(keyInfo.hasFileEncryptionInfo() ?
             OMPBHelper.convert(keyInfo.getFileEncryptionInfo()): null)
-        .setAcls(keyInfo.getAclsList()).build();
+        .setAcls(OzoneAclUtil.fromProtobuf(keyInfo.getAclsList()))
+        .build();
   }
 
   @Override
@@ -514,7 +410,8 @@ public final class OmKeyInfo extends WithMetadata {
             .equals(keyLocationVersions, omKeyInfo.keyLocationVersions) &&
         type == omKeyInfo.type &&
         factor == omKeyInfo.factor &&
-        Objects.equals(metadata, omKeyInfo.metadata);
+        Objects.equals(metadata, omKeyInfo.metadata) &&
+        Objects.equals(acls, omKeyInfo.acls);
   }
 
   @Override
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmOzoneAclMap.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmOzoneAclMap.java
index ac836fe..b4f0d16 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmOzoneAclMap.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmOzoneAclMap.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.ozone.protocol.proto
     .OzoneManagerProtocolProtos.OzoneAclInfo.OzoneAclType;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
-import org.apache.hadoop.ozone.web.utils.OzoneUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import java.util.BitSet;
@@ -235,12 +234,12 @@ public class OmOzoneAclMap {
 
     switch (identityType) {
     case USER:
-      return OzoneUtils.checkIfAclBitIsSet(acl, getAcl(identityType,
+      return OzoneAclUtil.checkIfAclBitIsSet(acl, getAcl(identityType,
           ugi.getUserName()));
     case GROUP:
       // Check access for user groups.
       for (String userGroup : ugi.getGroupNames()) {
-        if (OzoneUtils.checkIfAclBitIsSet(acl, getAcl(identityType,
+        if (OzoneAclUtil.checkIfAclBitIsSet(acl, getAcl(identityType,
             userGroup))) {
           // Return true if any user group has required permission.
           return true;
@@ -249,7 +248,7 @@ public class OmOzoneAclMap {
       break;
     default:
       // For type WORLD and ANONYMOUS we set acl type as name.
-      if(OzoneUtils.checkIfAclBitIsSet(acl, getAcl(identityType,
+      if(OzoneAclUtil.checkIfAclBitIsSet(acl, getAcl(identityType,
           identityType.name()))) {
         return true;
       }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmPrefixInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmPrefixInfo.java
index d625a81..26b5b1d 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmPrefixInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmPrefixInfo.java
@@ -27,7 +27,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.stream.Collectors;
 
 /**
  * Wrapper class for Ozone prefix path info, currently mainly target for ACL but
@@ -54,6 +53,18 @@ public final class OmPrefixInfo extends WithMetadata {
     return acls;
   }
 
+  public boolean addAcl(OzoneAcl acl) {
+    return OzoneAclUtil.addAcl(acls, acl);
+  }
+
+  public boolean removeAcl(OzoneAcl acl) {
+    return OzoneAclUtil.removeAcl(acls, acl);
+  }
+
+  public boolean setAcls(List<OzoneAcl> newAcls) {
+    return OzoneAclUtil.setAcl(acls, newAcls);
+  }
+
   /**
    * Returns the name of the prefix path.
    * @return name of the prefix path.
@@ -86,7 +97,9 @@ public final class OmPrefixInfo extends WithMetadata {
     }
 
     public Builder setAcls(List<OzoneAcl> listOfAcls) {
-      this.acls = listOfAcls;
+      if (listOfAcls != null) {
+        acls.addAll(listOfAcls);
+      }
       return this;
     }
 
@@ -114,7 +127,6 @@ public final class OmPrefixInfo extends WithMetadata {
      */
     public OmPrefixInfo build() {
       Preconditions.checkNotNull(name);
-      Preconditions.checkNotNull(acls);
       return new OmPrefixInfo(name, acls, metadata);
     }
   }
@@ -124,9 +136,10 @@ public final class OmPrefixInfo extends WithMetadata {
    */
   public PrefixInfo getProtobuf() {
     PrefixInfo.Builder pib =  PrefixInfo.newBuilder().setName(name)
-        .addAllAcls(acls.stream().map(OzoneAcl::toProtobuf)
-            .collect(Collectors.toList()))
         .addAllMetadata(KeyValueUtil.toProtobuf(metadata));
+    if (acls != null) {
+      pib.addAllAcls(OzoneAclUtil.toProtobuf(acls));
+    }
     return pib.build();
   }
 
@@ -137,13 +150,14 @@ public final class OmPrefixInfo extends WithMetadata {
    */
   public static OmPrefixInfo getFromProtobuf(PrefixInfo prefixInfo) {
     OmPrefixInfo.Builder opib = OmPrefixInfo.newBuilder()
-        .setName(prefixInfo.getName())
-        .setAcls(prefixInfo.getAclsList().stream().map(
-            OzoneAcl::fromProtobuf).collect(Collectors.toList()));
+        .setName(prefixInfo.getName());
     if (prefixInfo.getMetadataList() != null) {
       opib.addAllMetadata(KeyValueUtil
           .getFromProtobuf(prefixInfo.getMetadataList()));
     }
+    if (prefixInfo.getAclsList() != null) {
+      opib.setAcls(OzoneAclUtil.fromProtobuf(prefixInfo.getAclsList()));
+    }
     return opib.build();
   }
 
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OzoneAclUtil.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OzoneAclUtil.java
new file mode 100644
index 0000000..fd42fea
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OzoneAclUtil.java
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.helpers;
+
+import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
+import org.apache.hadoop.ozone.security.acl.RequestContext;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.ozone.OzoneAcl.AclScope.ACCESS;
+import static org.apache.hadoop.ozone.OzoneAcl.AclScope.DEFAULT;
+import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType.GROUP;
+import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType.USER;
+import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.ALL;
+import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.NONE;
+
+/**
+ * Helper class for ozone acls operations.
+ */
+public final class OzoneAclUtil {
+
+  private OzoneAclUtil(){
+  }
+
+  /**
+   * Helper function to get access acl list for current user.
+   *
+   * @param userName
+   * @param userGroups
+   * @return list of OzoneAcls
+   * */
+  public static List<OzoneAcl> getAclList(String userName,
+      List<String> userGroups, ACLType userRights, ACLType groupRights) {
+
+    List<OzoneAcl> listOfAcls = new ArrayList<>();
+
+    // User ACL.
+    listOfAcls.add(new OzoneAcl(USER, userName, userRights, ACCESS));
+    if(userGroups != null) {
+      // Group ACLs of the User.
+      userGroups.forEach((group) -> listOfAcls.add(
+          new OzoneAcl(GROUP, group, groupRights, ACCESS)));
+    }
+    return listOfAcls;
+  }
+
+  /**
+   * Check if acl right requested for given RequestContext exist
+   * in provided acl list.
+   * Acl validation rules:
+   * 1. If user/group has ALL bit set than all user should have all rights.
+   * 2. If user/group has NONE bit set than user/group will not have any right.
+   * 3. For all other individual rights individual bits should be set.
+   *
+   * @param acls
+   * @param context
+   * @return return true if acl list contains right requsted in context.
+   * */
+  public static boolean checkAclRight(List<OzoneAcl> acls,
+      RequestContext context) throws OMException {
+    String[] userGroups = context.getClientUgi().getGroupNames();
+    String userName = context.getClientUgi().getUserName();
+    ACLType aclToCheck = context.getAclRights();
+    for (OzoneAcl a : acls) {
+      if(checkAccessInAcl(a, userGroups, userName, aclToCheck)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private static boolean checkAccessInAcl(OzoneAcl a, String[] groups,
+      String username, ACLType aclToCheck) {
+    BitSet rights = a.getAclBitSet();
+    switch (a.getType()) {
+    case USER:
+      if (a.getName().equals(username)) {
+        return checkIfAclBitIsSet(aclToCheck, rights);
+      }
+      break;
+    case GROUP:
+      for (String grp : groups) {
+        if (a.getName().equals(grp)) {
+          return checkIfAclBitIsSet(aclToCheck, rights);
+        }
+      }
+      break;
+
+    default:
+      return checkIfAclBitIsSet(aclToCheck, rights);
+    }
+    return false;
+  }
+
+  /**
+   * Check if acl right requested for given RequestContext exist
+   * in provided acl list.
+   * Acl validation rules:
+   * 1. If user/group has ALL bit set than all user should have all rights.
+   * 2. If user/group has NONE bit set than user/group will not have any right.
+   * 3. For all other individual rights individual bits should be set.
+   *
+   * @param acls
+   * @param context
+   * @return return true if acl list contains right requsted in context.
+   * */
+  public static boolean checkAclRights(List<OzoneAcl> acls,
+      RequestContext context) throws OMException {
+    String[] userGroups = context.getClientUgi().getGroupNames();
+    String userName = context.getClientUgi().getUserName();
+    ACLType aclToCheck = context.getAclRights();
+    for (OzoneAcl acl : acls) {
+      if (checkAccessInAcl(acl, userGroups, userName, aclToCheck)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Helper function to check if bit for given acl is set.
+   * @param acl
+   * @param bitset
+   * @return True of acl bit is set else false.
+   * */
+  public static boolean checkIfAclBitIsSet(IAccessAuthorizer.ACLType acl,
+      BitSet bitset) {
+    if (bitset == null) {
+      return false;
+    }
+
+    return ((bitset.get(acl.ordinal())
+        || bitset.get(ALL.ordinal()))
+        && !bitset.get(NONE.ordinal()));
+  }
+
+  /**
+   * Helper function to inherit default ACL as access ACL for child object.
+   * 1. deep copy of OzoneAcl to avoid unexpected parent default ACL change
+   * 2. merge inherited access ACL with existing access ACL via
+   * OzoneUtils.addAcl().
+   * @param acls
+   * @param parentAcls
+   * @return true if acls inherited DEFAULT acls from parentAcls successfully,
+   * false otherwise.
+   */
+  public static boolean inheritDefaultAcls(List<OzoneAcl> acls,
+      List<OzoneAcl> parentAcls) {
+    List<OzoneAcl> inheritedAcls = null;
+    if (parentAcls != null && !parentAcls.isEmpty()) {
+      inheritedAcls = parentAcls.stream()
+          .filter(a -> a.getAclScope() == DEFAULT)
+          .map(acl -> new OzoneAcl(acl.getType(), acl.getName(),
+              acl.getAclBitSet(), OzoneAcl.AclScope.ACCESS))
+          .collect(Collectors.toList());
+    }
+    if (inheritedAcls != null && !inheritedAcls.isEmpty()) {
+      inheritedAcls.stream().forEach(acl -> addAcl(acls, acl));
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Convert a list of OzoneAclInfo(protoc) to list of OzoneAcl(java).
+   * @param protoAcls
+   * @return list of OzoneAcl.
+   */
+  public static List<OzoneAcl> fromProtobuf(List<OzoneAclInfo> protoAcls) {
+    return protoAcls.stream().map(acl->OzoneAcl.fromProtobuf(acl))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Convert a list of OzoneAcl(java) to list of OzoneAclInfo(protoc).
+   * @param protoAcls
+   * @return list of OzoneAclInfo.
+   */
+  public static List<OzoneAclInfo> toProtobuf(List<OzoneAcl> protoAcls) {
+    return protoAcls.stream().map(acl->OzoneAcl.toProtobuf(acl))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Add an OzoneAcl to existing list of OzoneAcls.
+   * @param existingAcls
+   * @param acl
+   * @return true if current OzoneAcls are changed, false otherwise.
+   */
+  public static boolean addAcl(List<OzoneAcl> existingAcls, OzoneAcl acl) {
+    if (existingAcls == null || acl == null) {
+      return false;
+    }
+
+    for (OzoneAcl a: existingAcls) {
+      if (a.getName().equals(acl.getName()) &&
+          a.getType().equals(acl.getType()) &&
+          a.getAclScope().equals(acl.getAclScope())) {
+        BitSet current = a.getAclBitSet();
+        BitSet original = (BitSet) current.clone();
+        current.or(acl.getAclBitSet());
+        if (current.equals(original)) {
+          return false;
+        }
+        return true;
+      }
+    }
+
+    existingAcls.add(acl);
+    return true;
+  }
+
+  /**
+   * remove OzoneAcl from existing list of OzoneAcls.
+   * @param existingAcls
+   * @param acl
+   * @return true if current OzoneAcls are changed, false otherwise.
+   */
+  public static boolean removeAcl(List<OzoneAcl> existingAcls, OzoneAcl acl) {
+    if (existingAcls == null || existingAcls.isEmpty() || acl == null) {
+      return false;
+    }
+
+    for (OzoneAcl a: existingAcls) {
+      if (a.getName().equals(acl.getName()) &&
+          a.getType().equals(acl.getType()) &&
+          a.getAclScope().equals(acl.getAclScope())) {
+        BitSet current = a.getAclBitSet();
+        BitSet original = (BitSet) current.clone();
+        current.andNot(acl.getAclBitSet());
+
+        if (current.equals(original)) {
+          return false;
+        }
+
+        if (current.isEmpty()) {
+          existingAcls.remove(a);
+        }
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Set existingAcls to newAcls.
+   * @param existingAcls
+   * @param newAcls
+   * @return true if newAcls are set successfully, false otherwise.
+   */
+  public static boolean setAcl(List<OzoneAcl> existingAcls,
+      List<OzoneAcl> newAcls) {
+    if (existingAcls == null) {
+      return false;
+    } else {
+      existingAcls.clear();
+      if (newAcls != null) {
+        existingAcls.addAll(newAcls);
+      }
+    }
+    return true;
+  }
+}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/web/utils/OzoneUtils.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/web/utils/OzoneUtils.java
index 2a0fb9d..954cab1 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/web/utils/OzoneUtils.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/web/utils/OzoneUtils.java
@@ -23,36 +23,19 @@ import java.net.UnknownHostException;
 import java.nio.charset.Charset;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.Collection;
-import java.util.List;
 import java.util.Locale;
 import java.util.TimeZone;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsUtils;
-import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.OzoneConsts;
 
 import com.google.common.base.Preconditions;
-import org.apache.hadoop.ozone.om.exceptions.OMException;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
-import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
-import org.apache.hadoop.ozone.security.acl.RequestContext;
 import org.apache.ratis.util.TimeDuration;
 
-import static org.apache.hadoop.ozone.OzoneAcl.AclScope.ACCESS;
-import static org.apache.hadoop.ozone.OzoneAcl.AclScope.DEFAULT;
-import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType.GROUP;
-import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType.USER;
-import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.ALL;
-import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.NONE;
-
 /**
  * Set of Utility functions used in ozone.
  */
@@ -254,145 +237,4 @@ public final class OzoneUtils {
         .toLong(TimeUnit.MILLISECONDS);
   }
 
-  /**
-   * Helper function to get access acl list for current user.
-   *
-   * @param userName
-   * @param userGroups
-   * @return listOfAcls
-   * */
-  public static List<OzoneAcl> getAclList(String userName,
-      List<String> userGroups, ACLType userRights, ACLType groupRights) {
-
-    List<OzoneAcl> listOfAcls = new ArrayList<>();
-
-    // User ACL.
-    listOfAcls.add(new OzoneAcl(USER, userName, userRights, ACCESS));
-    if(userGroups != null) {
-      // Group ACLs of the User.
-      userGroups.forEach((group) -> listOfAcls.add(
-          new OzoneAcl(GROUP, group, groupRights, ACCESS)));
-    }
-    return listOfAcls;
-  }
-
-  /**
-   * Check if acl right requested for given RequestContext exist
-   * in provided acl list.
-   * Acl validation rules:
-   * 1. If user/group has ALL bit set than all user should have all rights.
-   * 2. If user/group has NONE bit set than user/group will not have any right.
-   * 3. For all other individual rights individual bits should be set.
-   *
-   * @param acls
-   * @param context
-   * @return return true if acl list contains right requsted in context.
-   * */
-  public static boolean checkAclRight(List<OzoneAclInfo> acls,
-      RequestContext context) throws OMException {
-    String[] userGroups = context.getClientUgi().getGroupNames();
-    String userName = context.getClientUgi().getUserName();
-    ACLType aclToCheck = context.getAclRights();
-    for (OzoneAclInfo a : acls) {
-      if(checkAccessInAcl(a, userGroups, userName, aclToCheck)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private static boolean checkAccessInAcl(OzoneAclInfo a, String[] groups,
-      String username, ACLType aclToCheck) {
-    BitSet rights = BitSet.valueOf(a.getRights().toByteArray());
-    switch (a.getType()) {
-    case USER:
-      if (a.getName().equals(username)) {
-        return checkIfAclBitIsSet(aclToCheck, rights);
-      }
-      break;
-    case GROUP:
-      for (String grp : groups) {
-         // TODO: Convert ozone acls to proto map format for efficient
-        //  acl checks.
-        if (a.getName().equals(grp)) {
-          return checkIfAclBitIsSet(aclToCheck, rights);
-        }
-      }
-      break;
-
-    default:
-      return checkIfAclBitIsSet(aclToCheck, rights);
-    }
-    return false;
-  }
-
-  /**
-   * Check if acl right requested for given RequestContext exist
-   * in provided acl list.
-   * Acl validation rules:
-   * 1. If user/group has ALL bit set than all user should have all rights.
-   * 2. If user/group has NONE bit set than user/group will not have any right.
-   * 3. For all other individual rights individual bits should be set.
-   *
-   * @param acls
-   * @param context
-   * @return return true if acl list contains right requsted in context.
-   * */
-  public static boolean checkAclRights(List<OzoneAcl> acls,
-      RequestContext context) throws OMException {
-    String[] userGroups = context.getClientUgi().getGroupNames();
-    String userName = context.getClientUgi().getUserName();
-    ACLType aclToCheck = context.getAclRights();
-    // TODO: All ozone types should use one data type for acls. i.e Store
-    //  and maintain acls in proto format only.
-    for (OzoneAcl a : acls) {
-      if (checkAccessInAcl(OzoneAcl.toProtobuf(a), userGroups,
-          userName, aclToCheck)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  /**
-   * Helper function to check if bit for given acl is set.
-   * @param acl
-   * @param bitset
-   * @return True of acl bit is set else false.
-   * */
-  public static boolean checkIfAclBitIsSet(ACLType acl, BitSet bitset) {
-    if (bitset == null) {
-      return false;
-    }
-
-    return ((bitset.get(acl.ordinal())
-        || bitset.get(ALL.ordinal()))
-        && !bitset.get(NONE.ordinal()));
-  }
-
-  /**
-   * Helper function to find and return all DEFAULT acls in input list with
-   * scope changed to ACCESS.
-   * @param acls
-   *
-   * @return list of default Acls.
-   * */
-  public static Collection<OzoneAclInfo> getDefaultAclsProto(
-      List<OzoneAcl> acls) {
-    return acls.stream().filter(a -> a.getAclScope() == DEFAULT)
-        .map(OzoneAcl::toProtobufWithAccessType).collect(Collectors.toList());
-  }
-
-  /**
-   * Helper function to find and return all DEFAULT acls in input list with
-   * scope changed to ACCESS.
-   * @param acls
-   *
-   * @return list of default Acls.
-   * */
-  public static Collection<OzoneAcl> getDefaultAcls(List<OzoneAcl> acls) {
-    return acls.stream().filter(a -> a.getAclScope() == DEFAULT)
-        .collect(Collectors.toList());
-  }
-
 }
diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOzoneAclUtil.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOzoneAclUtil.java
new file mode 100644
index 0000000..b1a4e45
--- /dev/null
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOzoneAclUtil.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.helpers;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
+import org.apache.hadoop.ozone.security.acl.OzoneAclConfig;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.List;
+
+import static org.apache.hadoop.ozone.OzoneAcl.AclScope.ACCESS;
+import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType.GROUP;
+import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType.USER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for OzoneAcls utility class.
+ */
+public class TestOzoneAclUtil {
+
+  private static final List<OzoneAcl> DEFAULT_ACLS =
+      getDefaultAcls(new OzoneConfiguration());
+
+  private static final OzoneAcl USER1 = new OzoneAcl(USER, "user1",
+      ACLType.READ_ACL, ACCESS);
+
+  private static final OzoneAcl USER2 = new OzoneAcl(USER, "user2",
+      ACLType.WRITE, ACCESS);
+
+  private static final OzoneAcl GROUP1 = new OzoneAcl(GROUP, "group1",
+      ACLType.ALL, ACCESS);
+
+  @Test
+  public void testAddAcl() throws IOException {
+    List<OzoneAcl> currentAcls = getDefaultAcls(new OzoneConfiguration());
+    assertTrue(currentAcls.size() > 0);
+
+    // Add new permission to existing acl entry.
+    OzoneAcl oldAcl = currentAcls.get(0);
+    OzoneAcl newAcl = new OzoneAcl(oldAcl.getType(), oldAcl.getName(),
+        ACLType.READ_ACL, ACCESS);
+
+    addAndVerifyAcl(currentAcls, newAcl, true, DEFAULT_ACLS.size());
+    // Add same permission again and verify result
+    addAndVerifyAcl(currentAcls, newAcl, false, DEFAULT_ACLS.size());
+
+    // Add a new user acl entry.
+    addAndVerifyAcl(currentAcls, USER1, true, DEFAULT_ACLS.size() + 1);
+    // Add same acl entry again and verify result
+    addAndVerifyAcl(currentAcls, USER1, false, DEFAULT_ACLS.size() + 1);
+
+    // Add a new group acl entry.
+    addAndVerifyAcl(currentAcls, GROUP1, true, DEFAULT_ACLS.size() + 2);
+    // Add same acl entry again and verify result
+    addAndVerifyAcl(currentAcls, GROUP1, false, DEFAULT_ACLS.size() + 2);
+  }
+
+  @Test
+  public void testRemoveAcl() {
+    List<OzoneAcl> currentAcls = null;
+
+    // add/remove to/from null OzoneAcls
+    removeAndVerifyAcl(currentAcls, USER1, false, 0);
+    addAndVerifyAcl(currentAcls, USER1, false, 0);
+    removeAndVerifyAcl(currentAcls, USER1, false, 0);
+
+    currentAcls = getDefaultAcls(new OzoneConfiguration());
+    assertTrue(currentAcls.size() > 0);
+
+    // Add new permission to existing acl entru.
+    OzoneAcl oldAcl = currentAcls.get(0);
+    OzoneAcl newAcl = new OzoneAcl(oldAcl.getType(), oldAcl.getName(),
+        ACLType.READ_ACL, ACCESS);
+
+    // Remove non existing acl entry
+    removeAndVerifyAcl(currentAcls, USER1, false, DEFAULT_ACLS.size());
+
+    // Remove non existing acl permission
+    removeAndVerifyAcl(currentAcls, newAcl, false, DEFAULT_ACLS.size());
+
+    // Add new permission to existing acl entry.
+    addAndVerifyAcl(currentAcls, newAcl, true, DEFAULT_ACLS.size());
+
+    // Remove the new permission added.
+    removeAndVerifyAcl(currentAcls, newAcl, true, DEFAULT_ACLS.size());
+
+    removeAndVerifyAcl(currentAcls, oldAcl, true, DEFAULT_ACLS.size() - 1);
+  }
+
+  private void addAndVerifyAcl(List<OzoneAcl> currentAcls, OzoneAcl addedAcl,
+      boolean expectedResult, int expectedSize) {
+    assertEquals(expectedResult, OzoneAclUtil.addAcl(currentAcls, addedAcl));
+    if (currentAcls != null) {
+      boolean verified = verifyAclAdded(currentAcls, addedAcl);
+      assertTrue("addedAcl: " + addedAcl + " should exist in the" +
+          " current acls: " + currentAcls, verified);
+      assertEquals(expectedSize, currentAcls.size());
+    }
+  }
+
+  private void removeAndVerifyAcl(List<OzoneAcl> currentAcls,
+      OzoneAcl removedAcl, boolean expectedResult, int expectedSize) {
+    assertEquals(expectedResult, OzoneAclUtil.removeAcl(currentAcls,
+        removedAcl));
+    if (currentAcls != null) {
+      boolean verified = verifyAclRemoved(currentAcls, removedAcl);
+      assertTrue("removedAcl: " + removedAcl + " should not exist in the" +
+          " current acls: " + currentAcls, verified);
+      assertEquals(expectedSize, currentAcls.size());
+    }
+  }
+
+  private boolean verifyAclRemoved(List<OzoneAcl> acls, OzoneAcl removedAcl) {
+    for (OzoneAcl acl : acls) {
+      if (acl.getName().equals(removedAcl.getName()) &&
+          acl.getType().equals(removedAcl.getType()) &&
+          acl.getAclScope().equals(removedAcl.getAclScope())) {
+        BitSet temp = (BitSet) acl.getAclBitSet().clone();
+        temp.and(removedAcl.getAclBitSet());
+        return !temp.equals(removedAcl.getAclBitSet());
+      }
+    }
+    return true;
+  }
+
+  private boolean verifyAclAdded(List<OzoneAcl> acls, OzoneAcl newAcl) {
+    for (OzoneAcl acl : acls) {
+      if (acl.getName().equals(newAcl.getName()) &&
+          acl.getType().equals(newAcl.getType()) &&
+          acl.getAclScope().equals(newAcl.getAclScope())) {
+        BitSet temp = (BitSet) acl.getAclBitSet().clone();
+        temp.and(newAcl.getAclBitSet());
+        return temp.equals(newAcl.getAclBitSet());
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Helper function to get default acl list for current user.
+   *
+   * @return list of ozoneAcls.
+   * @throws IOException
+   * */
+  private static List<OzoneAcl> getDefaultAcls(OzoneConfiguration conf) {
+    List<OzoneAcl> ozoneAcls = new ArrayList<>();
+    //User ACL
+    UserGroupInformation ugi;
+    try {
+      ugi = UserGroupInformation.getCurrentUser();
+    } catch (IOException ioe) {
+      ugi = UserGroupInformation.createRemoteUser("user0");
+    }
+
+    OzoneAclConfig aclConfig = conf.getObject(OzoneAclConfig.class);
+    IAccessAuthorizer.ACLType userRights = aclConfig.getUserDefaultRights();
+    IAccessAuthorizer.ACLType groupRights = aclConfig.getGroupDefaultRights();
+
+    OzoneAclUtil.addAcl(ozoneAcls, new OzoneAcl(USER,
+        ugi.getUserName(), userRights, ACCESS));
+    //Group ACLs of the User
+    List<String> userGroups = Arrays.asList(ugi.getGroupNames());
+    userGroups.stream().forEach((group) -> OzoneAclUtil.addAcl(ozoneAcls,
+        new OzoneAcl(GROUP, group, groupRights, ACCESS)));
+    return ozoneAcls;
+  }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
index ca506c6..84d17ad 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
@@ -144,14 +144,15 @@ public abstract class TestOzoneRpcClientAbstract {
   private static StorageContainerLocationProtocolClientSideTranslatorPB
       storageContainerLocationClient;
   private static String remoteUserName = "remoteUser";
+  private static String remoteGroupName = "remoteGroup";
   private static OzoneAcl defaultUserAcl = new OzoneAcl(USER, remoteUserName,
       READ, DEFAULT);
-  private static OzoneAcl defaultGroupAcl = new OzoneAcl(GROUP, remoteUserName,
+  private static OzoneAcl defaultGroupAcl = new OzoneAcl(GROUP, remoteGroupName,
       READ, DEFAULT);
   private static OzoneAcl inheritedUserAcl = new OzoneAcl(USER, remoteUserName,
       READ, ACCESS);
   private static OzoneAcl inheritedGroupAcl = new OzoneAcl(GROUP,
-      remoteUserName, READ, ACCESS);
+      remoteGroupName, READ, ACCESS);
 
   private static String scmId = UUID.randomUUID().toString();
 
@@ -2280,11 +2281,11 @@ public abstract class TestOzoneRpcClientAbstract {
       }
     }
     List<OzoneAcl> acls = store.getAcl(parentObj);
-    assertTrue("Current acls:" + StringUtils.join(",", acls) +
-            " inheritedUserAcl:" + inheritedUserAcl,
+    assertTrue("Current acls: " + StringUtils.join(",", acls) +
+            " inheritedUserAcl: " + inheritedUserAcl,
         acls.contains(defaultUserAcl));
-    assertTrue("Current acls:" + StringUtils.join(",", acls) +
-            " inheritedUserAcl:" + inheritedUserAcl,
+    assertTrue("Current acls: " + StringUtils.join(",", acls) +
+            " inheritedGroupAcl: " + inheritedGroupAcl,
         acls.contains(defaultGroupAcl));
 
     acls = store.getAcl(childObj);
@@ -2292,7 +2293,7 @@ public abstract class TestOzoneRpcClientAbstract {
             " inheritedUserAcl:" + inheritedUserAcl,
         acls.contains(inheritedUserAcl));
     assertTrue("Current acls:" + StringUtils.join(",", acls) +
-            " inheritedUserAcl:" + inheritedUserAcl,
+            " inheritedGroupAcl:" + inheritedGroupAcl,
         acls.contains(inheritedGroupAcl));
   }
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
index a4106f3..7ad1a05 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneTestUtils;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
@@ -67,6 +68,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.om.helpers.OmPrefixInfo;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
 import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
 import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
@@ -75,7 +77,6 @@ import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
 import org.apache.hadoop.ozone.security.acl.RequestContext;
-import org.apache.hadoop.ozone.web.utils.OzoneUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.LambdaTestUtils;
@@ -130,6 +131,7 @@ public class TestKeyManagerImpl {
     conf = new OzoneConfiguration();
     dir = GenericTestUtils.getRandomizedTestDir();
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, dir.toString());
+    conf.set(OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY, "true");
     mockScmBlockLocationProtocol = Mockito.mock(ScmBlockLocationProtocol.class);
     metadataManager = new OmMetadataManagerImpl(conf);
     volumeManager = new VolumeManagerImpl(metadataManager, conf);
@@ -256,7 +258,7 @@ public class TestKeyManagerImpl {
     OmKeyArgs keyArgs = createBuilder()
         .setKeyName(KEY_NAME)
         .setDataSize(1000)
-        .setAcls(OzoneUtils.getAclList(ugi.getUserName(), ugi.getGroups(),
+        .setAcls(OzoneAclUtil.getAclList(ugi.getUserName(), ugi.getGroups(),
             ALL, ALL))
         .build();
     LambdaTestUtils.intercept(OMException.class,
@@ -960,7 +962,7 @@ public class TestKeyManagerImpl {
         .setFactor(ReplicationFactor.ONE)
         .setDataSize(0)
         .setType(ReplicationType.STAND_ALONE)
-        .setAcls(OzoneUtils.getAclList(ugi.getUserName(), ugi.getGroups(),
+        .setAcls(OzoneAclUtil.getAclList(ugi.getUserName(), ugi.getGroups(),
             ALL, ALL))
         .setVolumeName(VOLUME_NAME);
   }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManager.java
index a1b023e..1fb3b00 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManager.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
 import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DBUpdatesRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServicePort;
@@ -1435,7 +1436,7 @@ public class TestOzoneManager {
         .setFactor(HddsProtos.ReplicationFactor.ONE)
         .setDataSize(0)
         .setType(HddsProtos.ReplicationType.STAND_ALONE)
-        .setAcls(OzoneUtils.getAclList(ugi.getUserName(), ugi.getGroups(),
+        .setAcls(OzoneAclUtil.getAclList(ugi.getUserName(), ugi.getGroups(),
             ALL, ALL))
         .setVolumeName("vol1")
         .setKeyName(UUID.randomUUID().toString())
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/security/acl/TestOzoneNativeAuthorizer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/security/acl/TestOzoneNativeAuthorizer.java
index 6ecf702..301e6f5 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/security/acl/TestOzoneNativeAuthorizer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/security/acl/TestOzoneNativeAuthorizer.java
@@ -38,9 +38,9 @@ import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
-import org.apache.hadoop.ozone.web.utils.OzoneUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.BeforeClass;
@@ -178,7 +178,7 @@ public class TestOzoneNativeAuthorizer {
         .setFactor(HddsProtos.ReplicationFactor.ONE)
         .setDataSize(0)
         .setType(HddsProtos.ReplicationType.STAND_ALONE)
-        .setAcls(OzoneUtils.getAclList(ugi.getUserName(), ugi.getGroups(),
+        .setAcls(OzoneAclUtil.getAclList(ugi.getUserName(), ugi.getGroups(),
             ALL, ALL))
         .build();
 
diff --git a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
index fbbd3be..eb694c4 100644
--- a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
+++ b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.OzoneConsts;
@@ -434,7 +435,7 @@ public final class DistributedStorageHandler implements StorageHandler {
         .setDataSize(args.getSize())
         .setType(xceiverClientManager.getType())
         .setFactor(xceiverClientManager.getFactor())
-        .setAcls(OzoneUtils.getAclList(args.getUserName(),
+        .setAcls(OzoneAclUtil.getAclList(args.getUserName(),
             args.getGroups() != null ? Arrays.asList(args.getGroups()) : null,
             ACLType.ALL, ACLType.ALL))
         .build();
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java
index 8837c2d..8a32dd6 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java
@@ -18,7 +18,6 @@ package org.apache.hadoop.ozone.om;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.BitSet;
 import java.util.List;
 import java.util.Objects;
 
@@ -34,9 +33,9 @@ import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.ozone.security.acl.RequestContext;
-import org.apache.hadoop.ozone.web.utils.OzoneUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 
@@ -45,12 +44,10 @@ import org.iq80.leveldb.DBException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.hadoop.ozone.OzoneAcl.ZERO_BITSET;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INTERNAL_ERROR;
 import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
 import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
-import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo.OzoneAclScope.*;
 
 /**
  * OM bucket manager.
@@ -391,6 +388,7 @@ public class BucketManagerImpl implements BucketManager {
     }
     String volume = obj.getVolumeName();
     String bucket = obj.getBucketName();
+    boolean changed = false;
     metadataManager.getLock().acquireLock(BUCKET_LOCK, volume, bucket);
     try {
       String dbBucketKey = metadataManager.getBucketKey(volume, bucket);
@@ -402,43 +400,10 @@ public class BucketManagerImpl implements BucketManager {
             BUCKET_NOT_FOUND);
       }
 
-      // Case 1: When we are adding more rights to existing user/group.
-      boolean addToExistingAcl = false;
-      for(OzoneAcl a: bucketInfo.getAcls()) {
-        if(a.getName().equals(acl.getName()) &&
-            a.getType().equals(acl.getType())) {
-          BitSet bits = (BitSet) acl.getAclBitSet().clone();
-          bits.or(a.getAclBitSet());
-
-          if (bits.equals(a.getAclBitSet())) {
-            return false;
-          }
-          a.getAclBitSet().or(acl.getAclBitSet());
-          addToExistingAcl = true;
-          break;
-        }
-      }
-
-      // Case 2: When a completely new acl is added.
-      if(!addToExistingAcl) {
-        List<OzoneAcl> newAcls = bucketInfo.getAcls();
-        if(newAcls == null) {
-          newAcls = new ArrayList<>();
-        }
-        newAcls.add(acl);
-        bucketInfo = OmBucketInfo.newBuilder()
-            .setVolumeName(bucketInfo.getVolumeName())
-            .setBucketName(bucketInfo.getBucketName())
-            .setStorageType(bucketInfo.getStorageType())
-            .setIsVersionEnabled(bucketInfo.getIsVersionEnabled())
-            .setCreationTime(bucketInfo.getCreationTime())
-            .setBucketEncryptionKey(bucketInfo.getEncryptionKeyInfo())
-            .addAllMetadata(bucketInfo.getMetadata())
-            .setAcls(newAcls)
-            .build();
+      changed = bucketInfo.addAcl(acl);
+      if (changed) {
+        metadataManager.getBucketTable().put(dbBucketKey, bucketInfo);
       }
-
-      metadataManager.getBucketTable().put(dbBucketKey, bucketInfo);
     } catch (IOException ex) {
       if (!(ex instanceof OMException)) {
         LOG.error("Add acl operation failed for bucket:{}/{} acl:{}",
@@ -449,7 +414,7 @@ public class BucketManagerImpl implements BucketManager {
       metadataManager.getLock().releaseLock(BUCKET_LOCK, volume, bucket);
     }
 
-    return true;
+    return changed;
   }
 
   /**
@@ -470,6 +435,7 @@ public class BucketManagerImpl implements BucketManager {
     }
     String volume = obj.getVolumeName();
     String bucket = obj.getBucketName();
+    boolean removed = false;
     metadataManager.getLock().acquireLock(BUCKET_LOCK, volume, bucket);
     try {
       String dbBucketKey = metadataManager.getBucketKey(volume, bucket);
@@ -480,33 +446,10 @@ public class BucketManagerImpl implements BucketManager {
         throw new OMException("Bucket " + bucket + " is not found",
             BUCKET_NOT_FOUND);
       }
-
-      boolean removed = false;
-      // When we are removing subset of rights from existing acl.
-      for(OzoneAcl a: bucketInfo.getAcls()) {
-        if(a.getName().equals(acl.getName()) &&
-            a.getType().equals(acl.getType())) {
-          BitSet bits = (BitSet) acl.getAclBitSet().clone();
-          bits.and(a.getAclBitSet());
-
-          if (bits.equals(ZERO_BITSET)) {
-            return false;
-          }
-
-          a.getAclBitSet().xor(bits);
-
-          if(a.getAclBitSet().equals(ZERO_BITSET)) {
-            bucketInfo.getAcls().remove(a);
-          }
-          removed = true;
-          break;
-        }
-      }
-
+      removed = bucketInfo.removeAcl(acl);
       if (removed) {
         metadataManager.getBucketTable().put(dbBucketKey, bucketInfo);
       }
-      return removed;
     } catch (IOException ex) {
       if (!(ex instanceof OMException)) {
         LOG.error("Remove acl operation failed for bucket:{}/{} acl:{}",
@@ -516,6 +459,7 @@ public class BucketManagerImpl implements BucketManager {
     } finally {
       metadataManager.getLock().releaseLock(BUCKET_LOCK, volume, bucket);
     }
+    return removed;
   }
 
   /**
@@ -546,18 +490,8 @@ public class BucketManagerImpl implements BucketManager {
         throw new OMException("Bucket " + bucket + " is not found",
             BUCKET_NOT_FOUND);
       }
-      OmBucketInfo updatedBucket = OmBucketInfo.newBuilder()
-          .setVolumeName(bucketInfo.getVolumeName())
-          .setBucketName(bucketInfo.getBucketName())
-          .setStorageType(bucketInfo.getStorageType())
-          .setIsVersionEnabled(bucketInfo.getIsVersionEnabled())
-          .setCreationTime(bucketInfo.getCreationTime())
-          .setBucketEncryptionKey(bucketInfo.getEncryptionKeyInfo())
-          .addAllMetadata(bucketInfo.getMetadata())
-          .setAcls(acls)
-          .build();
-
-      metadataManager.getBucketTable().put(dbBucketKey, updatedBucket);
+      bucketInfo.setAcls(acls);
+      metadataManager.getBucketTable().put(dbBucketKey, bucketInfo);
     } catch (IOException ex) {
       if (!(ex instanceof OMException)) {
         LOG.error("Set acl operation failed for bucket:{}/{} acl:{}",
@@ -567,7 +501,6 @@ public class BucketManagerImpl implements BucketManager {
     } finally {
       metadataManager.getLock().releaseLock(BUCKET_LOCK, volume, bucket);
     }
-
     return true;
   }
 
@@ -634,7 +567,7 @@ public class BucketManagerImpl implements BucketManager {
         throw new OMException("Bucket " + bucket + " is not found",
             BUCKET_NOT_FOUND);
       }
-      boolean hasAccess = OzoneUtils.checkAclRights(bucketInfo.getAcls(),
+      boolean hasAccess = OzoneAclUtil.checkAclRights(bucketInfo.getAcls(),
           context);
       LOG.debug("user:{} has access rights for bucket:{} :{} ",
           context.getClientUgi(), ozObject.getBucketName(), hasAccess);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index d3e957c..b58095f 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -20,7 +20,6 @@ import java.io.IOException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
-import java.util.BitSet;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -33,11 +32,9 @@ import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.security.GeneralSecurityException;
 import java.security.PrivilegedExceptionAction;
-import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
-import com.google.protobuf.ByteString;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
@@ -58,9 +55,6 @@ import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto;
-import org.apache.hadoop.ozone.om.helpers.OmPrefixInfo;
-import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
-import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
 import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
@@ -74,14 +68,14 @@ import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
 import org.apache.hadoop.ozone.om.helpers.OmPartInfo;
+import org.apache.hadoop.ozone.om.helpers.OmPrefixInfo;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
 import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo.OzoneAclType;
+import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
 import org.apache.hadoop.ozone.security.OzoneBlockTokenSecretManager;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.ozone.security.acl.RequestContext;
-import org.apache.hadoop.ozone.web.utils.OzoneUtils;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.ozone.common.BlockGroup;
@@ -542,6 +536,7 @@ public class KeyManagerImpl implements KeyManager {
    * @param type
    * @param size
    * @param encInfo
+   * @param omBucketInfo
    * @return
    */
   private OmKeyInfo createKeyInfo(OmKeyArgs keyArgs,
@@ -562,34 +557,7 @@ public class KeyManagerImpl implements KeyManager {
         .setReplicationType(type)
         .setReplicationFactor(factor)
         .setFileEncryptionInfo(encInfo);
-    List<OzoneAclInfo> acls = new ArrayList<>();
-    if(keyArgs.getAcls() != null) {
-      acls.addAll(keyArgs.getAcls().stream().map(a ->
-          OzoneAcl.toProtobuf(a)).collect(Collectors.toList()));
-    }
-
-    // Inherit DEFAULT acls from prefix.
-    boolean prefixParentFound = false;
-    if(prefixManager != null) {
-      List<OmPrefixInfo> prefixList = prefixManager.getLongestPrefixPath(
-          OZONE_URI_DELIMITER +
-              keyArgs.getVolumeName() + OZONE_URI_DELIMITER +
-              keyArgs.getBucketName() + OZONE_URI_DELIMITER +
-              keyArgs.getKeyName());
-
-      if(prefixList.size() > 0) {
-        // Add all acls from direct parent to key.
-        OmPrefixInfo prefixInfo = prefixList.get(prefixList.size() - 1);
-        if(prefixInfo  != null) {
-          acls.addAll(OzoneUtils.getDefaultAclsProto(prefixInfo.getAcls()));
-          prefixParentFound = true;
-        }
-      }
-    }
-    if(!prefixParentFound && omBucketInfo != null) {
-      acls.addAll(OzoneUtils.getDefaultAclsProto(omBucketInfo.getAcls()));
-    }
-    builder.setAcls(acls);
+    builder.setAcls(getAclsForKey(keyArgs, omBucketInfo));
     return builder.build();
   }
 
@@ -926,7 +894,7 @@ public class KeyManagerImpl implements KeyManager {
           .setReplicationFactor(keyArgs.getFactor())
           .setOmKeyLocationInfos(Collections.singletonList(
               new OmKeyLocationInfoGroup(0, locations)))
-          .setAcls(getAclsForKey(keyArgs, null, bucketInfo))
+          .setAcls(getAclsForKey(keyArgs, bucketInfo))
           .build();
       DBStore store = metadataManager.getStore();
       try (BatchOperation batch = store.initBatchOperation()) {
@@ -951,15 +919,15 @@ public class KeyManagerImpl implements KeyManager {
     }
   }
 
-  private List<OzoneAclInfo> getAclsForKey(OmKeyArgs keyArgs,
-      OmVolumeArgs volArgs, OmBucketInfo bucketInfo) {
-    List<OzoneAclInfo> acls = new ArrayList<>(keyArgs.getAcls().size());
+  private List<OzoneAcl> getAclsForKey(OmKeyArgs keyArgs,
+      OmBucketInfo bucketInfo) {
+    List<OzoneAcl> acls = new ArrayList<>();
 
-    keyArgs.getAcls().stream().map(OzoneAcl::toProtobuf).
-        collect(Collectors.toList());
+    if(keyArgs.getAcls() != null) {
+      acls.addAll(keyArgs.getAcls());
+    }
 
     // Inherit DEFAULT acls from prefix.
-    boolean prefixParentFound = false;
     if(prefixManager != null) {
       List<OmPrefixInfo> prefixList = prefixManager.getLongestPrefixPath(
           OZONE_URI_DELIMITER +
@@ -971,21 +939,22 @@ public class KeyManagerImpl implements KeyManager {
         // Add all acls from direct parent to key.
         OmPrefixInfo prefixInfo = prefixList.get(prefixList.size() - 1);
         if(prefixInfo  != null) {
-          acls.addAll(OzoneUtils.getDefaultAclsProto(prefixInfo.getAcls()));
-          prefixParentFound = true;
+          if (OzoneAclUtil.inheritDefaultAcls(acls, prefixInfo.getAcls())) {
+            return acls;
+          }
         }
       }
     }
 
     // Inherit DEFAULT acls from bucket only if DEFAULT acls for
     // prefix are not set.
-    if (!prefixParentFound && bucketInfo != null) {
-      acls.addAll(bucketInfo.getAcls().stream().filter(a -> a.getAclScope()
-          .equals(OzoneAcl.AclScope.DEFAULT))
-          .map(OzoneAcl::toProtobufWithAccessType)
-          .collect(Collectors.toList()));
+    if (bucketInfo != null) {
+      if (OzoneAclUtil.inheritDefaultAcls(acls, bucketInfo.getAcls())) {
+        return acls;
+      }
     }
 
+    // TODO: do we need to further fallback to volume default ACL
     return acls;
   }
 
@@ -1201,8 +1170,7 @@ public class KeyManagerImpl implements KeyManager {
             .setDataSize(size)
             .setOmKeyLocationInfos(
                 Collections.singletonList(keyLocationInfoGroup))
-            .setAcls(omKeyArgs.getAcls().stream().map(a ->
-                OzoneAcl.toProtobuf(a)).collect(Collectors.toList())).build();
+            .setAcls(omKeyArgs.getAcls()).build();
       } else {
         // Already a version exists, so we should add it as a new version.
         // But now as versioning is not supported, just following the commit
@@ -1394,6 +1362,8 @@ public class KeyManagerImpl implements KeyManager {
     String volume = obj.getVolumeName();
     String bucket = obj.getBucketName();
     String keyName = obj.getKeyName();
+    boolean changed = false;
+
 
     metadataManager.getLock().acquireLock(BUCKET_LOCK, volume, bucket);
     try {
@@ -1404,43 +1374,13 @@ public class KeyManagerImpl implements KeyManager {
         throw new OMException("Key not found. Key:" + objectKey, KEY_NOT_FOUND);
       }
 
-      List<OzoneAclInfo> newAcls = new ArrayList<>(keyInfo.getAcls());
-      OzoneAclInfo newAcl = null;
-      for(OzoneAclInfo a: keyInfo.getAcls()) {
-        if (a.getName().equals(acl.getName()) &&
-            a.getType().equals(OzoneAclType.valueOf(acl.getType().name()))) {
-          BitSet currentAcls = BitSet.valueOf(a.getRights().toByteArray());
-          currentAcls.or(acl.getAclBitSet());
-
-          newAcl = OzoneAclInfo.newBuilder()
-              .setType(a.getType())
-              .setName(a.getName())
-              .setAclScope(a.getAclScope())
-              .setRights(ByteString.copyFrom(currentAcls.toByteArray()))
-              .build();
-          newAcls.remove(a);
-          newAcls.add(newAcl);
-          break;
-        }
+      if (keyInfo.getAcls() == null) {
+        keyInfo.setAcls(new ArrayList<>());
       }
-      if(newAcl == null) {
-        newAcls.add(OzoneAcl.toProtobuf(acl));
+      changed = keyInfo.addAcl(acl);
+      if (changed) {
+        metadataManager.getKeyTable().put(objectKey, keyInfo);
       }
-
-      OmKeyInfo newObj = new OmKeyInfo.Builder()
-          .setBucketName(keyInfo.getBucketName())
-          .setKeyName(keyInfo.getKeyName())
-          .setReplicationFactor(keyInfo.getFactor())
-          .setReplicationType(keyInfo.getType())
-          .setVolumeName(keyInfo.getVolumeName())
-          .setOmKeyLocationInfos(keyInfo.getKeyLocationVersions())
-          .setCreationTime(keyInfo.getCreationTime())
-          .setModificationTime(keyInfo.getModificationTime())
-          .setAcls(newAcls)
-          .setDataSize(keyInfo.getDataSize())
-          .setFileEncryptionInfo(keyInfo.getFileEncryptionInfo())
-          .build();
-      metadataManager.getKeyTable().put(objectKey, newObj);
     } catch (IOException ex) {
       if (!(ex instanceof OMException)) {
         LOG.error("Add acl operation failed for key:{}/{}/{}", volume,
@@ -1450,7 +1390,7 @@ public class KeyManagerImpl implements KeyManager {
     } finally {
       metadataManager.getLock().releaseLock(BUCKET_LOCK, volume, bucket);
     }
-    return true;
+    return changed;
   }
 
   /**
@@ -1467,6 +1407,7 @@ public class KeyManagerImpl implements KeyManager {
     String volume = obj.getVolumeName();
     String bucket = obj.getBucketName();
     String keyName = obj.getKeyName();
+    boolean changed = false;
 
     metadataManager.getLock().acquireLock(BUCKET_LOCK, volume, bucket);
     try {
@@ -1477,50 +1418,10 @@ public class KeyManagerImpl implements KeyManager {
         throw new OMException("Key not found. Key:" + objectKey, KEY_NOT_FOUND);
       }
 
-      List<OzoneAclInfo> newAcls = new ArrayList<>(keyInfo.getAcls());
-      OzoneAclInfo newAcl = OzoneAcl.toProtobuf(acl);
-
-      if(newAcls.contains(OzoneAcl.toProtobuf(acl))) {
-        newAcls.remove(newAcl);
-      } else {
-        // Acl to be removed might be a subset of existing acls.
-        for(OzoneAclInfo a: keyInfo.getAcls()) {
-          if (a.getName().equals(acl.getName()) &&
-              a.getType().equals(OzoneAclType.valueOf(acl.getType().name()))) {
-            BitSet currentAcls = BitSet.valueOf(a.getRights().toByteArray());
-            acl.getAclBitSet().xor(currentAcls);
-            currentAcls.and(acl.getAclBitSet());
-            newAcl = OzoneAclInfo.newBuilder()
-                .setType(a.getType())
-                .setName(a.getName())
-                .setAclScope(a.getAclScope())
-                .setRights(ByteString.copyFrom(currentAcls.toByteArray()))
-                .build();
-            newAcls.remove(a);
-            newAcls.add(newAcl);
-            break;
-          }
-        }
-        if(newAcl == null) {
-          newAcls.add(OzoneAcl.toProtobuf(acl));
-        }
+      changed = keyInfo.removeAcl(acl);
+      if (changed) {
+        metadataManager.getKeyTable().put(objectKey, keyInfo);
       }
-
-      OmKeyInfo newObj = new OmKeyInfo.Builder()
-          .setBucketName(keyInfo.getBucketName())
-          .setKeyName(keyInfo.getKeyName())
-          .setReplicationFactor(keyInfo.getFactor())
-          .setReplicationType(keyInfo.getType())
-          .setVolumeName(keyInfo.getVolumeName())
-          .setOmKeyLocationInfos(keyInfo.getKeyLocationVersions())
-          .setCreationTime(keyInfo.getCreationTime())
-          .setModificationTime(keyInfo.getModificationTime())
-          .setAcls(newAcls)
-          .setDataSize(keyInfo.getDataSize())
-          .setFileEncryptionInfo(keyInfo.getFileEncryptionInfo())
-          .build();
-
-      metadataManager.getKeyTable().put(objectKey, newObj);
     } catch (IOException ex) {
       if (!(ex instanceof OMException)) {
         LOG.error("Remove acl operation failed for key:{}/{}/{}", volume,
@@ -1530,7 +1431,7 @@ public class KeyManagerImpl implements KeyManager {
     } finally {
       metadataManager.getLock().releaseLock(BUCKET_LOCK, volume, bucket);
     }
-    return true;
+    return changed;
   }
 
   /**
@@ -1547,6 +1448,7 @@ public class KeyManagerImpl implements KeyManager {
     String volume = obj.getVolumeName();
     String bucket = obj.getBucketName();
     String keyName = obj.getKeyName();
+    boolean changed = false;
 
     metadataManager.getLock().acquireLock(BUCKET_LOCK, volume, bucket);
     try {
@@ -1557,25 +1459,11 @@ public class KeyManagerImpl implements KeyManager {
         throw new OMException("Key not found. Key:" + objectKey, KEY_NOT_FOUND);
       }
 
-      List<OzoneAclInfo> newAcls = new ArrayList<>();
-      for (OzoneAcl a : acls) {
-        newAcls.add(OzoneAcl.toProtobuf(a));
-      }
-      OmKeyInfo newObj = new OmKeyInfo.Builder()
-          .setBucketName(keyInfo.getBucketName())
-          .setKeyName(keyInfo.getKeyName())
-          .setReplicationFactor(keyInfo.getFactor())
-          .setReplicationType(keyInfo.getType())
-          .setVolumeName(keyInfo.getVolumeName())
-          .setOmKeyLocationInfos(keyInfo.getKeyLocationVersions())
-          .setCreationTime(keyInfo.getCreationTime())
-          .setModificationTime(keyInfo.getModificationTime())
-          .setAcls(newAcls)
-          .setDataSize(keyInfo.getDataSize())
-          .setFileEncryptionInfo(keyInfo.getFileEncryptionInfo())
-          .build();
+      changed = keyInfo.setAcls(acls);
 
-      metadataManager.getKeyTable().put(objectKey, newObj);
+      if (changed) {
+        metadataManager.getKeyTable().put(objectKey, keyInfo);
+      }
     } catch (IOException ex) {
       if (!(ex instanceof OMException)) {
         LOG.error("Set acl operation failed for key:{}/{}/{}", volume,
@@ -1585,7 +1473,7 @@ public class KeyManagerImpl implements KeyManager {
     } finally {
       metadataManager.getLock().releaseLock(BUCKET_LOCK, volume, bucket);
     }
-    return true;
+    return changed;
   }
 
   /**
@@ -1610,11 +1498,7 @@ public class KeyManagerImpl implements KeyManager {
         throw new OMException("Key not found. Key:" + objectKey, KEY_NOT_FOUND);
       }
 
-      List<OzoneAcl> acls = new ArrayList<>();
-      for (OzoneAclInfo a : keyInfo.getAcls()) {
-        acls.add(OzoneAcl.fromProtobuf(a));
-      }
-      return acls;
+      return keyInfo.getAcls();
     } catch (IOException ex) {
       if (!(ex instanceof OMException)) {
         LOG.error("Get acl operation failed for key:{}/{}/{}", volume,
@@ -1675,7 +1559,8 @@ public class KeyManagerImpl implements KeyManager {
             objectKey, KEY_NOT_FOUND);
       }
 
-      boolean hasAccess = OzoneUtils.checkAclRight(keyInfo.getAcls(), context);
+      boolean hasAccess = OzoneAclUtil.checkAclRight(
+          keyInfo.getAcls(), context);
       LOG.debug("user:{} has access rights for key:{} :{} ",
           context.getClientUgi(), ozObject.getKeyName(), hasAccess);
       return hasAccess;
@@ -1838,8 +1723,7 @@ public class KeyManagerImpl implements KeyManager {
         .setReplicationType(ReplicationType.RATIS)
         .setReplicationFactor(ReplicationFactor.ONE)
         .setFileEncryptionInfo(encInfo)
-        .setAcls(acls.stream().map(a ->
-            OzoneAcl.toProtobuf(a)).collect(Collectors.toList()))
+        .setAcls(acls)
         .build();
   }
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PrefixManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PrefixManagerImpl.java
index 2c27f9f..ed4c67a 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PrefixManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PrefixManagerImpl.java
@@ -21,24 +21,22 @@ import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmPrefixInfo;
+import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.ozone.security.acl.RequestContext;
 import org.apache.hadoop.ozone.util.RadixNode;
 import org.apache.hadoop.ozone.util.RadixTree;
-import org.apache.hadoop.ozone.web.utils.OzoneUtils;
-import org.apache.hadoop.utils.db.*;
 import org.apache.hadoop.utils.db.Table.KeyValue;
+import org.apache.hadoop.utils.db.TableIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.BitSet;
 import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
-import static org.apache.hadoop.ozone.OzoneAcl.ZERO_BITSET;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.PREFIX_NOT_FOUND;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_NOT_FOUND;
@@ -236,7 +234,7 @@ public class PrefixManagerImpl implements PrefixManager {
         RadixNode<OmPrefixInfo> lastNode =
             prefixTree.getLastNodeInPrefixPath(prefixPath);
         if (lastNode != null && lastNode.getValue() != null) {
-          boolean hasAccess = OzoneUtils.checkAclRights(lastNode.getValue().
+          boolean hasAccess = OzoneAclUtil.checkAclRights(lastNode.getValue().
               getAcls(), context);
           LOG.debug("user:{} has access rights for ozObj:{} ::{} ",
               context.getClientUgi(), ozObject, hasAccess);
@@ -306,110 +304,33 @@ public class PrefixManagerImpl implements PrefixManager {
   public OMPrefixAclOpResult addAcl(OzoneObj ozoneObj, OzoneAcl ozoneAcl,
       OmPrefixInfo prefixInfo) throws IOException {
 
-    List<OzoneAcl> ozoneAclList = null;
-    if (prefixInfo != null) {
-      ozoneAclList = prefixInfo.getAcls();
+    if (prefixInfo == null) {
+      prefixInfo = new OmPrefixInfo.Builder().setName(ozoneObj
+          .getPath()).build();
     }
+    boolean changed = prefixInfo.addAcl(ozoneAcl);
+    if (changed) {
+      // update the in-memory prefix tree
+      prefixTree.insert(ozoneObj.getPath(), prefixInfo);
 
-    if (ozoneAclList == null) {
-      ozoneAclList = new ArrayList<>();
-      ozoneAclList.add(ozoneAcl);
-    } else {
-      boolean addToExistingAcl = false;
-      for(OzoneAcl existingAcl: ozoneAclList) {
-        if(existingAcl.getName().equals(ozoneAcl.getName()) &&
-            existingAcl.getType().equals(ozoneAcl.getType())) {
-
-          BitSet bits = (BitSet) ozoneAcl.getAclBitSet().clone();
-
-          // We need to do "or" before comparision because think of a case like
-          // existing acl is 777 and newly added acl is 444, we have already
-          // that acl set. In this case if we do direct check they will not
-          // be equal, but if we do or and then check, we shall know it
-          // has acl's already set or not.
-          bits.or(existingAcl.getAclBitSet());
-
-          if (bits.equals(existingAcl.getAclBitSet())) {
-            return new OMPrefixAclOpResult(null, false);
-          } else {
-            existingAcl.getAclBitSet().or(ozoneAcl.getAclBitSet());
-            addToExistingAcl = true;
-            break;
-          }
-        }
+      if (!isRatisEnabled) {
+        metadataManager.getPrefixTable().put(ozoneObj.getPath(), prefixInfo);
       }
-      if (!addToExistingAcl) {
-        ozoneAclList.add(ozoneAcl);
-      }
-    }
-    OmPrefixInfo.Builder upiBuilder = OmPrefixInfo.newBuilder();
-    upiBuilder.setName(ozoneObj.getPath()).setAcls(ozoneAclList);
-    if (prefixInfo != null && prefixInfo.getMetadata() != null) {
-      upiBuilder.addAllMetadata(prefixInfo.getMetadata());
-    }
-    prefixInfo = upiBuilder.build();
-
-    // update the in-memory prefix tree
-    prefixTree.insert(ozoneObj.getPath(), prefixInfo);
-
-    if (!isRatisEnabled) {
-      metadataManager.getPrefixTable().put(ozoneObj.getPath(), prefixInfo);
     }
-    return new OMPrefixAclOpResult(prefixInfo, true);
+    return new OMPrefixAclOpResult(prefixInfo, changed);
   }
 
   public OMPrefixAclOpResult removeAcl(OzoneObj ozoneObj, OzoneAcl ozoneAcl,
       OmPrefixInfo prefixInfo) throws IOException {
-    List<OzoneAcl> list = null;
-    if (prefixInfo != null) {
-      list = prefixInfo.getAcls();
-    }
-
-    if (list == null) {
-      return new OMPrefixAclOpResult(null, false);
-    }
-
     boolean removed = false;
-    for (OzoneAcl existingAcl: list) {
-      if (existingAcl.getName().equals(ozoneAcl.getName())
-          && existingAcl.getType() == ozoneAcl.getType()) {
-        BitSet bits = (BitSet) ozoneAcl.getAclBitSet().clone();
-        bits.and(existingAcl.getAclBitSet());
-
-        // This happens when the acl bitset is not existing for current name
-        // and type.
-        // Like a case we have 444 permission, 333 is asked to removed.
-        if (bits.equals(ZERO_BITSET)) {
-          removed = false;
-          break;
-        }
-
-        // We have some matching. Remove them.
-        existingAcl.getAclBitSet().xor(bits);
-
-        // If existing acl has same bitset as passed acl bitset, remove that
-        // acl from the list
-        if (existingAcl.getAclBitSet().equals(ZERO_BITSET)) {
-          list.remove(existingAcl);
-        }
-        removed = true;
-        break;
-      }
+    if (prefixInfo != null) {
+      removed = prefixInfo.removeAcl(ozoneAcl);
     }
 
     // Nothing is matching to remove.
-    if (!removed) {
-      return new OMPrefixAclOpResult(null, false);
-    } else {
-      OmPrefixInfo.Builder upiBuilder = OmPrefixInfo.newBuilder();
-      upiBuilder.setName(ozoneObj.getPath()).setAcls(list);
-      if (prefixInfo != null && prefixInfo.getMetadata() != null) {
-        upiBuilder.addAllMetadata(prefixInfo.getMetadata());
-      }
-      prefixInfo = upiBuilder.build();
-
+    if (removed) {
       // Update in-memory prefix tree.
-      if (list.isEmpty()) {
+      if (prefixInfo.getAcls().isEmpty()) {
         prefixTree.removePrefixPath(ozoneObj.getPath());
         if (!isRatisEnabled) {
           metadataManager.getPrefixTable().delete(ozoneObj.getPath());
@@ -420,58 +341,51 @@ public class PrefixManagerImpl implements PrefixManager {
           metadataManager.getPrefixTable().put(ozoneObj.getPath(), prefixInfo);
         }
       }
-      return new OMPrefixAclOpResult(prefixInfo, true);
     }
+    return new OMPrefixAclOpResult(prefixInfo, removed);
   }
 
   public OMPrefixAclOpResult setAcl(OzoneObj ozoneObj, List<OzoneAcl> ozoneAcls,
       OmPrefixInfo prefixInfo) throws IOException {
-    OmPrefixInfo.Builder upiBuilder = OmPrefixInfo.newBuilder();
-    List<OzoneAcl> aclsToBeSet = new ArrayList<>(ozoneAcls.size());
-    aclsToBeSet.addAll(ozoneAcls);
-    upiBuilder.setName(ozoneObj.getPath());
-    if (prefixInfo != null && prefixInfo.getMetadata() != null) {
-      upiBuilder.addAllMetadata(prefixInfo.getMetadata());
+    if (prefixInfo == null) {
+      prefixInfo = new OmPrefixInfo.Builder().setName(ozoneObj
+          .getPath()).build();
     }
 
-    // Inherit DEFAULT acls from prefix.
-    boolean prefixParentFound = false;
-    List<OmPrefixInfo> prefixList = getLongestPrefixPathHelper(
-        prefixTree.getLongestPrefix(ozoneObj.getPath()));
-
-    if (prefixList.size() > 0) {
-      // Add all acls from direct parent to key.
-      OmPrefixInfo parentPrefixInfo = prefixList.get(prefixList.size() - 1);
-      if (parentPrefixInfo != null) {
-        aclsToBeSet.addAll(OzoneUtils.getDefaultAcls(
-            parentPrefixInfo.getAcls()));
-        prefixParentFound = true;
+    boolean changed = prefixInfo.setAcls(ozoneAcls);
+    if (changed) {
+      List<OzoneAcl> aclsToBeSet = prefixInfo.getAcls();
+      // Inherit DEFAULT acls from prefix.
+      boolean prefixParentFound = false;
+      List<OmPrefixInfo> prefixList = getLongestPrefixPathHelper(
+          prefixTree.getLongestPrefix(ozoneObj.getPath()));
+
+      if (prefixList.size() > 0) {
+        // Add all acls from direct parent to key.
+        OmPrefixInfo parentPrefixInfo = prefixList.get(prefixList.size() - 1);
+        if (parentPrefixInfo != null) {
+          prefixParentFound = OzoneAclUtil.inheritDefaultAcls(aclsToBeSet,
+              parentPrefixInfo.getAcls());
+        }
       }
-    }
 
-    // If no parent prefix is found inherit DEFULT acls from bucket.
-    if (!prefixParentFound) {
-      String bucketKey = metadataManager.getBucketKey(ozoneObj.getVolumeName(),
-          ozoneObj.getBucketName());
-      OmBucketInfo bucketInfo = metadataManager.getBucketTable().
-          get(bucketKey);
-      if (bucketInfo != null) {
-        bucketInfo.getAcls().forEach(a -> {
-          if (a.getAclScope().equals(OzoneAcl.AclScope.DEFAULT)) {
-            aclsToBeSet.add(new OzoneAcl(a.getType(), a.getName(),
-                a.getAclBitSet(), OzoneAcl.AclScope.ACCESS));
-          }
-        });
+      // If no parent prefix is found inherit DEFAULT acls from bucket.
+      if (!prefixParentFound) {
+        String bucketKey = metadataManager.getBucketKey(ozoneObj
+            .getVolumeName(), ozoneObj.getBucketName());
+        OmBucketInfo bucketInfo = metadataManager.getBucketTable().
+            get(bucketKey);
+        if (bucketInfo != null) {
+          OzoneAclUtil.inheritDefaultAcls(aclsToBeSet, bucketInfo.getAcls());
+        }
       }
-    }
-
-    prefixInfo = upiBuilder.setAcls(aclsToBeSet).build();
 
-    prefixTree.insert(ozoneObj.getPath(), prefixInfo);
-    if (!isRatisEnabled) {
-      metadataManager.getPrefixTable().put(ozoneObj.getPath(), prefixInfo);
+      prefixTree.insert(ozoneObj.getPath(), prefixInfo);
+      if (!isRatisEnabled) {
+        metadataManager.getPrefixTable().put(ozoneObj.getPath(), prefixInfo);
+      }
     }
-    return new OMPrefixAclOpResult(prefixInfo, true);
+    return new OMPrefixAclOpResult(prefixInfo, changed);
   }
 
   /**
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequest.java
index beafeab..a85ea8b 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequest.java
@@ -26,6 +26,11 @@ import java.util.Map;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
+import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,10 +43,6 @@ import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OMMetrics;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
-import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
-import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
-import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
 import org.apache.hadoop.ozone.om.request.key.OMKeyRequest;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
 import org.apache.hadoop.ozone.om.response.file.OMDirectoryCreateResponse;
@@ -237,7 +238,7 @@ public class OMDirectoryCreateRequest extends OMKeyRequest {
         .setReplicationType(HddsProtos.ReplicationType.RATIS)
         .setReplicationFactor(HddsProtos.ReplicationFactor.ONE)
         .setFileEncryptionInfo(encryptionInfo.orNull())
-        .setAcls(keyArgs.getAclsList())
+        .setAcls(OzoneAclUtil.fromProtobuf(keyArgs.getAclsList()))
         .build();
   }
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
index 266abf5..a32c0a7 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
@@ -32,6 +32,12 @@ import java.util.stream.Collectors;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,11 +57,6 @@ import org.apache.hadoop.ozone.om.OMMetrics;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.ScmClient;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
-import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
-import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.om.request.OMClientRequest;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
 import org.apache.hadoop.ozone.om.response.file.OMFileCreateResponse;
@@ -352,7 +353,7 @@ public abstract class OMKeyRequest extends OMClientRequest {
         .setReplicationFactor(factor)
         .setFileEncryptionInfo(encInfo);
     if(keyArgs.getAclsList() != null) {
-      builder.setAcls(keyArgs.getAclsList());
+      builder.setAcls(OzoneAclUtil.fromProtobuf(keyArgs.getAclsList()));
     }
     return builder.build();
   }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyAddAclRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyAddAclRequest.java
index a129334..8d69a24 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyAddAclRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyAddAclRequest.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.List;
 
 import com.google.common.collect.Lists;
+import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.response.key.acl.OMKeyAclResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
@@ -29,7 +30,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AddAclResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
@@ -43,14 +43,15 @@ public class OMKeyAddAclRequest extends OMKeyAclRequest {
       LoggerFactory.getLogger(OMKeyAddAclRequest.class);
 
   private String path;
-  private List<OzoneAclInfo> ozoneAcls;
+  private List<OzoneAcl> ozoneAcls;
 
   public OMKeyAddAclRequest(OMRequest omRequest) {
     super(omRequest);
     OzoneManagerProtocolProtos.AddAclRequest addAclRequest =
         getOmRequest().getAddAclRequest();
     path = addAclRequest.getObj().getPath();
-    ozoneAcls = Lists.newArrayList(addAclRequest.getAcl());
+    ozoneAcls = Lists.newArrayList(
+        OzoneAcl.fromProtobuf(addAclRequest.getAcl()));
   }
 
   @Override
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyRemoveAclRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyRemoveAclRequest.java
index 81d59d0..0bd81d3 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyRemoveAclRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyRemoveAclRequest.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.List;
 
 import com.google.common.collect.Lists;
+import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.response.key.acl.OMKeyAclResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
@@ -29,7 +30,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RemoveAclResponse;
@@ -43,14 +43,15 @@ public class OMKeyRemoveAclRequest extends OMKeyAclRequest {
       LoggerFactory.getLogger(OMKeyAddAclRequest.class);
 
   private String path;
-  private List<OzoneAclInfo> ozoneAcls;
+  private List<OzoneAcl> ozoneAcls;
 
   public OMKeyRemoveAclRequest(OMRequest omRequest) {
     super(omRequest);
     OzoneManagerProtocolProtos.RemoveAclRequest removeAclRequest =
         getOmRequest().getRemoveAclRequest();
     path = removeAclRequest.getObj().getPath();
-    ozoneAcls = Lists.newArrayList(removeAclRequest.getAcl());
+    ozoneAcls = Lists.newArrayList(
+        OzoneAcl.fromProtobuf(removeAclRequest.getAcl()));
   }
 
   @Override
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeySetAclRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeySetAclRequest.java
index 9770608..24d46f8 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeySetAclRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeySetAclRequest.java
@@ -21,14 +21,16 @@ package org.apache.hadoop.ozone.om.request.key.acl;
 import java.io.IOException;
 import java.util.List;
 
+import com.google.common.collect.Lists;
+import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
 import org.apache.hadoop.ozone.om.response.key.acl.OMKeyAclResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetAclResponse;
@@ -42,14 +44,15 @@ public class OMKeySetAclRequest extends OMKeyAclRequest {
       LoggerFactory.getLogger(OMKeyAddAclRequest.class);
 
   private String path;
-  private List<OzoneAclInfo> ozoneAcls;
+  private List<OzoneAcl> ozoneAcls;
 
   public OMKeySetAclRequest(OMRequest omRequest) {
     super(omRequest);
     OzoneManagerProtocolProtos.SetAclRequest setAclRequest =
         getOmRequest().getSetAclRequest();
     path = setAclRequest.getObj().getPath();
-    ozoneAcls = setAclRequest.getAclList();
+    ozoneAcls = Lists.newArrayList(
+        OzoneAclUtil.fromProtobuf(setAclRequest.getAclList()));
   }
 
   @Override
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java
index 94637be..181d79c 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
 import org.apache.hadoop.ozone.om.request.key.OMKeyRequest;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
@@ -163,7 +164,7 @@ public class S3InitiateMultipartUploadRequest extends OMKeyRequest {
           .setReplicationFactor(keyArgs.getFactor())
           .setOmKeyLocationInfos(Collections.singletonList(
               new OmKeyLocationInfoGroup(0, new ArrayList<>())))
-          .setAcls(keyArgs.getAclsList())
+          .setAcls(OzoneAclUtil.fromProtobuf(keyArgs.getAclsList()))
           .build();
 
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java
index f263c79..cfcedc4 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
+import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
 import org.apache.hadoop.ozone.om.request.key.OMKeyRequest;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
@@ -231,7 +232,8 @@ public class S3MultipartUploadCompleteRequest extends OMKeyRequest {
             .setDataSize(size)
             .setOmKeyLocationInfos(
                 Collections.singletonList(keyLocationInfoGroup))
-            .setAcls(keyArgs.getAclsList()).build();
+            .setAcls(OzoneAclUtil.fromProtobuf(keyArgs.getAclsList()))
+            .build();
       } else {
         // Already a version exists, so we should add it as a new version.
         // But now as versioning is not supported, just following the commit
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
index c2d6227..46250fc 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
@@ -403,10 +403,11 @@ public class OzoneManagerRequestHandler implements RequestHandler {
 
   private GetAclResponse getAcl(GetAclRequest req) throws IOException {
     List<OzoneAclInfo> acls = new ArrayList<>();
-
     List<OzoneAcl> aclList =
         impl.getAcl(OzoneObjInfo.fromProtobuf(req.getObj()));
-    aclList.forEach(a -> acls.add(OzoneAcl.toProtobuf(a)));
+    if (aclList != null) {
+      aclList.forEach(a -> acls.add(OzoneAcl.toProtobuf(a)));
+    }
     return GetAclResponse.newBuilder().addAllAcls(acls).build();
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org