You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by pr...@apache.org on 2021/12/06 19:57:30 UTC

[ozone] branch master updated: HDDS-5903. Add Support for Bucket Owner Acls (#2826)

This is an automated email from the ASF dual-hosted git repository.

prashantpogde pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new df0489a  HDDS-5903. Add Support for Bucket Owner Acls (#2826)
df0489a is described below

commit df0489a4ad5ba1f8459a10036524926bef114c18
Author: Aswin Shakil Balasubramanian <as...@gmail.com>
AuthorDate: Mon Dec 6 11:57:16 2021 -0800

    HDDS-5903. Add Support for Bucket Owner Acls (#2826)
    
    * HDDS-5684. Initial commit
    
    * HDDS-5684. Support for bucket info and bucket list.
    
    * HDDS-5684. Removed admin field.
    
    * HDDS-5903. Refactored checkACLs.
    
    * HDDS-5903. Fixed getBucketOwner
    
    * HDDS-5903. Added Test.
    
    * HDDS-5903. Fixed Tests.
    
    * HDDS-5903. Added additional tests.
    
    * Trigger Build
---
 .../org/apache/hadoop/ozone/client/BucketArgs.java |  21 +-
 .../apache/hadoop/ozone/client/OzoneBucket.java    |  29 +++
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  |  18 +-
 .../hadoop/ozone/om/helpers/OmBucketInfo.java      |  32 ++-
 .../apache/hadoop/ozone/om/TestBucketOwner.java    | 244 +++++++++++++++++++++
 .../org/apache/hadoop/ozone/om/TestOmAcls.java     |   6 +-
 .../src/main/proto/OmClientProtocol.proto          |   1 +
 .../org/apache/hadoop/ozone/om/OzoneAclUtils.java  | 125 +++++++++++
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  72 +++++-
 .../hadoop/ozone/om/request/OMClientRequest.java   |  90 +++++++-
 .../hadoop/ozone/om/request/key/OMKeyRequest.java  |   5 +-
 .../ozone/shell/bucket/CreateBucketHandler.java    |  14 +-
 12 files changed, 626 insertions(+), 31 deletions(-)

diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/BucketArgs.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/BucketArgs.java
index 1eeb2b5..43d28f0 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/BucketArgs.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/BucketArgs.java
@@ -62,6 +62,8 @@ public final class BucketArgs {
   private long quotaInBytes;
   private long quotaInNamespace;
 
+  private String owner;
+
   /**
    * Bucket Layout.
    */
@@ -79,12 +81,14 @@ public final class BucketArgs {
    * @param quotaInBytes Bucket quota in bytes.
    * @param quotaInNamespace Bucket quota in counts.
    * @param bucketLayout Bucket Layouts.
+   * @param owner owner of the bucket.
    */
   @SuppressWarnings("parameternumber")
   private BucketArgs(Boolean versioning, StorageType storageType,
       List<OzoneAcl> acls, Map<String, String> metadata,
       String bucketEncryptionKey, String sourceVolume, String sourceBucket,
-      long quotaInBytes, long quotaInNamespace, BucketLayout bucketLayout) {
+      long quotaInBytes, long quotaInNamespace, BucketLayout bucketLayout,
+      String owner) {
     this.acls = acls;
     this.versioning = versioning;
     this.storageType = storageType;
@@ -95,6 +99,7 @@ public final class BucketArgs {
     this.quotaInBytes = quotaInBytes;
     this.quotaInNamespace = quotaInNamespace;
     this.bucketLayout = bucketLayout;
+    this.owner = owner;
   }
 
   /**
@@ -179,6 +184,13 @@ public final class BucketArgs {
   }
 
   /**
+   * Returns the Owner Name.
+   */
+  public String getOwner() {
+    return owner;
+  }
+
+  /**
    * Builder for OmBucketInfo.
    */
   public static class Builder {
@@ -192,6 +204,7 @@ public final class BucketArgs {
     private long quotaInBytes;
     private long quotaInNamespace;
     private BucketLayout bucketLayout;
+    private String owner;
 
     public Builder() {
       metadata = new HashMap<>();
@@ -249,6 +262,10 @@ public final class BucketArgs {
       return this;
     }
 
+    public BucketArgs.Builder setOwner(String ownerName) {
+      owner = ownerName;
+      return this;
+    }
 
     /**
      * Constructs the BucketArgs.
@@ -257,7 +274,7 @@ public final class BucketArgs {
     public BucketArgs build() {
       return new BucketArgs(versioning, storageType, acls, metadata,
           bucketEncryptionKey, sourceVolume, sourceBucket, quotaInBytes,
-          quotaInNamespace, bucketLayout);
+          quotaInNamespace, bucketLayout, owner);
     }
   }
 }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
index e472808..23cf922 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
@@ -140,6 +140,10 @@ public class OzoneBucket extends WithMetadata {
    * Bucket Layout.
    */
   private BucketLayout bucketLayout = BucketLayout.DEFAULT;
+  /**
+   * Bucket Owner.
+   */
+  private String owner;
 
   private OzoneBucket(ConfigurationSource conf, String volumeName,
       String bucketName, ClientProtocol proxy) {
@@ -221,6 +225,22 @@ public class OzoneBucket extends WithMetadata {
     this.bucketLayout = bucketLayout;
   }
 
+
+  @SuppressWarnings("checkstyle:ParameterNumber")
+  public OzoneBucket(ConfigurationSource conf, ClientProtocol proxy,
+       String volumeName, String bucketName, StorageType storageType,
+       Boolean versioning, long creationTime, long modificationTime,
+       Map<String, String> metadata, String encryptionKeyName,
+       String sourceVolume, String sourceBucket, long usedBytes,
+       long usedNamespace, long quotaInBytes, long quotaInNamespace,
+       BucketLayout bucketLayout, String owner) {
+    this(conf, proxy, volumeName, bucketName, storageType, versioning,
+        creationTime, modificationTime, metadata, encryptionKeyName,
+        sourceVolume, sourceBucket, usedBytes, usedNamespace, quotaInBytes,
+        quotaInNamespace, bucketLayout);
+    this.owner = owner;
+  }
+
   /**
    * Constructs OzoneBucket instance.
    * @param conf Configuration object.
@@ -386,6 +406,15 @@ public class OzoneBucket extends WithMetadata {
   }
 
   /**
+   * Returns the owner of the Bucket.
+   *
+   * @return owner
+   */
+  public String getOwner() {
+    return owner;
+  }
+
+  /**
    * Builder for OmBucketInfo.
   /**
    * Adds ACLs to the Bucket.
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 4df5508..9ca8693 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -513,6 +513,8 @@ public class RpcClient implements ClientProtocol {
     verifyCountsQuota(bucketArgs.getQuotaInNamespace());
     verifySpaceQuota(bucketArgs.getQuotaInBytes());
 
+    String owner = bucketArgs.getOwner() == null ?
+            ugi.getShortUserName() : bucketArgs.getOwner();
     Boolean isVersionEnabled = bucketArgs.getVersioning() == null ?
         Boolean.FALSE : bucketArgs.getVersioning();
     StorageType storageType = bucketArgs.getStorageType() == null ?
@@ -540,15 +542,17 @@ public class RpcClient implements ClientProtocol {
         .setQuotaInBytes(bucketArgs.getQuotaInBytes())
         .setQuotaInNamespace(bucketArgs.getQuotaInNamespace())
         .setAcls(listOfAcls.stream().distinct().collect(Collectors.toList()))
-        .setBucketLayout(bucketArgs.getBucketLayout());
+        .setBucketLayout(bucketArgs.getBucketLayout())
+        .setOwner(owner);
 
     if (bek != null) {
       builder.setBucketEncryptionKey(bek);
     }
 
-    LOG.info("Creating Bucket: {}/{}, with Versioning {} and " +
-            "Storage Type set to {} and Encryption set to {} ",
-        volumeName, bucketName, isVersionEnabled, storageType, bek != null);
+    LOG.info("Creating Bucket: {}/{}, with {} as owner and Versioning {} and " +
+        "Storage Type set to {} and Encryption set to {} ",
+        volumeName, bucketName, owner, isVersionEnabled,
+        storageType, bek != null);
     ozoneManagerClient.createBucket(builder.build());
   }
 
@@ -773,7 +777,8 @@ public class RpcClient implements ClientProtocol {
         bucketInfo.getUsedNamespace(),
         bucketInfo.getQuotaInBytes(),
         bucketInfo.getQuotaInNamespace(),
-        bucketInfo.getBucketLayout()
+        bucketInfo.getBucketLayout(),
+        bucketInfo.getOwner()
     );
   }
 
@@ -802,7 +807,8 @@ public class RpcClient implements ClientProtocol {
         bucket.getUsedNamespace(),
         bucket.getQuotaInBytes(),
         bucket.getQuotaInNamespace(),
-        bucket.getBucketLayout()))
+        bucket.getBucketLayout(),
+        bucket.getOwner()))
         .collect(Collectors.toList());
   }
 
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketInfo.java
index b1b1102..786bb74 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketInfo.java
@@ -91,6 +91,8 @@ public final class OmBucketInfo extends WithObjectID implements Auditable {
    */
   private BucketLayout bucketLayout;
 
+  private final String owner;
+
   /**
    * Private constructor, constructed via builder.
    * @param volumeName - Volume name.
@@ -108,6 +110,7 @@ public final class OmBucketInfo extends WithObjectID implements Auditable {
    * @param quotaInBytes Bucket quota in bytes.
    * @param quotaInNamespace Bucket quota in counts.
    * @param bucketLayout Bucket Layout.
+   * @param owner owner of the bucket.
    */
   @SuppressWarnings("checkstyle:ParameterNumber")
   private OmBucketInfo(String volumeName,
@@ -127,7 +130,8 @@ public final class OmBucketInfo extends WithObjectID implements Auditable {
       long usedNamespace,
       long quotaInBytes,
       long quotaInNamespace,
-      BucketLayout bucketLayout) {
+      BucketLayout bucketLayout,
+      String owner) {
     this.volumeName = volumeName;
     this.bucketName = bucketName;
     this.acls = acls;
@@ -146,6 +150,7 @@ public final class OmBucketInfo extends WithObjectID implements Auditable {
     this.quotaInBytes = quotaInBytes;
     this.quotaInNamespace = quotaInNamespace;
     this.bucketLayout = bucketLayout;
+    this.owner = owner;
   }
 
   /**
@@ -288,6 +293,10 @@ public final class OmBucketInfo extends WithObjectID implements Auditable {
     return sourceVolume != null && sourceBucket != null;
   }
 
+  public String getOwner() {
+    return owner;
+  }
+
   /**
    * Returns new builder class that builds a OmBucketInfo.
    *
@@ -362,7 +371,8 @@ public final class OmBucketInfo extends WithObjectID implements Auditable {
         .setUsedNamespace(usedNamespace)
         .setQuotaInBytes(quotaInBytes)
         .setQuotaInNamespace(quotaInNamespace)
-        .setBucketLayout(bucketLayout);
+        .setBucketLayout(bucketLayout)
+        .setOwner(owner);
   }
 
   /**
@@ -387,6 +397,7 @@ public final class OmBucketInfo extends WithObjectID implements Auditable {
     private long quotaInBytes;
     private long quotaInNamespace;
     private BucketLayout bucketLayout;
+    private String owner;
 
     public Builder() {
       //Default values
@@ -510,6 +521,11 @@ public final class OmBucketInfo extends WithObjectID implements Auditable {
       return this;
     }
 
+    public Builder setOwner(String ownerName) {
+      this.owner = ownerName;
+      return this;
+    }
+
     /**
      * Constructs the OmBucketInfo.
      * @return instance of OmBucketInfo.
@@ -524,7 +540,7 @@ public final class OmBucketInfo extends WithObjectID implements Auditable {
       return new OmBucketInfo(volumeName, bucketName, acls, isVersionEnabled,
           storageType, creationTime, modificationTime, objectID, updateID,
           metadata, bekInfo, sourceVolume, sourceBucket, usedBytes,
-          usedNamespace, quotaInBytes, quotaInNamespace, bucketLayout);
+          usedNamespace, quotaInBytes, quotaInNamespace, bucketLayout, owner);
     }
   }
 
@@ -559,6 +575,9 @@ public final class OmBucketInfo extends WithObjectID implements Auditable {
     if (sourceBucket != null) {
       bib.setSourceBucket(sourceBucket);
     }
+    if (owner != null) {
+      bib.setOwner(owner);
+    }
     return bib.build();
   }
 
@@ -617,6 +636,9 @@ public final class OmBucketInfo extends WithObjectID implements Auditable {
     if (bucketInfo.hasSourceBucket()) {
       obib.setSourceBucket(bucketInfo.getSourceBucket());
     }
+    if (bucketInfo.hasOwner()) {
+      obib.setOwner(bucketInfo.getOwner());
+    }
     return obib.build();
   }
 
@@ -664,7 +686,8 @@ public final class OmBucketInfo extends WithObjectID implements Auditable {
         Objects.equals(sourceVolume, that.sourceVolume) &&
         Objects.equals(sourceBucket, that.sourceBucket) &&
         Objects.equals(metadata, that.metadata) &&
-        Objects.equals(bekInfo, that.bekInfo);
+        Objects.equals(bekInfo, that.bekInfo) &&
+        Objects.equals(owner, that.owner);
   }
 
   @Override
@@ -692,6 +715,7 @@ public final class OmBucketInfo extends WithObjectID implements Auditable {
         ", quotaInBytes=" + quotaInBytes +
         ", quotaInNamespace=" + quotaInNamespace +
         ", bucketLayout=" + bucketLayout +
+        ", owner=" + owner +
         '}';
   }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestBucketOwner.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestBucketOwner.java
new file mode 100644
index 0000000..0cbc0da
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestBucketOwner.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.StorageType;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.client.*;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.*;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static org.apache.hadoop.ozone.OzoneAcl.AclScope.DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.*;
+import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType.USER;
+import static org.apache.hadoop.ozone.security.acl.OzoneObj.StoreType.OZONE;
+import static org.junit.Assert.fail;
+
+/**
+ * Test for Ozone Bucket Owner.
+ */
+public class TestBucketOwner {
+  @Rule public Timeout timeout = Timeout.seconds(120);
+
+  private static MiniOzoneCluster cluster;
+  private static final Logger LOG =
+          LoggerFactory.getLogger(TestBucketOwner.class);
+  private static  UserGroupInformation adminUser =
+          UserGroupInformation.createUserForTesting("om",
+          new String[] {"ozone"});
+  private static  UserGroupInformation user1 = UserGroupInformation
+          .createUserForTesting("user1", new String[] {"test1"});
+  private static UserGroupInformation user2 = UserGroupInformation
+          .createUserForTesting("user2", new String[] {"test2"});
+  private static UserGroupInformation user3 = UserGroupInformation
+          .createUserForTesting("user3", new String[] {"test3"});
+  private static OzoneClient client;
+  private static ObjectStore objectStore;
+
+  @BeforeClass
+  public static void init() throws Exception {
+    // loginUser is the user running this test.
+    UserGroupInformation.setLoginUser(adminUser);
+    OzoneConfiguration conf = new OzoneConfiguration();
+    String clusterId = UUID.randomUUID().toString();
+    String scmId = UUID.randomUUID().toString();
+    String omId = UUID.randomUUID().toString();
+    conf.set(OZONE_ACL_AUTHORIZER_CLASS, OZONE_ACL_AUTHORIZER_CLASS_NATIVE);
+    conf.setBoolean(OZONE_ACL_ENABLED, true);
+    cluster = MiniOzoneCluster.newBuilder(conf).setClusterId(clusterId)
+            .setScmId(scmId).setOmId(omId).build();
+    cluster.waitForClusterToBeReady();
+    client = cluster.getClient();
+    objectStore = client.getObjectStore();
+    /* r = READ, w = WRITE, c = CREATE, d = DELETE
+       l = LIST, a = ALL, n = NONE, x = READ_ACL, y = WRITE_ACL */
+    String aclWorldAll = "world::a";
+    createVolumeWithOwnerAndAcl(objectStore, "volume1", "user2", aclWorldAll);
+    UserGroupInformation.setLoginUser(user1);
+    client = cluster.getClient();
+    objectStore = client.getObjectStore();
+    OzoneVolume volume = objectStore.getVolume("volume1");
+    BucketArgs omBucketArgs = BucketArgs.newBuilder()
+            .setStorageType(StorageType.DISK).setOwner("user1").build();
+    volume.createBucket("bucket1", omBucketArgs);
+    volume.createBucket("bucket2", omBucketArgs);
+    volume.createBucket("bucket3", omBucketArgs);
+  }
+
+  @AfterClass
+  public static void stopCluster() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testBucketOwner() throws Exception {
+    // Test Key Operations as Bucket Owner,  Non-Volume Owner
+    UserGroupInformation.setLoginUser(user1);
+    OzoneVolume volume = cluster.getClient().getObjectStore()
+            .getVolume("volume1");
+    OzoneBucket ozoneBucket = volume.getBucket("bucket1");
+    //Key Create
+    createKey(ozoneBucket, "key1", 10, new byte[10]);
+    createKey(ozoneBucket, "key2", 10, new byte[10]);
+    //Key Delete
+    ozoneBucket.deleteKey("key1");
+    //Bucket Delete
+    volume.deleteBucket("bucket3");
+    //List Keys
+    ozoneBucket.listKeys("key");
+    //Get Acls
+    ozoneBucket.getAcls();
+    //Add Acls
+    OzoneAcl acl = new OzoneAcl(USER, "testuser",
+        IAccessAuthorizer.ACLType.ALL, DEFAULT);
+    ozoneBucket.addAcl(acl);
+  }
+
+  @Test
+  public void testNonBucketNonVolumeOwner() throws Exception {
+    // Test Key Operations Non-Bucket Owner, Non-Volume Owner
+    //Key Create
+    UserGroupInformation.setLoginUser(user3);
+    OzoneBucket ozoneBucket;
+    try {
+      OzoneVolume volume = cluster.getClient().getObjectStore()
+              .getVolume("volume1");
+      ozoneBucket = volume.getBucket("bucket1");
+      createKey(ozoneBucket, "key3", 10, new byte[10]);
+      fail("Create key as non-volume and non-bucket owner should fail");
+    } catch (Exception ex) {
+      LOG.info(ex.getMessage());
+    }
+    //Key Delete - should fail
+    try {
+      OzoneVolume volume = cluster.getClient().getObjectStore()
+              .getVolume("volume1");
+      ozoneBucket = volume.getBucket("bucket1");
+      ozoneBucket.deleteKey("key2");
+      fail("Delete key as non-volume and non-bucket owner should fail");
+    } catch (Exception ex) {
+      LOG.info(ex.getMessage());
+    }
+    //Key Rename - should fail
+    try {
+      OzoneVolume volume = cluster.getClient().getObjectStore()
+              .getVolume("volume1");
+      ozoneBucket = volume.getBucket("bucket1");
+      ozoneBucket.renameKey("key2", "key4");
+      fail("Rename key as non-volume and non-bucket owner should fail");
+    } catch (Exception ex) {
+      LOG.info(ex.getMessage());
+    }
+    //List Keys - should fail
+    try {
+      OzoneVolume volume = cluster.getClient().getObjectStore()
+              .getVolume("volume1");
+      ozoneBucket = volume.getBucket("bucket1");
+      ozoneBucket.listKeys("key");
+      fail("List keys as non-volume and non-bucket owner should fail");
+    } catch (Exception ex) {
+      LOG.info(ex.getMessage());
+    }
+    //Get Acls - should fail
+    try {
+      OzoneVolume volume = cluster.getClient().getObjectStore()
+              .getVolume("volume1");
+      ozoneBucket = volume.getBucket("bucket1");
+      ozoneBucket.getAcls();
+      fail("Get Acls as non-volume and non-bucket owner should fail");
+    } catch (Exception ex) {
+      LOG.info(ex.getMessage());
+    }
+    //Add Acls - should fail
+    try {
+      OzoneVolume volume = cluster.getClient().getObjectStore()
+              .getVolume("volume1");
+      ozoneBucket = volume.getBucket("bucket1");
+      OzoneAcl acl = new OzoneAcl(USER, "testuser1",
+              IAccessAuthorizer.ACLType.ALL, DEFAULT);
+      ozoneBucket.addAcl(acl);
+      fail("Add Acls as non-volume and non-bucket owner should fail");
+    } catch (Exception ex) {
+      LOG.info(ex.getMessage());
+    }
+  }
+
+  @Test
+  public void testVolumeOwner() throws Exception {
+    //Test Key Operations for Volume Owner
+    UserGroupInformation.setLoginUser(user2);
+    OzoneVolume volume = cluster.getClient().getObjectStore()
+            .getVolume("volume1");
+    OzoneBucket ozoneBucket = volume.getBucket("bucket1");
+    //Key Create
+    createKey(ozoneBucket, "key2", 10, new byte[10]);
+    //Key Delete
+    ozoneBucket.deleteKey("key2");
+    //List Keys
+    ozoneBucket.listKeys("key");
+    //Get Acls
+    ozoneBucket.getAcls();
+    //Add Acls
+    OzoneAcl acl = new OzoneAcl(USER, "testuser2",
+            IAccessAuthorizer.ACLType.ALL, DEFAULT);
+    ozoneBucket.addAcl(acl);
+    //Bucket Delete
+    volume.deleteBucket("bucket2");
+  }
+
+  private static void createVolumeWithOwnerAndAcl(ObjectStore store,
+      String volumeName, String ownerName, String aclString)
+      throws IOException {
+    ClientProtocol proxy = store.getClientProxy();
+    store.createVolume(volumeName);
+    proxy.setVolumeOwner(volumeName, ownerName);
+    setVolumeAcl(store, volumeName, aclString);
+  }
+
+  /**
+   * Helper function to set volume ACL.
+   */
+  private static void setVolumeAcl(ObjectStore store, String volumeName,
+      String aclString) throws IOException {
+    OzoneObj obj = OzoneObjInfo.Builder.newBuilder().setVolumeName(volumeName)
+        .setResType(OzoneObj.ResourceType.VOLUME).setStoreType(OZONE).build();
+    Assert.assertTrue(store.setAcl(obj, OzoneAcl.parseAcls(aclString)));
+  }
+
+  private void createKey(OzoneBucket ozoneBucket, String key, int length,
+       byte[] input) throws Exception {
+    OzoneOutputStream ozoneOutputStream = ozoneBucket.createKey(key, length);
+    ozoneOutputStream.write(input);
+    ozoneOutputStream.write(input, 0, 10);
+    ozoneOutputStream.close();
+  }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmAcls.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmAcls.java
index ac43c24..5a96d68 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmAcls.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmAcls.java
@@ -132,7 +132,7 @@ public class TestOmAcls {
         () -> volume.createBucket(bucketName));
 
     assertTrue(logCapturer.getOutput()
-        .contains("doesn't have CREATE permission to access bucket"));
+        .contains("doesn't have READ permission to access volume"));
   }
 
   @Test
@@ -147,8 +147,8 @@ public class TestOmAcls {
 
     OzoneTestUtils.expectOmException(ResultCodes.PERMISSION_DENIED,
         () -> TestDataUtil.createKey(bucket, "testKey", "testcontent"));
-    assertTrue(logCapturer.getOutput().contains("doesn't have CREATE " +
-        "permission to access key"));
+    assertTrue(logCapturer.getOutput().contains("doesn't have READ " +
+        "permission to access volume"));
   }
 
   /**
diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 7fb230f..1dd922c 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -545,6 +545,7 @@ message BucketInfo {
     optional int64 quotaInNamespace = 16 [default = -2];
     optional uint64 usedNamespace = 17;
     optional BucketLayoutProto bucketLayout = 18;
+    optional string owner = 19;
 }
 
 enum StorageTypeProto {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneAclUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneAclUtils.java
new file mode 100644
index 0000000..e7834db
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneAclUtils.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.net.InetAddress;
+
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_REQUEST;
+
+/**
+ * Ozone Acl Wrapper class.
+ */
+public final class OzoneAclUtils {
+
+  private OzoneAclUtils() {
+  }
+
+  /**
+   * Check Acls of ozone object with volume owner and bucket owner.
+   * @param ozoneManager
+   * @param resType
+   * @param storeType
+   * @param aclType
+   * @param vol
+   * @param bucket
+   * @param key
+   * @param volOwner
+   * @param bucketOwner
+   * @throws IOException
+   */
+  @SuppressWarnings("parameternumber")
+  public static void checkAllAcls(OzoneManager ozoneManager,
+      OzoneObj.ResourceType resType,
+      OzoneObj.StoreType storeType, IAccessAuthorizer.ACLType aclType,
+      String vol, String bucket, String key, String volOwner,
+      String bucketOwner, UserGroupInformation user, InetAddress remoteAddress,
+      String hostName) throws IOException {
+
+    boolean isVolOwner = isOwner(user, volOwner);
+
+    IAccessAuthorizer.ACLType parentAclRight = aclType;
+
+    //OzoneNativeAuthorizer differs from Ranger Authorizer as Ranger requires
+    // only READ access on parent level access. OzoneNativeAuthorizer has
+    // different parent level access based on the child level access type
+    if(ozoneManager.isNativeAuthorizerEnabled()) {
+      if (aclType == IAccessAuthorizer.ACLType.CREATE ||
+          aclType == IAccessAuthorizer.ACLType.DELETE ||
+          aclType == IAccessAuthorizer.ACLType.WRITE_ACL) {
+        parentAclRight = IAccessAuthorizer.ACLType.WRITE;
+      } else if (aclType == IAccessAuthorizer.ACLType.READ_ACL ||
+          aclType == IAccessAuthorizer.ACLType.LIST) {
+        parentAclRight = IAccessAuthorizer.ACLType.READ;
+      }
+    } else {
+      parentAclRight =  IAccessAuthorizer.ACLType.READ;
+    }
+
+    switch (resType) {
+    //For Volume level access we only need to check {OWNER} equal
+    // to Volume Owner.
+    case VOLUME:
+      ozoneManager.checkAcls(resType, storeType, aclType, vol, bucket, key,
+          user, remoteAddress, hostName, true,
+          volOwner);
+      break;
+    case BUCKET:
+    case KEY:
+    //For Bucket/Key/Prefix level access, first we need to check {OWNER} equal
+    // to volume owner on parent volume. Then we need to check {OWNER} equals
+    // volume owner if current ugi user is volume owner else we need check
+    //{OWNER} equals bucket owner for bucket/key/prefix.
+    case PREFIX:
+      ozoneManager.checkAcls(OzoneObj.ResourceType.VOLUME, storeType,
+          parentAclRight, vol, bucket, key, user,
+          remoteAddress, hostName, true,
+          volOwner);
+      if (isVolOwner) {
+        ozoneManager.checkAcls(resType, storeType, aclType, vol, bucket, key,
+            user, remoteAddress, hostName, true,
+            volOwner);
+      } else {
+        ozoneManager.checkAcls(resType, storeType, aclType, vol, bucket, key,
+            user, remoteAddress, hostName, true,
+            bucketOwner);
+      }
+      break;
+    default:
+      throw new OMException("Unexpected object type:" +
+              resType, INVALID_REQUEST);
+    }
+  }
+
+  private static boolean isOwner(UserGroupInformation callerUgi,
+      String ownerName) {
+    if (ownerName == null) {
+      return false;
+    }
+    if (callerUgi.getUserName().equals(ownerName) ||
+        callerUgi.getShortUserName().equals(ownerName)) {
+      return true;
+    }
+    return false;
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 8ed1bc3..2107754 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -244,6 +244,7 @@ import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.DETE
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_AUTH_METHOD;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_REQUEST;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TOKEN_ERROR_OTHER;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
 import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
 import static org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer.RaftServerStatus.LEADER_AND_READY;
 import static org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer.getRaftGroupIdFromOmServiceId;
@@ -1322,6 +1323,10 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     return prefixManager;
   }
 
+  public IAccessAuthorizer getAccessAuthorizer() {
+    return accessAuthorizer;
+  }
+
   /**
    * Get metadata manager.
    *
@@ -2151,12 +2156,25 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     }
 
     InetAddress remoteIp = ProtobufRpcEngine.Server.getRemoteIp();
+    String volumeOwner = getVolumeOwner(vol, acl, resType);
+    String bucketOwner = getBucketOwner(vol, bucket, acl, resType);
 
-    checkAcls(resType, store, acl, vol, bucket, key,
+    OzoneAclUtils.checkAllAcls(this, resType, store, acl,
+        vol, bucket, key, volumeOwner, bucketOwner,
         user != null ? user : getRemoteUser(),
         remoteIp != null ? remoteIp : omRpcAddress.getAddress(),
-        remoteIp != null ? remoteIp.getHostName() : omRpcAddress.getHostName(),
-        true, getVolumeOwner(vol, acl, resType));
+        remoteIp != null ? remoteIp.getHostName() : omRpcAddress.getHostName());
+  }
+
+  private boolean isOwner(UserGroupInformation callerUgi, String ownerName) {
+    if (ownerName == null) {
+      return false;
+    }
+    if (callerUgi.getUserName().equals(ownerName) ||
+            callerUgi.getShortUserName().equals(ownerName)) {
+      return true;
+    }
+    return false;
   }
 
   /**
@@ -2216,6 +2234,50 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   }
 
   /**
+   * Return the owner of a given bucket.
+   *
+   * @return String
+   */
+  public String getBucketOwner(String volume, String bucket, ACLType type,
+       ResourceType resType) throws OMException {
+    String bucketOwner = null;
+    if ((resType != ResourceType.VOLUME) &&
+        !(type == ACLType.CREATE && resType == ResourceType.BUCKET)) {
+      bucketOwner = getBucketOwner(volume, bucket);
+    }
+    return bucketOwner;
+  }
+
+  private String getBucketOwner(String volume, String bucket)
+      throws OMException {
+
+    Boolean lockAcquired = metadataManager.getLock().acquireReadLock(
+            BUCKET_LOCK, volume, bucket);
+    String dbBucketKey = metadataManager.getBucketKey(volume, bucket);
+    OmBucketInfo bucketInfo = null;
+    try {
+      bucketInfo = metadataManager.getBucketTable().get(dbBucketKey);
+    } catch (IOException ioe) {
+      if (ioe instanceof OMException) {
+        throw (OMException)ioe;
+      } else {
+        throw new OMException("getBucketOwner for Bucket " + volume + "/" +
+            bucket  + " failed: " + ioe.getMessage(),
+            ResultCodes.INTERNAL_ERROR);
+      }
+    } finally {
+      if (lockAcquired) {
+        metadataManager.getLock().releaseReadLock(BUCKET_LOCK, volume, bucket);
+      }
+    }
+    if (bucketInfo != null) {
+      return bucketInfo.getOwner();
+    } else {
+      throw new OMException("Bucket not found", ResultCodes.BUCKET_NOT_FOUND);
+    }
+  }
+
+  /**
    * CheckAcls for the ozone object.
    *
    * @return true if permission granted, false if permission denied.
@@ -2226,7 +2288,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   public boolean checkAcls(ResourceType resType, StoreType storeType,
       ACLType aclType, String vol, String bucket, String key,
       UserGroupInformation ugi, InetAddress remoteAddress, String hostName,
-      boolean throwIfPermissionDenied, String volumeOwner)
+      boolean throwIfPermissionDenied, String owner)
       throws OMException {
     OzoneObj obj = OzoneObjInfo.Builder.newBuilder()
         .setResType(resType)
@@ -2240,7 +2302,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
         .setHost(hostName)
         .setAclType(ACLIdentityType.USER)
         .setAclRights(aclType)
-        .setOwnerName(volumeOwner)
+        .setOwnerName(owner)
         .build();
 
     return checkAcls(obj, context, throwIfPermissionDenied);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
index fd1e01e..a4ef4a1 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.ozone.audit.AuditAction;
 import org.apache.hadoop.ozone.audit.AuditEventStatus;
 import org.apache.hadoop.ozone.audit.AuditLogger;
 import org.apache.hadoop.ozone.audit.AuditMessage;
+import org.apache.hadoop.ozone.om.OzoneAclUtils;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.OzonePrefixPathImpl;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
@@ -39,10 +40,7 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.LayoutVersion;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
-import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
-import org.apache.hadoop.ozone.security.acl.OzoneObj;
-import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
-import org.apache.hadoop.ozone.security.acl.RequestContext;
+import org.apache.hadoop.ozone.security.acl.*;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -195,7 +193,8 @@ public abstract class OMClientRequest implements RequestAuditor {
       OzoneObj.StoreType storeType, IAccessAuthorizer.ACLType aclType,
       String vol, String bucket, String key) throws IOException {
     checkAcls(ozoneManager, resType, storeType, aclType, vol, bucket, key,
-        ozoneManager.getVolumeOwner(vol, aclType, resType));
+        ozoneManager.getVolumeOwner(vol, aclType, resType),
+        ozoneManager.getBucketOwner(vol, bucket, aclType, resType));
   }
 
   /**
@@ -234,15 +233,66 @@ public abstract class OMClientRequest implements RequestAuditor {
     if (ozoneManager.getAclsEnabled()) {
       String volumeOwner = ozoneManager.getVolumeOwner(obj.getVolumeName(),
           contextBuilder.getAclRights(), obj.getResourceType());
-      contextBuilder.setClientUgi(createUGI());
+      String bucketOwner = ozoneManager.getBucketOwner(obj.getVolumeName(),
+          obj.getBucketName(), contextBuilder.getAclRights(),
+          obj.getResourceType());
+      UserGroupInformation currentUser = createUGI();
+      contextBuilder.setClientUgi(currentUser);
       contextBuilder.setIp(getRemoteAddress());
       contextBuilder.setHost(getHostName());
       contextBuilder.setAclType(IAccessAuthorizer.ACLIdentityType.USER);
-      contextBuilder.setOwnerName(volumeOwner);
+
+      boolean isVolOwner = isOwner(currentUser, volumeOwner);
+      IAccessAuthorizer.ACLType parentAclRight = aclType;
+      if (isVolOwner) {
+        contextBuilder.setOwnerName(volumeOwner);
+      } else {
+        contextBuilder.setOwnerName(bucketOwner);
+      }
+      if (ozoneManager.isNativeAuthorizerEnabled()) {
+        if (aclType == IAccessAuthorizer.ACLType.CREATE ||
+                aclType == IAccessAuthorizer.ACLType.DELETE ||
+                aclType == IAccessAuthorizer.ACLType.WRITE_ACL) {
+          parentAclRight = IAccessAuthorizer.ACLType.WRITE;
+        } else if (aclType == IAccessAuthorizer.ACLType.READ_ACL ||
+                aclType == IAccessAuthorizer.ACLType.LIST) {
+          parentAclRight = IAccessAuthorizer.ACLType.READ;
+        }
+      } else {
+        parentAclRight = IAccessAuthorizer.ACLType.READ;
+
+      }
+      OzoneObj volumeObj = OzoneObjInfo.Builder.newBuilder()
+              .setResType(OzoneObj.ResourceType.VOLUME)
+              .setStoreType(OzoneObj.StoreType.OZONE)
+              .setVolumeName(volumeName)
+              .setBucketName(bucketName)
+              .setKeyName(keyName).build();
+      RequestContext volumeContext = RequestContext.newBuilder()
+              .setClientUgi(currentUser)
+              .setIp(getRemoteAddress())
+              .setHost(getHostName())
+              .setAclType(IAccessAuthorizer.ACLIdentityType.USER)
+              .setAclRights(parentAclRight)
+              .setOwnerName(volumeOwner)
+              .build();
+      ozoneManager.checkAcls(volumeObj, volumeContext, true);
       ozoneManager.checkAcls(obj, contextBuilder.build(), true);
     }
   }
 
+  private boolean isOwner(UserGroupInformation callerUgi,
+                                 String ownerName) {
+    if (ownerName == null) {
+      return false;
+    }
+    if (callerUgi.getUserName().equals(ownerName) ||
+        callerUgi.getShortUserName().equals(ownerName)) {
+      return true;
+    }
+    return false;
+  }
+
   /**
    * Check Acls of ozone object with volOwner given.
    * @param ozoneManager
@@ -267,6 +317,32 @@ public abstract class OMClientRequest implements RequestAuditor {
   }
 
   /**
+   * Check Acls of ozone object with volOwner given.
+   * @param ozoneManager
+   * @param resType
+   * @param storeType
+   * @param aclType
+   * @param vol
+   * @param bucket
+   * @param key
+   * @param volOwner
+   * @param bucketOwner
+   * @throws IOException
+   */
+  @SuppressWarnings("parameternumber")
+  public void checkAcls(OzoneManager ozoneManager,
+      OzoneObj.ResourceType resType,
+      OzoneObj.StoreType storeType, IAccessAuthorizer.ACLType aclType,
+      String vol, String bucket, String key, String volOwner,
+      String bucketOwner)
+      throws IOException {
+
+    OzoneAclUtils.checkAllAcls(ozoneManager, resType, storeType, aclType,
+            vol, bucket, key, volOwner, bucketOwner, createUGI(),
+            getRemoteAddress(), getHostName());
+  }
+
+  /**
    * Return UGI object created from OMRequest userInfo. If userInfo is not
    * set, returns null.
    * @return UserGroupInformation.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
index e5fbcea..92a9c07 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
@@ -347,8 +347,9 @@ public abstract class OMKeyRequest extends OMClientRequest {
       OzoneObj.ResourceType resourceType, String volumeOwner)
       throws IOException {
     if (ozoneManager.getAclsEnabled()) {
-      checkAcls(ozoneManager, resourceType, OzoneObj.StoreType.OZONE, aclType,
-          volume, bucket, key, volumeOwner);
+      checkAcls(ozoneManager, resourceType, OzoneObj.StoreType.OZONE,
+          aclType, volume, bucket, key, volumeOwner,
+          ozoneManager.getBucketOwner(volume, bucket, aclType, resourceType));
     }
   }
 
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/bucket/CreateBucketHandler.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/bucket/CreateBucketHandler.java
index 20e3f48..5c07662 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/bucket/CreateBucketHandler.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/bucket/CreateBucketHandler.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.shell.OzoneAddress;
 
 import org.apache.hadoop.ozone.shell.SetSpaceQuotaOptions;
+import org.apache.hadoop.security.UserGroupInformation;
 import picocli.CommandLine;
 import picocli.CommandLine.Command;
 import picocli.CommandLine.Option;
@@ -51,6 +52,11 @@ public class CreateBucketHandler extends BucketHandler {
           "false/unspecified indicates otherwise")
   private Boolean isGdprEnforced;
 
+  @Option(names = {"--user", "-u"},
+          description = "Owner of the bucket. Defaults to current" +
+              " user if not specified")
+  private String ownerName;
+
   enum AllowedBucketLayouts {FILE_SYSTEM_OPTIMIZED, OBJECT_STORE}
 
   @Option(names = { "--layout", "-l" },
@@ -68,12 +74,16 @@ public class CreateBucketHandler extends BucketHandler {
   public void execute(OzoneClient client, OzoneAddress address)
       throws IOException {
 
+    if (ownerName == null) {
+      ownerName = UserGroupInformation.getCurrentUser().getShortUserName();
+    }
+
     BucketArgs.Builder bb;
     BucketLayout bucketLayout =
         BucketLayout.valueOf(allowedBucketLayout.toString());
     bb = new BucketArgs.Builder().setStorageType(StorageType.DEFAULT)
-        .setVersioning(false).setBucketLayout(bucketLayout);
-
+        .setVersioning(false).setBucketLayout(bucketLayout)
+        .setOwner(ownerName);
     // TODO: New Client talking to old server, will it create a LEGACY bucket?
 
     if (isGdprEnforced != null) {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org