You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@ozone.apache.org by GitBox <gi...@apache.org> on 2021/04/06 10:24:25 UTC

[GitHub] [ozone] rakeshadr commented on a change in pull request #2011: HDDS-4691. [FSO]Authorizer: OM can do recursive ACL check for subpaths

rakeshadr commented on a change in pull request #2011:
URL: https://github.com/apache/ozone/pull/2011#discussion_r607722712



##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
##########
@@ -1780,6 +1788,48 @@ public boolean checkAccess(OzoneObj ozObject, RequestContext context)
     }
   }
 
+  /**
+   * check acls for all subpaths of a directory.
+   *
+   * @param ozObject
+   * @param context
+   * @return
+   * @throws IOException
+   */
+  private boolean checkChildrenAcls(OzoneObj ozObject, RequestContext context)
+      throws IOException {
+    OmKeyInfo keyInfo;
+    OzoneFileStatus ozoneFileStatus =
+        ozObject.getOzonePrefixPathViewer().getOzoneFileStatus();
+    keyInfo = ozoneFileStatus.getKeyInfo();
+    // Using stack to check acls for subpaths
+    Stack<OzoneFileStatus> stack = new Stack<>();
+    // check whether given file/dir  has access
+    boolean hasAccess = OzoneAclUtil.checkAclRight(keyInfo.getAcls(), context);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("user:{} has access rights for key:{} :{} ",
+          context.getClientUgi(), ozObject.getKeyName(), hasAccess);
+    }
+    if (ozoneFileStatus.isDirectory() && hasAccess) {
+      stack.add(ozoneFileStatus);
+    }
+    while (!stack.isEmpty() && hasAccess) {
+      ozoneFileStatus = stack.pop();
+      String keyPath = ozoneFileStatus.getTrimmedName();
+      Iterator<? extends OzoneFileStatus> children =
+          ozObject.getOzonePrefixPathViewer().getChildren(keyPath);
+      while (hasAccess && children.hasNext()) {
+        ozoneFileStatus = children.next();
+        keyInfo = ozoneFileStatus.getKeyInfo();
+        hasAccess = OzoneAclUtil.checkAclRight(keyInfo.getAcls(), context);
+        if (hasAccess && ozoneFileStatus.isDirectory()) {

Review comment:
       Can you please add debug log to know the access status. Something like below,
   
         if (LOG.isDebugEnabled()) {
           LOG.debug("user:{} has access rights for key:{} :{} ",
               context.getClientUgi(), keyInfo.getKeyName(), hasAccess);
         }

##########
File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestRecursiveAclWithFSOBucket.java
##########
@@ -0,0 +1,333 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.StorageType;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.client.BucketArgs;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS_NATIVE;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
+import static org.apache.hadoop.ozone.security.acl.OzoneObj.StoreType.OZONE;
+
+/**
+ * Test recursive acl checks for delete and rename for FSO Buckets.
+ */
+public class TestRecursiveAclWithFSOBucket {
+
+  @Rule public Timeout timeout = Timeout.seconds(120);
+
+  private final UserGroupInformation adminUser =
+      UserGroupInformation.createUserForTesting("om", new String[] {"ozone"});
+  private final UserGroupInformation user1 = UserGroupInformation
+      .createUserForTesting("user1", new String[] {"test1"});
+  private final UserGroupInformation user2 = UserGroupInformation
+      .createUserForTesting("user2", new String[] {"test2"});
+
+  @Before
+  public void init() throws Exception {
+    // loginUser is the user running this test.
+    // Implication: loginUser is automatically added to the OM admin list.
+    UserGroupInformation.setLoginUser(adminUser);
+  }
+
+  @Test
+  public void testKeyDeleteAndRenameWithoutPermission() throws Exception {
+    // ozone.acl.enabled = true
+    //1. started a cluster
+    MiniOzoneCluster cluster = startCluster(true);
+
+    List<String> keys = new ArrayList<>();
+    //2. Create volumes with user1
+
+    OzoneClient client = cluster.getClient();
+    ObjectStore objectStore = client.getObjectStore();
+
+    /* r = READ, w = WRITE, c = CREATE, d = DELETE
+       l = LIST, a = ALL, n = NONE, x = READ_ACL, y = WRITE_ACL */
+    String aclWorldAll = "world::a";
+    createVolumeWithOwnerAndAcl(objectStore, "volume1", "user1", aclWorldAll);
+
+    //3. Login as user1, create directories and keys
+    UserGroupInformation.setLoginUser(user1);
+    client = cluster.getClient();
+    objectStore = client.getObjectStore();
+
+    OzoneVolume volume = objectStore.getVolume("volume1");
+
+    BucketArgs omBucketArgs =
+        BucketArgs.newBuilder().setStorageType(StorageType.DISK).build();
+
+    //4. create bucket with user1
+    volume.createBucket("bucket1", omBucketArgs);
+    setBucketAcl(objectStore, volume.getName(), "bucket1", aclWorldAll);
+    OzoneBucket ozoneBucket = volume.getBucket("bucket1");
+
+    /**
+     *                       buck-1
+     *                        |
+     *                        a
+     *                        |
+     *          ------------------------------------
+     *         |           |              |        |
+     *         b1          b2             b3      file1
+     *       -----       ------           -----
+     *       |    |      |    |          |    |
+     *      c1   c2     d1   d2          e1   e2
+     *       |    |      |    |           |    |
+     *       f1   f2     f3  --------     f5   f6
+     *                      |        |
+     *                    d21        file2
+     *                     |
+     *                     f4
+     *
+     *     Test Case 1 :
+     *     Remove delete acl from file File2
+     *     Try deleting b2
+     *
+     *     Test case 2:
+     *     Remove delete acl fro dir c2
+     *     Try deleting b1
+     *
+     *     Test case 3
+     *     try deleting b3
+     */
+
+    String keyc1 = "a/b1/c1/f1";
+    String keyc2 = "a/b1/c2/f2";
+    String keyd1 = "a/b2/d1/f3";
+    String keyd21 = "a/b2/d2/d21/f4";
+    String keye1 = "/a/b3/e1/f5";
+    String keye2 = "/a/b3/e2/f6";
+
+    String file1 = "a/" + "file" + RandomStringUtils.randomNumeric(5);
+    String file2 = "a/b2/d2/" + "file" + RandomStringUtils.randomNumeric(5);
+
+    keys.add(keyc1);
+    keys.add(keyc2);
+    keys.add(keyd1);
+    keys.add(keyd21);
+    keys.add(keye1);
+    keys.add(keye2);
+    keys.add(file1);
+    keys.add(file2);
+
+    createKeys(objectStore, ozoneBucket, keys);
+
+    // Test case 1
+    // Remove acls from file2
+    // Delete/Rename on directory a/b2 should throw permission denied
+    // (since file2 is a child)
+    removeAclsFromKey(objectStore, ozoneBucket, file2);
+    OzoneObj ozoneObj;
+    List<OzoneAcl> aclList1;
+
+    UserGroupInformation.setLoginUser(user2);
+    client = cluster.getClient();
+    objectStore = client.getObjectStore();
+    volume = objectStore.getVolume("volume1");
+    ozoneBucket = volume.getBucket("bucket1");
+
+    // perform  delete
+    try {
+      ozoneBucket.deleteDirectory("a/b2", true);
+      Assert.fail("Should throw permission denied !");
+    } catch (OMException ome) {
+      // expect permission error
+      GenericTestUtils.assertExceptionContains("PERMISSION_DENIED", ome);
+    }
+
+    // perform rename
+    try {
+      ozoneBucket.renameKey("a/b2", "a/b2_renamed");
+      Assert.fail("Should throw permission denied !");
+    } catch (OMException ome) {
+      // expect permission error
+      GenericTestUtils.assertExceptionContains("PERMISSION_DENIED", ome);
+    }
+
+    // Test case 2
+    // Remove acl from directory c2, delete/rename a/b1 should throw
+    // permission denied since c2 is a subdirectory
+
+    UserGroupInformation.setLoginUser(user1);
+    removeAclsFromKey(objectStore, ozoneBucket, "a/b1/c2");
+
+    UserGroupInformation.setLoginUser(user2);
+    // perform  delete
+    try {
+      ozoneBucket.deleteDirectory("a/b1", true);
+      Assert.fail("Should throw permission denied !");
+    } catch (OMException ome) {
+      // expect permission error
+      GenericTestUtils.assertExceptionContains("PERMISSION_DENIED", ome);
+    }
+
+    // perform rename
+    try {
+      ozoneBucket.renameKey("a/b1", "a/b1_renamed");
+      Assert.fail("Should throw permission denied !");
+    } catch (OMException ome) {
+      // expect permission error
+      GenericTestUtils.assertExceptionContains("PERMISSION_DENIED", ome);
+    }
+
+    // Test case 3
+    // delete b3 and this shouldn't throw exception because acls have not
+    // been removed from subpaths.
+    ozoneBucket.deleteDirectory("a/b3", true);
+    stopCluster(cluster);

Review comment:
       Please make `@After` to the stopCluster() function and remove explicit cleanup. This will ensure proper cleanup irrespective of test pass/fail.  

##########
File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestRecursiveAclWithFSOBucket.java
##########
@@ -0,0 +1,333 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.StorageType;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.client.BucketArgs;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS_NATIVE;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
+import static org.apache.hadoop.ozone.security.acl.OzoneObj.StoreType.OZONE;
+
+/**
+ * Test recursive acl checks for delete and rename for FSO Buckets.
+ */
+public class TestRecursiveAclWithFSOBucket {
+
+  @Rule public Timeout timeout = Timeout.seconds(120);
+
+  private final UserGroupInformation adminUser =
+      UserGroupInformation.createUserForTesting("om", new String[] {"ozone"});
+  private final UserGroupInformation user1 = UserGroupInformation
+      .createUserForTesting("user1", new String[] {"test1"});
+  private final UserGroupInformation user2 = UserGroupInformation
+      .createUserForTesting("user2", new String[] {"test2"});
+
+  @Before
+  public void init() throws Exception {
+    // loginUser is the user running this test.
+    // Implication: loginUser is automatically added to the OM admin list.
+    UserGroupInformation.setLoginUser(adminUser);
+  }
+
+  @Test
+  public void testKeyDeleteAndRenameWithoutPermission() throws Exception {
+    // ozone.acl.enabled = true
+    //1. started a cluster
+    MiniOzoneCluster cluster = startCluster(true);
+
+    List<String> keys = new ArrayList<>();
+    //2. Create volumes with user1
+
+    OzoneClient client = cluster.getClient();
+    ObjectStore objectStore = client.getObjectStore();
+
+    /* r = READ, w = WRITE, c = CREATE, d = DELETE
+       l = LIST, a = ALL, n = NONE, x = READ_ACL, y = WRITE_ACL */
+    String aclWorldAll = "world::a";
+    createVolumeWithOwnerAndAcl(objectStore, "volume1", "user1", aclWorldAll);
+
+    //3. Login as user1, create directories and keys
+    UserGroupInformation.setLoginUser(user1);
+    client = cluster.getClient();
+    objectStore = client.getObjectStore();
+
+    OzoneVolume volume = objectStore.getVolume("volume1");
+
+    BucketArgs omBucketArgs =
+        BucketArgs.newBuilder().setStorageType(StorageType.DISK).build();
+
+    //4. create bucket with user1
+    volume.createBucket("bucket1", omBucketArgs);
+    setBucketAcl(objectStore, volume.getName(), "bucket1", aclWorldAll);
+    OzoneBucket ozoneBucket = volume.getBucket("bucket1");
+
+    /**
+     *                       buck-1
+     *                        |
+     *                        a
+     *                        |
+     *          ------------------------------------
+     *         |           |              |        |
+     *         b1          b2             b3      file1
+     *       -----       ------           -----
+     *       |    |      |    |          |    |
+     *      c1   c2     d1   d2          e1   e2
+     *       |    |      |    |           |    |
+     *       f1   f2     f3  --------     f5   f6
+     *                      |        |
+     *                    d21        file2
+     *                     |
+     *                     f4
+     *
+     *     Test Case 1 :
+     *     Remove delete acl from file File2
+     *     Try deleting b2
+     *
+     *     Test case 2:
+     *     Remove delete acl fro dir c2
+     *     Try deleting b1
+     *
+     *     Test case 3
+     *     try deleting b3
+     */
+
+    String keyc1 = "a/b1/c1/f1";
+    String keyc2 = "a/b1/c2/f2";
+    String keyd1 = "a/b2/d1/f3";
+    String keyd21 = "a/b2/d2/d21/f4";
+    String keye1 = "/a/b3/e1/f5";
+    String keye2 = "/a/b3/e2/f6";
+
+    String file1 = "a/" + "file" + RandomStringUtils.randomNumeric(5);
+    String file2 = "a/b2/d2/" + "file" + RandomStringUtils.randomNumeric(5);
+
+    keys.add(keyc1);
+    keys.add(keyc2);
+    keys.add(keyd1);
+    keys.add(keyd21);
+    keys.add(keye1);
+    keys.add(keye2);
+    keys.add(file1);
+    keys.add(file2);
+
+    createKeys(objectStore, ozoneBucket, keys);
+
+    // Test case 1
+    // Remove acls from file2
+    // Delete/Rename on directory a/b2 should throw permission denied
+    // (since file2 is a child)
+    removeAclsFromKey(objectStore, ozoneBucket, file2);
+    OzoneObj ozoneObj;
+    List<OzoneAcl> aclList1;
+
+    UserGroupInformation.setLoginUser(user2);
+    client = cluster.getClient();
+    objectStore = client.getObjectStore();
+    volume = objectStore.getVolume("volume1");
+    ozoneBucket = volume.getBucket("bucket1");
+
+    // perform  delete
+    try {
+      ozoneBucket.deleteDirectory("a/b2", true);
+      Assert.fail("Should throw permission denied !");
+    } catch (OMException ome) {
+      // expect permission error
+      GenericTestUtils.assertExceptionContains("PERMISSION_DENIED", ome);
+    }
+
+    // perform rename
+    try {
+      ozoneBucket.renameKey("a/b2", "a/b2_renamed");
+      Assert.fail("Should throw permission denied !");
+    } catch (OMException ome) {
+      // expect permission error
+      GenericTestUtils.assertExceptionContains("PERMISSION_DENIED", ome);
+    }
+
+    // Test case 2
+    // Remove acl from directory c2, delete/rename a/b1 should throw
+    // permission denied since c2 is a subdirectory
+
+    UserGroupInformation.setLoginUser(user1);
+    removeAclsFromKey(objectStore, ozoneBucket, "a/b1/c2");
+
+    UserGroupInformation.setLoginUser(user2);
+    // perform  delete
+    try {
+      ozoneBucket.deleteDirectory("a/b1", true);
+      Assert.fail("Should throw permission denied !");
+    } catch (OMException ome) {
+      // expect permission error
+      GenericTestUtils.assertExceptionContains("PERMISSION_DENIED", ome);

Review comment:
       Please assert result code.
   
   assertRequals("Permission check failed", ResultCodes.PERMISSION_DENIED, ome.getResult());
   
   Please do this for the catch blocks, below  the test code as well..

##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
##########
@@ -1780,6 +1788,48 @@ public boolean checkAccess(OzoneObj ozObject, RequestContext context)
     }
   }
 
+  /**
+   * check acls for all subpaths of a directory.
+   *
+   * @param ozObject
+   * @param context
+   * @return
+   * @throws IOException
+   */
+  private boolean checkChildrenAcls(OzoneObj ozObject, RequestContext context)
+      throws IOException {
+    OmKeyInfo keyInfo;
+    OzoneFileStatus ozoneFileStatus =
+        ozObject.getOzonePrefixPathViewer().getOzoneFileStatus();
+    keyInfo = ozoneFileStatus.getKeyInfo();
+    // Using stack to check acls for subpaths
+    Stack<OzoneFileStatus> stack = new Stack<>();

Review comment:
       How about renaming the stack variable name to `directories` to make it clear that stack stores only dirs.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org