Posted to commits@storm.apache.org by ka...@apache.org on 2017/07/03 14:41:43 UTC

[25/40] storm git commit: STORM-2518 Handles empty name for "USER type" ACL when normalizing ACLs

STORM-2518 Handles empty name for "USER type" ACL when normalizing ACLs

* if ACLs have both empty name and user, discard empty name ACLs
* add default ACL for user when neither empty name nor user ACL exists
* populate empty name ACLs to users in Subject: set name to user for each


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bb908f61
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bb908f61
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bb908f61

Branch: refs/heads/1.1.x-branch
Commit: bb908f61acd5301ea831158fe1e9b55c34b747ac
Parents: 5a721d7
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed May 17 17:45:44 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Jun 29 16:48:54 2017 +0900

----------------------------------------------------------------------
 .../storm/blobstore/BlobStoreAclHandler.java    | 52 +++++++++++++++++---
 1 file changed, 45 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/bb908f61/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreAclHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreAclHandler.java b/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreAclHandler.java
index 5b3866d..89fe165 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreAclHandler.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreAclHandler.java
@@ -356,6 +356,7 @@ public class BlobStoreAclHandler {
         for (String user : userNames) {
             fixACLsForUser(cleanAcls, user, opMask);
         }
+        fixEmptyNameACLForUsers(cleanAcls, userNames, opMask);
         if ((who == null || userNames.isEmpty()) && !worldEverything(acls)) {
             cleanAcls.addAll(BlobStoreAclHandler.WORLD_EVERYTHING);
             LOG.debug("Access Control for key {} is normalized to world everything {}", key, cleanAcls);
@@ -378,17 +379,29 @@ public class BlobStoreAclHandler {
 
     private void fixACLsForUser(List<AccessControl> acls, String user, int mask) {
         boolean foundUserACL = false;
+        List<AccessControl> emptyUserACLs = new ArrayList<>();
+
         for (AccessControl control : acls) {
-            if (control.get_type() == AccessControlType.USER && control.get_name().equals(user)) {
-                int currentAccess = control.get_access();
-                if ((currentAccess & mask) != mask) {
-                    control.set_access(currentAccess | mask);
+            if (control.get_type() == AccessControlType.USER) {
+                if (!control.is_set_name()) {
+                    emptyUserACLs.add(control);
+                } else if (control.get_name().equals(user)) {
+                    int currentAccess = control.get_access();
+                    if ((currentAccess & mask) != mask) {
+                        control.set_access(currentAccess | mask);
+                    }
+                    foundUserACL = true;
                 }
-                foundUserACL = true;
-                break;
             }
         }
-        if (!foundUserACL) {
+
+        // if ACLs have two user ACLs for empty user and principal, discard empty user ACL
+        if (!emptyUserACLs.isEmpty() && foundUserACL) {
+            acls.removeAll(emptyUserACLs);
+        }
+
+        // add default user ACL when only empty user ACL is not present
+        if (emptyUserACLs.isEmpty() && !foundUserACL) {
             AccessControl userACL = new AccessControl();
             userACL.set_type(AccessControlType.USER);
             userACL.set_name(user);
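Review bot: The result of `fixACLsForUser` now depends on the order in which users are visited, because `acls.removeAll(emptyUserACLs)` runs as soon as one user has a named entry. A user visited before that point gets no default entry (the nameless entry suppresses it) and no copy later (the nameless entry is gone by the time `fixEmptyNameACLForUsers` runs), while the same user visited after it gets a default entry. `userNames` is handed on as a `Set<String>` and looks like it comes from the `HashSet` built in `getUserNamesFromSubject`, so the order is undefined and the normalized ACL can differ for the same input. I would drop the removal step here and let `fixEmptyNameACLForUsers` (last hunk) resolve nameless entries once, skipping users that already have a named entry. The body of `fixACLsForUser` continues past the end of this hunk.

```java
// in fixACLsForUser: delete the `acls.removeAll(emptyUserACLs)` block
// … unchanged: scan loop, default-entry branch …

// in fixEmptyNameACLForUsers: collect users that already have an explicit entry
Set<String> namedUsers = new HashSet<>();
for (AccessControl control : acls) {
    if (control.get_type() == AccessControlType.USER && control.is_set_name()) {
        namedUsers.add(control.get_name());
    }
}
// … unchanged: outer loop collecting nameless entries into aclsToRemove …
for (String user : users) {
    if (namedUsers.contains(user)) {
        continue; // an explicit entry wins over the nameless one
    }
    // … unchanged: copy the entry, set_name(user), aclsToAdd.add …
}
```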
@@ -397,6 +410,31 @@ public class BlobStoreAclHandler {
         }
     }
 
+    private void fixEmptyNameACLForUsers(List<AccessControl> acls, Set<String> users, int mask) {
+        List<AccessControl> aclsToAdd = new ArrayList<>();
+        List<AccessControl> aclsToRemove = new ArrayList<>();
+
+        for (AccessControl control : acls) {
+            if (control.get_type() == AccessControlType.USER && !control.is_set_name()) {
+                aclsToRemove.add(control);
+
+                int currentAccess = control.get_access();
+                if ((currentAccess & mask) != mask) {
+                    control.set_access(currentAccess | mask);
+                }
+
+                for (String user : users) {
+                    AccessControl copiedControl = new AccessControl(control);
+                    copiedControl.set_name(user);
+                    aclsToAdd.add(copiedControl);
+                }
+            }
+        }
+
+        acls.removeAll(aclsToRemove);
+        acls.addAll(aclsToAdd);
+    }
+
     private Set<String> getUserNamesFromSubject(Subject who) {
         Set<String> user = new HashSet<String>();
         if (who != null) {
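Review bot: `fixEmptyNameACLForUsers` has no comment stating the rule it enforces, and that rule is an access-control decision a maintainer should not have to reconstruct from the loops. I would add a Javadoc along the lines of `/** Replaces every USER entry without a name by one named copy per entry in {@code users}, with {@code mask} OR-ed into its access; nameless entries are removed even when {@code users} is empty. */`. The last clause matters because the caller invokes this before its `who == null || userNames.isEmpty()` check, so with an empty set the nameless entries disappear without a log line. The trailing context, `getUserNamesFromSubject`, continues outside the hunk.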