You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2019/08/10 03:32:10 UTC

[hadoop] branch trunk updated: HDDS-1895. Support Key ACL operations for OM HA. (#1230)

This is an automated email from the ASF dual-hosted git repository.

arp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bd4be6e  HDDS-1895. Support Key ACL operations for OM HA. (#1230)
bd4be6e is described below

commit bd4be6e1682a154b07580b12a48d4e4346cb046e
Author: Bharat Viswanadham <bh...@apache.org>
AuthorDate: Fri Aug 9 20:32:01 2019 -0700

    HDDS-1895. Support Key ACL operations for OM HA. (#1230)
---
 .../apache/hadoop/ozone/om/helpers/OmKeyInfo.java  | 150 ++++++++++++++--
 .../apache/hadoop/ozone/om/TestOzoneManagerHA.java | 196 ++++++++++++++-------
 .../om/ratis/utils/OzoneManagerRatisUtils.java     |   9 +
 .../ozone/om/request/key/acl/OMKeyAclRequest.java  | 173 ++++++++++++++++++
 .../om/request/key/acl/OMKeyAddAclRequest.java     | 107 +++++++++++
 .../om/request/key/acl/OMKeyRemoveAclRequest.java  | 108 ++++++++++++
 .../om/request/key/acl/OMKeySetAclRequest.java     | 105 +++++++++++
 .../ozone/om/request/key/acl/package-info.java     |  24 +++
 .../hadoop/ozone/om/request/util/ObjectParser.java |   2 +-
 .../om/response/key/acl/OMKeyAclResponse.java      |  63 +++++++
 .../ozone/om/response/key/acl/package-info.java    |  24 +++
 11 files changed, 885 insertions(+), 76 deletions(-)

diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
index 80c9f58..17aabd2 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
@@ -19,12 +19,14 @@ package org.apache.hadoop.ozone.om.helpers;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
+import com.google.protobuf.ByteString;
 import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo;
@@ -34,6 +36,8 @@ import org.apache.hadoop.util.Time;
 
 import com.google.common.base.Preconditions;
 
+import static org.apache.hadoop.ozone.OzoneAcl.ZERO_BITSET;
+
 /**
  * Args for key block. The block instance for the key requested in putKey.
  * This is returned from OM to client, and client use class to talk to
@@ -236,6 +240,119 @@ public final class OmKeyInfo extends WithMetadata {
   }
 
   /**
+   * Add an ozoneAcl to list of existing Acl set.
+   * @param ozoneAcl
+   * @return true - if successfully added, false if not added or acl is
+   * already existing in the acl list.
+   */
+  public boolean addAcl(OzoneAclInfo ozoneAcl) {
+    // Case 1: When we are adding more rights to existing user/group.
+    boolean addToExistingAcl = false;
+    for(OzoneAclInfo existingAcl: getAcls()) {
+      if(existingAcl.getName().equals(ozoneAcl.getName()) &&
+          existingAcl.getType().equals(ozoneAcl.getType())) {
+
+        // We need to do "or" before comparision because think of a case like
+        // existing acl is 777 and newly added acl is 444, we have already
+        // that acl set. In this case if we do direct check they will not
+        // be equal, but if we do or and then check, we shall know it
+        // has acl's already set or not.
+        BitSet newAclBits = BitSet.valueOf(
+            existingAcl.getRights().toByteArray());
+
+        newAclBits.or(BitSet.valueOf(ozoneAcl.getRights().toByteArray()));
+
+        if (newAclBits.equals(BitSet.valueOf(
+            existingAcl.getRights().toByteArray()))) {
+          return false;
+        } else {
+          OzoneAclInfo newAcl = OzoneAclInfo.newBuilder()
+              .setType(ozoneAcl.getType())
+              .setName(ozoneAcl.getName())
+              .setAclScope(ozoneAcl.getAclScope())
+              .setRights(ByteString.copyFrom(newAclBits.toByteArray()))
+              .build();
+          getAcls().remove(existingAcl);
+          getAcls().add(newAcl);
+          addToExistingAcl = true;
+          break;
+        }
+      }
+    }
+
+    // Case 2: When a completely new acl is added.
+    if(!addToExistingAcl) {
+      getAcls().add(ozoneAcl);
+    }
+    return true;
+  }
+
+  /**
+   * Remove acl from existing acl list.
+   * @param ozoneAcl
+   * @return true - if successfully removed, false if not able to remove due
+   * to that acl is not in the existing acl list.
+   */
+  public boolean removeAcl(OzoneAclInfo ozoneAcl) {
+    boolean removed = false;
+
+    // When we are removing subset of rights from existing acl.
+    for(OzoneAclInfo existingAcl: getAcls()) {
+      if (existingAcl.getName().equals(ozoneAcl.getName()) &&
+          existingAcl.getType().equals(ozoneAcl.getType())) {
+
+        BitSet bits = BitSet.valueOf(ozoneAcl.getRights().toByteArray());
+        BitSet existingAclBits =
+            BitSet.valueOf(existingAcl.getRights().toByteArray());
+        bits.and(existingAclBits);
+
+        // This happens when the acl bitset asked to remove is not set for
+        // matched name and type.
+        // Like a case we have 444 permission, 333 is asked to removed.
+        if (bits.equals(ZERO_BITSET)) {
+          return false;
+        }
+
+        // We have some matching. Remove them.
+        bits.xor(existingAclBits);
+
+        // If existing acl has same bitset as passed acl bitset, remove that
+        // acl from the list
+        if (bits.equals(ZERO_BITSET)) {
+          getAcls().remove(existingAcl);
+        } else {
+          // Remove old acl and add new acl.
+          OzoneAclInfo newAcl = OzoneAclInfo.newBuilder()
+              .setType(ozoneAcl.getType())
+              .setName(ozoneAcl.getName())
+              .setAclScope(ozoneAcl.getAclScope())
+              .setRights(ByteString.copyFrom(bits.toByteArray()))
+              .build();
+          getAcls().remove(existingAcl);
+          getAcls().add(newAcl);
+        }
+        removed = true;
+        break;
+      }
+    }
+
+    return removed;
+  }
+
+  /**
+   * Reset the existing acl list.
+   * @param ozoneAcls
+   * @return true - if successfully able to reset.
+   */
+  public boolean setAcls(List<OzoneAclInfo> ozoneAcls) {
+    this.acls.clear();
+    this.acls = ozoneAcls;
+    return true;
+  }
+
+
+
+  /**
    * Builder of OmKeyInfo.
    */
   public static class Builder {
@@ -320,7 +437,8 @@ public final class OmKeyInfo extends WithMetadata {
     }
 
     public Builder setAcls(List<OzoneAclInfo> listOfAcls) {
-      this.acls = listOfAcls;
+      this.acls = new ArrayList<>();
+      this.acls.addAll(listOfAcls);
       return this;
     }
 
@@ -359,22 +477,22 @@ public final class OmKeyInfo extends WithMetadata {
   }
 
   public static OmKeyInfo getFromProtobuf(KeyInfo keyInfo) {
-    return new OmKeyInfo(
-        keyInfo.getVolumeName(),
-        keyInfo.getBucketName(),
-        keyInfo.getKeyName(),
-        keyInfo.getKeyLocationListList().stream()
+    return new OmKeyInfo.Builder()
+        .setVolumeName(keyInfo.getVolumeName())
+        .setBucketName(keyInfo.getBucketName())
+        .setKeyName(keyInfo.getKeyName())
+        .setOmKeyLocationInfos(keyInfo.getKeyLocationListList().stream()
             .map(OmKeyLocationInfoGroup::getFromProtobuf)
-            .collect(Collectors.toList()),
-        keyInfo.getDataSize(),
-        keyInfo.getCreationTime(),
-        keyInfo.getModificationTime(),
-        keyInfo.getType(),
-        keyInfo.getFactor(),
-        KeyValueUtil.getFromProtobuf(keyInfo.getMetadataList()),
-        keyInfo.hasFileEncryptionInfo() ? OMPBHelper.convert(keyInfo
-            .getFileEncryptionInfo()): null,
-        keyInfo.getAclsList());
+            .collect(Collectors.toList()))
+        .setDataSize(keyInfo.getDataSize())
+        .setCreationTime(keyInfo.getCreationTime())
+        .setModificationTime(keyInfo.getModificationTime())
+        .setReplicationType(keyInfo.getType())
+        .setReplicationFactor(keyInfo.getFactor())
+        .addAllMetadata(KeyValueUtil.getFromProtobuf(keyInfo.getMetadataList()))
+        .setFileEncryptionInfo(keyInfo.hasFileEncryptionInfo() ?
+            OMPBHelper.convert(keyInfo.getFileEncryptionInfo()): null)
+        .setAcls(keyInfo.getAclsList()).build();
   }
 
   @Override
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
index 29b6368..56624f9 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
@@ -781,26 +781,27 @@ public class TestOzoneManagerHA {
         .setVolumeName(ozoneBucket.getVolumeName())
         .setBucketName(ozoneBucket.getName()).build();
 
-    boolean addAcl = objectStore.addAcl(ozoneObj, defaultUserAcl);
-    Assert.assertTrue(addAcl);
-
-    List<OzoneAcl> acls = objectStore.getAcl(ozoneObj);
+    testAddAcl(remoteUserName, ozoneObj, defaultUserAcl);
+  }
+  @Test
+  public void testRemoveBucketAcl() throws Exception {
+    OzoneBucket ozoneBucket = setupBucket();
+    String remoteUserName = "remoteUser";
+    OzoneAcl defaultUserAcl = new OzoneAcl(USER, remoteUserName,
+        READ, DEFAULT);
 
-    Assert.assertTrue(containsAcl(defaultUserAcl, acls));
+    OzoneObj ozoneObj = OzoneObjInfo.Builder.newBuilder()
+        .setResType(OzoneObj.ResourceType.BUCKET)
+        .setStoreType(OzoneObj.StoreType.OZONE)
+        .setVolumeName(ozoneBucket.getVolumeName())
+        .setBucketName(ozoneBucket.getName()).build();
 
-    // Add an already existing acl.
-    addAcl = objectStore.addAcl(ozoneObj, defaultUserAcl);
-    Assert.assertFalse(addAcl);
+    testRemoveAcl(remoteUserName, ozoneObj, defaultUserAcl);
 
-    // Add an acl by changing acl type with same type, name and scope.
-    defaultUserAcl = new OzoneAcl(USER, remoteUserName,
-        WRITE, DEFAULT);
-    addAcl = objectStore.addAcl(ozoneObj, defaultUserAcl);
-    Assert.assertTrue(addAcl);
   }
 
   @Test
-  public void testRemoveBucketAcl() throws Exception {
+  public void testSetBucketAcl() throws Exception {
     OzoneBucket ozoneBucket = setupBucket();
     String remoteUserName = "remoteUser";
     OzoneAcl defaultUserAcl = new OzoneAcl(USER, remoteUserName,
@@ -812,50 +813,96 @@ public class TestOzoneManagerHA {
         .setVolumeName(ozoneBucket.getVolumeName())
         .setBucketName(ozoneBucket.getName()).build();
 
-    // As by default create bucket we add some default acls in RpcClient.
-    List<OzoneAcl> acls = objectStore.getAcl(ozoneObj);
+    testSetAcl(remoteUserName, ozoneObj, defaultUserAcl);
+  }
 
-    Assert.assertTrue(acls.size() > 0);
+  private boolean containsAcl(OzoneAcl ozoneAcl, List<OzoneAcl> ozoneAcls) {
+    for (OzoneAcl acl : ozoneAcls) {
+      boolean result = compareAcls(ozoneAcl, acl);
+      if (result) {
+        // We found a match, return.
+        return result;
+      }
+    }
+    return false;
+  }
 
-    // Remove an existing acl.
-    boolean removeAcl = objectStore.removeAcl(ozoneObj, acls.get(0));
-    Assert.assertTrue(removeAcl);
+  private boolean compareAcls(OzoneAcl givenAcl, OzoneAcl existingAcl) {
+    if (givenAcl.getType().equals(existingAcl.getType())
+        && givenAcl.getName().equals(existingAcl.getName())
+        && givenAcl.getAclScope().equals(existingAcl.getAclScope())) {
+      BitSet bitSet = (BitSet) givenAcl.getAclBitSet().clone();
+      bitSet.and(existingAcl.getAclBitSet());
+      if (bitSet.equals(existingAcl.getAclBitSet())) {
+        return true;
+      }
+    }
+    return false;
+  }
 
-    // Trying to remove an already removed acl.
-    removeAcl = objectStore.removeAcl(ozoneObj, acls.get(0));
-    Assert.assertFalse(removeAcl);
+  @Test
+  public void testAddKeyAcl() throws Exception {
+    OzoneBucket ozoneBucket = setupBucket();
+    String remoteUserName = "remoteUser";
+    OzoneAcl userAcl = new OzoneAcl(USER, remoteUserName,
+        READ, DEFAULT);
 
-    boolean addAcl = objectStore.addAcl(ozoneObj, defaultUserAcl);
-    Assert.assertTrue(addAcl);
+    String key = createKey(ozoneBucket);
 
-    // Just changed acl type here to write, rest all is same as defaultUserAcl.
-    OzoneAcl modifiedUserAcl = new OzoneAcl(USER, remoteUserName,
-        WRITE, DEFAULT);
-    addAcl = objectStore.addAcl(ozoneObj, modifiedUserAcl);
-    Assert.assertTrue(addAcl);
+    OzoneObj ozoneObj = OzoneObjInfo.Builder.newBuilder()
+        .setResType(OzoneObj.ResourceType.KEY)
+        .setStoreType(OzoneObj.StoreType.OZONE)
+        .setVolumeName(ozoneBucket.getVolumeName())
+        .setBucketName(ozoneBucket.getName())
+        .setKeyName(key).build();
 
-    removeAcl = objectStore.removeAcl(ozoneObj, modifiedUserAcl);
-    Assert.assertTrue(removeAcl);
+    testAddAcl(remoteUserName, ozoneObj, userAcl);
+  }
 
-    removeAcl = objectStore.removeAcl(ozoneObj, defaultUserAcl);
-    Assert.assertTrue(removeAcl);
+  @Test
+  public void testRemoveKeyAcl() throws Exception {
+    OzoneBucket ozoneBucket = setupBucket();
+    String remoteUserName = "remoteUser";
+    OzoneAcl userAcl = new OzoneAcl(USER, remoteUserName,
+        READ, DEFAULT);
+
+    String key = createKey(ozoneBucket);
+
+    OzoneObj ozoneObj = OzoneObjInfo.Builder.newBuilder()
+        .setResType(OzoneObj.ResourceType.KEY)
+        .setStoreType(OzoneObj.StoreType.OZONE)
+        .setVolumeName(ozoneBucket.getVolumeName())
+        .setBucketName(ozoneBucket.getName())
+        .setKeyName(key).build();
+
+    testRemoveAcl(remoteUserName, ozoneObj, userAcl);
 
   }
 
   @Test
-  public void testSetBucketAcl() throws Exception {
+  public void testSetKeyAcl() throws Exception {
     OzoneBucket ozoneBucket = setupBucket();
     String remoteUserName = "remoteUser";
-    OzoneAcl defaultUserAcl = new OzoneAcl(USER, remoteUserName,
+    OzoneAcl userAcl = new OzoneAcl(USER, remoteUserName,
         READ, DEFAULT);
 
+    String key = createKey(ozoneBucket);
+
     OzoneObj ozoneObj = OzoneObjInfo.Builder.newBuilder()
-        .setResType(OzoneObj.ResourceType.BUCKET)
+        .setResType(OzoneObj.ResourceType.KEY)
         .setStoreType(OzoneObj.StoreType.OZONE)
         .setVolumeName(ozoneBucket.getVolumeName())
-        .setBucketName(ozoneBucket.getName()).build();
+        .setBucketName(ozoneBucket.getName())
+        .setKeyName(key).build();
+
+    testSetAcl(remoteUserName, ozoneObj, userAcl);
+
+  }
+
 
-    // As by default create bucket we add some default acls in RpcClient.
+  private void testSetAcl(String remoteUserName, OzoneObj ozoneObj,
+      OzoneAcl userAcl) throws Exception {
+    // As by default create will add some default acls in RpcClient.
     List<OzoneAcl> acls = objectStore.getAcl(ozoneObj);
 
     Assert.assertTrue(acls.size() > 0);
@@ -875,32 +922,63 @@ public class TestOzoneManagerHA {
     for (OzoneAcl ozoneAcl : newAcls) {
       Assert.assertTrue(compareAcls(getAcls.get(i++), ozoneAcl));
     }
+
   }
 
-  private boolean containsAcl(OzoneAcl ozoneAcl, List<OzoneAcl> ozoneAcls) {
-    for (OzoneAcl acl : ozoneAcls) {
-      boolean result = compareAcls(ozoneAcl, acl);
-      if (result) {
-        // We found a match, return.
-        return result;
-      }
-    }
-    return false;
+  private void testAddAcl(String remoteUserName, OzoneObj ozoneObj,
+      OzoneAcl userAcl) throws Exception {
+    boolean addAcl = objectStore.addAcl(ozoneObj, userAcl);
+    Assert.assertTrue(addAcl);
+
+    List<OzoneAcl> acls = objectStore.getAcl(ozoneObj);
+
+    Assert.assertTrue(containsAcl(userAcl, acls));
+
+    // Add an already existing acl.
+    addAcl = objectStore.addAcl(ozoneObj, userAcl);
+    Assert.assertFalse(addAcl);
+
+    // Add an acl by changing acl type with same type, name and scope.
+    userAcl = new OzoneAcl(USER, remoteUserName,
+        WRITE, DEFAULT);
+    addAcl = objectStore.addAcl(ozoneObj, userAcl);
+    Assert.assertTrue(addAcl);
   }
 
-  private boolean compareAcls(OzoneAcl givenAcl, OzoneAcl existingAcl) {
-    if (givenAcl.getType().equals(existingAcl.getType())
-        && givenAcl.getName().equals(existingAcl.getName())
-        && givenAcl.getAclScope().equals(existingAcl.getAclScope())) {
-      BitSet bitSet = (BitSet) givenAcl.getAclBitSet().clone();
-      bitSet.and(existingAcl.getAclBitSet());
-      if (bitSet.equals(existingAcl.getAclBitSet())) {
-        return true;
-      }
-    }
-    return false;
+  private void testRemoveAcl(String remoteUserName, OzoneObj ozoneObj,
+      OzoneAcl userAcl)
+      throws Exception{
+    // As by default create will add some default acls in RpcClient.
+    List<OzoneAcl> acls = objectStore.getAcl(ozoneObj);
+
+    Assert.assertTrue(acls.size() > 0);
+
+    // Remove an existing acl.
+    boolean removeAcl = objectStore.removeAcl(ozoneObj, acls.get(0));
+    Assert.assertTrue(removeAcl);
+
+    // Trying to remove an already removed acl.
+    removeAcl = objectStore.removeAcl(ozoneObj, acls.get(0));
+    Assert.assertFalse(removeAcl);
+
+    boolean addAcl = objectStore.addAcl(ozoneObj, userAcl);
+    Assert.assertTrue(addAcl);
+
+    // Just changed acl type here to write, rest all is same as defaultUserAcl.
+    OzoneAcl modifiedUserAcl = new OzoneAcl(USER, remoteUserName,
+        WRITE, DEFAULT);
+    addAcl = objectStore.addAcl(ozoneObj, modifiedUserAcl);
+    Assert.assertTrue(addAcl);
+
+    removeAcl = objectStore.removeAcl(ozoneObj, modifiedUserAcl);
+    Assert.assertTrue(removeAcl);
+
+    removeAcl = objectStore.removeAcl(ozoneObj, userAcl);
+    Assert.assertTrue(removeAcl);
   }
 
+
+
   @Test
   public void testOMRatisSnapshot() throws Exception {
     String userName = "user" + RandomStringUtils.randomNumeric(5);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
index d0dd640..bc1ec0e 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
@@ -34,6 +34,9 @@ import org.apache.hadoop.ozone.om.request.key.OMKeyCreateRequest;
 import org.apache.hadoop.ozone.om.request.key.OMKeyDeleteRequest;
 import org.apache.hadoop.ozone.om.request.key.OMKeyPurgeRequest;
 import org.apache.hadoop.ozone.om.request.key.OMKeyRenameRequest;
+import org.apache.hadoop.ozone.om.request.key.acl.OMKeyAddAclRequest;
+import org.apache.hadoop.ozone.om.request.key.acl.OMKeyRemoveAclRequest;
+import org.apache.hadoop.ozone.om.request.key.acl.OMKeySetAclRequest;
 import org.apache.hadoop.ozone.om.request.s3.bucket.S3BucketCreateRequest;
 import org.apache.hadoop.ozone.om.request.s3.bucket.S3BucketDeleteRequest;
 import org.apache.hadoop.ozone.om.request.s3.multipart.S3InitiateMultipartUploadRequest;
@@ -141,6 +144,8 @@ public final class OzoneManagerRatisUtils {
         return new OMVolumeAddAclRequest(omRequest);
       } else if (ObjectType.BUCKET == type) {
         return new OMBucketAddAclRequest(omRequest);
+      } else if (ObjectType.KEY == type) {
+        return new OMKeyAddAclRequest(omRequest);
       }
     } else if (Type.RemoveAcl == cmdType) {
       ObjectType type = omRequest.getRemoveAclRequest().getObj().getResType();
@@ -148,6 +153,8 @@ public final class OzoneManagerRatisUtils {
         return new OMVolumeRemoveAclRequest(omRequest);
       } else if (ObjectType.BUCKET == type) {
         return new OMBucketRemoveAclRequest(omRequest);
+      } else if (ObjectType.KEY == type) {
+        return new OMKeyRemoveAclRequest(omRequest);
       }
     } else if (Type.SetAcl == cmdType) {
       ObjectType type = omRequest.getSetAclRequest().getObj().getResType();
@@ -155,6 +162,8 @@ public final class OzoneManagerRatisUtils {
         return new OMVolumeSetAclRequest(omRequest);
       } else if (ObjectType.BUCKET == type) {
         return new OMBucketSetAclRequest(omRequest);
+      } else if (ObjectType.KEY == type) {
+        return new OMKeySetAclRequest(omRequest);
       }
     }
     //TODO: handle key and prefix AddAcl
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyAclRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyAclRequest.java
new file mode 100644
index 0000000..d11cf59
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyAclRequest.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.key.acl;
+
+import java.io.IOException;
+
+import com.google.common.base.Optional;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.om.request.util.ObjectParser;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneObj.ObjectType;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.utils.db.cache.CacheKey;
+import org.apache.hadoop.utils.db.cache.CacheValue;
+
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
+
+/**
+ * Base class for Bucket acl request.
+ */
+public abstract class OMKeyAclRequest extends OMClientRequest {
+
+
+  public OMKeyAclRequest(OMRequest omRequest) {
+    super(omRequest);
+  }
+
+  @Override
+  public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
+      long transactionLogIndex,
+      OzoneManagerDoubleBufferHelper ozoneManagerDoubleBufferHelper) {
+
+    OmKeyInfo omKeyInfo = null;
+
+    OMResponse.Builder omResponse = onInit();
+    OMClientResponse omClientResponse = null;
+    IOException exception = null;
+
+    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+    boolean lockAcquired = false;
+    String volume = null;
+    String bucket = null;
+    String key = null;
+    boolean operationResult = false;
+    try {
+      ObjectParser objectParser = new ObjectParser(getPath(),
+          ObjectType.KEY);
+
+      volume = objectParser.getVolume();
+      bucket = objectParser.getBucket();
+      key = objectParser.getKey();
+
+      // check Acl
+      if (ozoneManager.getAclsEnabled()) {
+        checkAcls(ozoneManager, OzoneObj.ResourceType.VOLUME,
+            OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.WRITE_ACL,
+            volume, bucket, key);
+      }
+      lockAcquired =
+          omMetadataManager.getLock().acquireLock(BUCKET_LOCK, volume, bucket);
+
+      String dbKey = omMetadataManager.getOzoneKey(volume, bucket, key);
+      omKeyInfo = omMetadataManager.getKeyTable().get(dbKey);
+
+      if (omKeyInfo == null) {
+        throw new OMException(OMException.ResultCodes.KEY_NOT_FOUND);
+      }
+
+      operationResult = apply(omKeyInfo);
+
+      if (operationResult) {
+        // update cache.
+        omMetadataManager.getKeyTable().addCacheEntry(
+            new CacheKey<>(dbKey),
+            new CacheValue<>(Optional.of(omKeyInfo), transactionLogIndex));
+      }
+
+      omClientResponse = onSuccess(omResponse, omKeyInfo, operationResult);
+
+    } catch (IOException ex) {
+      exception = ex;
+      omClientResponse = onFailure(omResponse, ex);
+    } finally {
+      if (omClientResponse != null) {
+        omClientResponse.setFlushFuture(
+            ozoneManagerDoubleBufferHelper.add(omClientResponse,
+                transactionLogIndex));
+      }
+      if (lockAcquired) {
+        omMetadataManager.getLock().releaseLock(BUCKET_LOCK, volume, bucket);
+      }
+    }
+
+
+    onComplete(operationResult, exception);
+
+    return omClientResponse;
+  }
+
+  /**
+   * Get the path name from the request.
+   * @return path name
+   */
+  abstract String getPath();
+
+  // TODO: Finer grain metrics can be moved to these callbacks. They can also
+  // be abstracted into separate interfaces in future.
+  /**
+   * Get the initial om response builder with lock.
+   * @return om response builder.
+   */
+  abstract OMResponse.Builder onInit();
+
+  /**
+   * Get the om client response on success case with lock.
+   * @param omResponse
+   * @param omKeyInfo
+   * @param operationResult
+   * @return OMClientResponse
+   */
+  abstract OMClientResponse onSuccess(
+      OMResponse.Builder omResponse, OmKeyInfo omKeyInfo,
+      boolean operationResult);
+
+  /**
+   * Get the om client response on failure case with lock.
+   * @param omResponse
+   * @param exception
+   * @return OMClientResponse
+   */
+  abstract OMClientResponse onFailure(OMResponse.Builder omResponse,
+      IOException exception);
+
+  /**
+   * Completion hook for final processing before return without lock.
+   * Usually used for logging without lock and metric update.
+   * @param operationResult
+   * @param exception
+   */
+  abstract void onComplete(boolean operationResult, IOException exception);
+
+  /**
+   * Apply the acl operation, if successfully completed returns true,
+   * else false.
+   * @param omKeyInfo
+   */
+  abstract boolean apply(OmKeyInfo omKeyInfo);
+}
+
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyAddAclRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyAddAclRequest.java
new file mode 100644
index 0000000..a129334
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyAddAclRequest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.key.acl;
+
+import java.io.IOException;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.response.key.acl.OMKeyAclResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AddAclResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+
+/**
+ * Handle add Acl request for bucket.
+ */
+public class OMKeyAddAclRequest extends OMKeyAclRequest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMKeyAddAclRequest.class);
+
+  private String path;
+  private List<OzoneAclInfo> ozoneAcls;
+
+  public OMKeyAddAclRequest(OMRequest omRequest) {
+    super(omRequest);
+    OzoneManagerProtocolProtos.AddAclRequest addAclRequest =
+        getOmRequest().getAddAclRequest();
+    path = addAclRequest.getObj().getPath();
+    ozoneAcls = Lists.newArrayList(addAclRequest.getAcl());
+  }
+
+  @Override
+  String getPath() {
+    return path;
+  }
+
+  @Override
+  OMResponse.Builder onInit() {
+    return OMResponse.newBuilder().setCmdType(
+        OzoneManagerProtocolProtos.Type.AddAcl).setStatus(
+        OzoneManagerProtocolProtos.Status.OK).setSuccess(true);
+
+  }
+
+  @Override
+  OMClientResponse onSuccess(OMResponse.Builder omResponse,
+      OmKeyInfo omKeyInfo, boolean operationResult) {
+    omResponse.setSuccess(operationResult);
+    omResponse.setAddAclResponse(AddAclResponse.newBuilder()
+        .setResponse(operationResult));
+    return new OMKeyAclResponse(omKeyInfo,
+        omResponse.build());
+  }
+
+  @Override
+  OMClientResponse onFailure(OMResponse.Builder omResponse,
+      IOException exception) {
+    return new OMKeyAclResponse(null,
+        createErrorOMResponse(omResponse, exception));
+  }
+
+  @Override
+  void onComplete(boolean operationResult, IOException exception) {
+    if (operationResult) {
+      LOG.debug("Add acl: {} to path: {} success!", ozoneAcls, path);
+    } else {
+      if (exception == null) {
+        LOG.debug("Add acl {} to path {} failed, because acl already exist",
+            ozoneAcls, path);
+      } else {
+        LOG.error("Add acl {} to path {} failed!", ozoneAcls, path, exception);
+      }
+    }
+  }
+
+  @Override
+  boolean apply(OmKeyInfo omKeyInfo) {
+    // No need to check not null here, this will be never called with null.
+    return omKeyInfo.addAcl(ozoneAcls.get(0));
+  }
+
+}
+
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyRemoveAclRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyRemoveAclRequest.java
new file mode 100644
index 0000000..81d59d0
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyRemoveAclRequest.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.key.acl;
+
+import java.io.IOException;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.response.key.acl.OMKeyAclResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RemoveAclResponse;
+
+/**
+ * Handle add Acl request for bucket.
+ */
+public class OMKeyRemoveAclRequest extends OMKeyAclRequest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMKeyAddAclRequest.class);
+
+  private String path;
+  private List<OzoneAclInfo> ozoneAcls;
+
+  public OMKeyRemoveAclRequest(OMRequest omRequest) {
+    super(omRequest);
+    OzoneManagerProtocolProtos.RemoveAclRequest removeAclRequest =
+        getOmRequest().getRemoveAclRequest();
+    path = removeAclRequest.getObj().getPath();
+    ozoneAcls = Lists.newArrayList(removeAclRequest.getAcl());
+  }
+
+  @Override
+  String getPath() {
+    return path;
+  }
+
+  @Override
+  OMResponse.Builder onInit() {
+    return OMResponse.newBuilder().setCmdType(
+        OzoneManagerProtocolProtos.Type.RemoveAcl).setStatus(
+        OzoneManagerProtocolProtos.Status.OK).setSuccess(true);
+
+  }
+
+  @Override
+  OMClientResponse onSuccess(OMResponse.Builder omResponse,
+      OmKeyInfo omKeyInfo, boolean operationResult) {
+    omResponse.setSuccess(operationResult);
+    omResponse.setRemoveAclResponse(RemoveAclResponse.newBuilder()
+        .setResponse(operationResult));
+    return new OMKeyAclResponse(omKeyInfo,
+        omResponse.build());
+  }
+
+  @Override
+  OMClientResponse onFailure(OMResponse.Builder omResponse,
+      IOException exception) {
+    return new OMKeyAclResponse(null,
+        createErrorOMResponse(omResponse, exception));
+  }
+
+  @Override
+  void onComplete(boolean operationResult, IOException exception) {
+    if (operationResult) {
+      LOG.debug("Remove acl: {} to path: {} success!", ozoneAcls, path);
+    } else {
+      if (exception == null) {
+        LOG.debug("Remove acl {} to path {} failed, because acl already exist",
+            ozoneAcls, path);
+      } else {
+        LOG.error("Remove acl {} to path {} failed!", ozoneAcls, path,
+            exception);
+      }
+    }
+  }
+
+  @Override
+  boolean apply(OmKeyInfo omKeyInfo) {
+    // No need to check not null here, this will be never called with null.
+    return omKeyInfo.removeAcl(ozoneAcls.get(0));
+  }
+
+}
+
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeySetAclRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeySetAclRequest.java
new file mode 100644
index 0000000..9770608
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeySetAclRequest.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.key.acl;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.response.key.acl.OMKeyAclResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetAclResponse;
+
+/**
+ * Handle add Acl request for bucket.
+ */
+public class OMKeySetAclRequest extends OMKeyAclRequest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMKeyAddAclRequest.class);
+
+  private String path;
+  private List<OzoneAclInfo> ozoneAcls;
+
+  public OMKeySetAclRequest(OMRequest omRequest) {
+    super(omRequest);
+    OzoneManagerProtocolProtos.SetAclRequest setAclRequest =
+        getOmRequest().getSetAclRequest();
+    path = setAclRequest.getObj().getPath();
+    ozoneAcls = setAclRequest.getAclList();
+  }
+
+  @Override
+  String getPath() {
+    return path;
+  }
+
+  @Override
+  OMResponse.Builder onInit() {
+    return OMResponse.newBuilder().setCmdType(
+        OzoneManagerProtocolProtos.Type.SetAcl).setStatus(
+        OzoneManagerProtocolProtos.Status.OK).setSuccess(true);
+
+  }
+
+  @Override
+  OMClientResponse onSuccess(OMResponse.Builder omResponse,
+      OmKeyInfo omKeyInfo, boolean operationResult) {
+    omResponse.setSuccess(operationResult);
+    omResponse.setSetAclResponse(SetAclResponse.newBuilder()
+        .setResponse(operationResult));
+    return new OMKeyAclResponse(omKeyInfo,
+        omResponse.build());
+  }
+
+  @Override
+  OMClientResponse onFailure(OMResponse.Builder omResponse,
+      IOException exception) {
+    return new OMKeyAclResponse(null,
+        createErrorOMResponse(omResponse, exception));
+  }
+
+  @Override
+  void onComplete(boolean operationResult, IOException exception) {
+    if (operationResult) {
+      LOG.debug("Set acl: {} to path: {} success!", ozoneAcls, path);
+    } else {
+      if (exception == null) {
+        LOG.debug("Set acl {} to path {} failed!", ozoneAcls, path);
+      } else {
+        LOG.error("Set acl {} to path {} failed!", ozoneAcls, path, exception);
+      }
+    }
+  }
+
+  @Override
+  boolean apply(OmKeyInfo omKeyInfo) {
+    // No need to check not null here, this will be never called with null.
+    return omKeyInfo.setAcls(ozoneAcls);
+  }
+
+}
+
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/package-info.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/package-info.java
new file mode 100644
index 0000000..c532519
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/**
+ * Package contains classes related to acl requests for keys.
+ */
+package org.apache.hadoop.ozone.om.request.key.acl;
+
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/util/ObjectParser.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/util/ObjectParser.java
index 7b258a0..c12cdac 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/util/ObjectParser.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/util/ObjectParser.java
@@ -52,7 +52,7 @@ public class ObjectParser {
     } else if (objectType == ObjectType.KEY && tokens.length == 3) {
       volume = tokens[0];
       bucket = tokens[1];
-      key = tokens[3];
+      key = tokens[2];
     } else {
       throw new OMException("Illegal path " + path,
           OMException.ResultCodes.INVALID_PATH_IN_ACL_REQUEST);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/acl/OMKeyAclResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/acl/OMKeyAclResponse.java
new file mode 100644
index 0000000..a67ec39
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/acl/OMKeyAclResponse.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.response.key.acl;
+
+import java.io.IOException;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMResponse;
+import org.apache.hadoop.utils.db.BatchOperation;
+
+/**
+ * Response for Bucket acl request.
+ */
+public class OMKeyAclResponse extends OMClientResponse {
+
+  private final OmKeyInfo omKeyInfo;
+
+  public OMKeyAclResponse(@Nullable OmKeyInfo omKeyInfo,
+      @Nonnull OMResponse omResponse) {
+    super(omResponse);
+    this.omKeyInfo = omKeyInfo;
+  }
+
+  @Override
+  public void addToDBBatch(OMMetadataManager omMetadataManager,
+      BatchOperation batchOperation) throws IOException {
+
+    // If response status is OK and success is true, add to DB batch.
+    if (getOMResponse().getStatus() == OzoneManagerProtocolProtos.Status.OK &&
+        getOMResponse().getSuccess()) {
+      String dbKey =
+          omMetadataManager.getOzoneKey(omKeyInfo.getVolumeName(),
+              omKeyInfo.getBucketName(), omKeyInfo.getKeyName());
+      omMetadataManager.getKeyTable().putWithBatch(batchOperation,
+          dbKey, omKeyInfo);
+    }
+  }
+
+}
+
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/acl/package-info.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/acl/package-info.java
new file mode 100644
index 0000000..6a17231
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/acl/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/**
+ * Package contains classes related to bucket acl responses.
+ */
+package org.apache.hadoop.ozone.om.response.key.acl;
+


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org