You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2019/03/01 15:46:47 UTC

[hbase] branch branch-2 updated: HBASE-21481 [acl] Superuser's permissions should not be granted or revoked by any non-su global admin

This is an automated email from the ASF dual-hosted git repository.

zghao pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 6de61d6  HBASE-21481 [acl] Superuser's permissions should not be granted or revoked by any non-su global admin
6de61d6 is described below

commit 6de61d6bee6b8306b8b4f4d6858a28c8beb8b06d
Author: Reid Chan <re...@apache.org>
AuthorDate: Thu Feb 28 16:55:13 2019 +0800

    HBASE-21481 [acl] Superuser's permissions should not be granted or revoked by any non-su global admin
    
    Signed-off-by: Guanghao Zhang <zg...@apache.org>
---
 .../apache/hadoop/hbase/security/Superusers.java   |  17 ++-
 .../org/apache/hadoop/hbase/security/User.java     |   9 +-
 .../apache/hadoop/hbase/security/UserProvider.java |  10 ++
 .../hbase/security/access/AccessChecker.java       |  49 +++++++-
 .../hbase/security/access/AccessController.java    |   3 +
 .../hadoop/hbase/security/access/AuthManager.java  |  57 +++------
 .../hbase/security/access/SecureTestUtil.java      | 128 +++++++++++++++++++++
 .../hbase/security/access/TestRpcAccessChecks.java |  99 ++++++++++++++--
 8 files changed, 311 insertions(+), 61 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/security/Superusers.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/security/Superusers.java
index 1089197..b5566e6 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/security/Superusers.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/security/Superusers.java
@@ -71,7 +71,8 @@ public final class Superusers {
     String[] superUserList = conf.getStrings(SUPERUSER_CONF_KEY, new String[0]);
     for (String name : superUserList) {
       if (AuthUtil.isGroupPrincipal(name)) {
-        superGroups.add(AuthUtil.getGroupName(name));
+        // Let's keep the '@' for distinguishing from user.
+        superGroups.add(name);
       } else {
         superUsers.add(name);
       }
@@ -94,17 +95,29 @@ public final class Superusers {
       return true;
     }
     for (String group : user.getGroupNames()) {
-      if (superGroups.contains(group)) {
+      if (superGroups.contains(AuthUtil.toGroupEntry(group))) {
         return true;
       }
     }
     return false;
   }
 
+  /**
+   * @return true if current user is a super user, false otherwise.
+   * @param user to check
+   */
+  public static boolean isSuperUser(String user) {
+    return superUsers.contains(user) || superGroups.contains(user);
+  }
+
   public static Collection<String> getSuperUsers() {
     return superUsers;
   }
 
+  public static Collection<String> getSuperGroups() {
+    return superGroups;
+  }
+
   public static User getSystemUser() {
     return systemUser;
   }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/security/User.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/security/User.java
index 733a658..97d80ba 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/security/User.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/security/User.java
@@ -351,7 +351,8 @@ public abstract class User {
     public static User createUserForTesting(Configuration conf,
         String name, String[] groups) {
       synchronized (UserProvider.class) {
-        if (!(UserProvider.groups instanceof TestingGroups)) {
+        if (!(UserProvider.groups instanceof TestingGroups) ||
+            conf.getBoolean(TestingGroups.TEST_CONF, false)) {
           UserProvider.groups = new TestingGroups(UserProvider.groups);
         }
       }
@@ -400,11 +401,13 @@ public abstract class User {
     }
   }
 
-  static class TestingGroups extends Groups {
+  public static class TestingGroups extends Groups {
+    public static final String TEST_CONF = "hbase.group.service.for.test.only";
+
     private final Map<String, List<String>> userToGroupsMapping = new HashMap<>();
     private Groups underlyingImplementation;
 
-    TestingGroups(Groups underlyingImplementation) {
+    public TestingGroups(Groups underlyingImplementation) {
       super(new Configuration());
       this.underlyingImplementation = underlyingImplementation;
     }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/security/UserProvider.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/security/UserProvider.java
index 17796ee..efa18fb 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/security/UserProvider.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/security/UserProvider.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
 import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
 import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
@@ -56,6 +57,15 @@ public class UserProvider extends BaseConfigurable {
 
   static Groups groups = Groups.getUserToGroupsMappingService();
 
+  @VisibleForTesting
+  public static Groups getGroups() {
+    return groups;
+  }
+
+  public static void setGroups(Groups groups) {
+    UserProvider.groups = groups;
+  }
+
   @Override
   public void setConf(final Configuration conf) {
     super.setConf(conf);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessChecker.java
index 986efd7..c4957f9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessChecker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessChecker.java
@@ -28,12 +28,16 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.AuthUtil;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.security.AccessDeniedException;
+import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.security.access.Permission.Action;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.hadoop.security.Groups;
@@ -355,6 +359,40 @@ public final class AccessChecker {
     }
   }
 
+  /**
+   * Check if caller is granting or revoking superusers's or supergroups's permissions.
+   * @param request request name
+   * @param caller caller
+   * @param userToBeChecked target user or group
+   * @throws IOException AccessDeniedException if target user is superuser
+   */
+  public void performOnSuperuser(String request, User caller, String userToBeChecked)
+      throws IOException {
+    if (!authorizationEnabled) {
+      return;
+    }
+
+    List<String> userGroups = new ArrayList<>();
+    userGroups.add(userToBeChecked);
+    if (!AuthUtil.isGroupPrincipal(userToBeChecked)) {
+      for (String group : getUserGroups(userToBeChecked)) {
+        userGroups.add(AuthUtil.toGroupEntry(group));
+      }
+    }
+    for (String name : userGroups) {
+      if (Superusers.isSuperUser(name)) {
+        AuthResult result = AuthResult.deny(
+          request,
+          "Granting or revoking superusers's or supergroups's permissions is not allowed",
+          caller,
+          Action.ADMIN,
+          NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR);
+        logResult(result);
+        throw new AccessDeniedException(result.getReason());
+      }
+    }
+  }
+
   public void checkLockPermissions(User user, String namespace,
       TableName tableName, RegionInfo[] regionInfos, String reason)
       throws IOException {
@@ -466,7 +504,12 @@ public final class AccessChecker {
    */
   private void initGroupService(Configuration conf) {
     if (groupService == null) {
-      groupService = Groups.getUserToGroupsMappingService(conf);
+      if (conf.getBoolean(User.TestingGroups.TEST_CONF, false)) {
+        UserProvider.setGroups(new User.TestingGroups(UserProvider.getGroups()));
+        groupService = UserProvider.getGroups();
+      } else {
+        groupService = Groups.getUserToGroupsMappingService(conf);
+      }
     }
   }
 
@@ -480,7 +523,7 @@ public final class AccessChecker {
       return groupService.getGroups(user);
     } catch (IOException e) {
       LOG.error("Error occured while retrieving group for " + user, e);
-      return new ArrayList<String>();
+      return new ArrayList<>();
     }
   }
-}
\ No newline at end of file
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
index 32fa16d..ff2d6aa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
@@ -2666,5 +2666,8 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
         break;
       default:
     }
+    if (!Superusers.isSuperUser(caller)) {
+      accessChecker.performOnSuperuser(request, caller, userPermission.getUser());
+    }
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AuthManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AuthManager.java
index 8da9a82..8719f62 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AuthManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AuthManager.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
 import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -46,7 +45,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap;
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 
 /**
  * Performs authorization checks for a given user's assigned permissions.
@@ -102,10 +100,10 @@ public final class AuthManager implements Closeable {
   PermissionCache<TablePermission> TBL_NO_PERMISSION = new PermissionCache<>();
 
   /**
-   * Cache for global permission.
-   * Since every user/group can only have one global permission, no need to user PermissionCache.
+   * Cache for global permission excluding superuser and supergroup.
+   * Since every user/group can only have one global permission, no need to use PermissionCache.
    */
-  private volatile Map<String, GlobalPermission> globalCache;
+  private Map<String, GlobalPermission> globalCache = new ConcurrentHashMap<>();
   /** Cache for namespace permission. */
   private ConcurrentHashMap<String, PermissionCache<NamespacePermission>> namespaceCache =
     new ConcurrentHashMap<>();
@@ -122,8 +120,6 @@ public final class AuthManager implements Closeable {
   private AuthManager(ZKWatcher watcher, Configuration conf)
       throws IOException {
     this.conf = conf;
-    // initialize global permissions based on configuration
-    globalCache = initGlobal(conf);
 
     this.zkperms = new ZKPermissionWatcher(watcher, this, conf);
     try {
@@ -138,30 +134,6 @@ public final class AuthManager implements Closeable {
     this.zkperms.close();
   }
 
-  /**
-   * Initialize with global permission assignments
-   * from the {@code hbase.superuser} configuration key.
-   */
-  private Map<String, GlobalPermission> initGlobal(Configuration conf) throws IOException {
-    UserProvider userProvider = UserProvider.instantiate(conf);
-    User user = userProvider.getCurrent();
-    if (user == null) {
-      throw new IOException("Unable to obtain the current user, " +
-        "authorization checks for internal operations will not work correctly!");
-    }
-    String currentUser = user.getShortName();
-
-    Map<String, GlobalPermission> global = new HashMap<>();
-    // the system user is always included
-    List<String> superusers = Lists.asList(currentUser, conf.getStrings(
-        Superusers.SUPERUSER_CONF_KEY, new String[0]));
-    for (String name : superusers) {
-      GlobalPermission globalPermission = new GlobalPermission(Permission.Action.values());
-      global.put(name, globalPermission);
-    }
-    return global;
-  }
-
   public ZKPermissionWatcher getZKPermissionWatcher() {
     return this.zkperms;
   }
@@ -219,19 +191,13 @@ public final class AuthManager implements Closeable {
    * @param globalPerms new global permissions
    */
   private void updateGlobalCache(ListMultimap<String, Permission> globalPerms) {
-    try {
-      Map<String, GlobalPermission> global = initGlobal(conf);
-      for (String name : globalPerms.keySet()) {
-        for (Permission permission : globalPerms.get(name)) {
-          global.put(name, (GlobalPermission) permission);
-        }
+    globalCache.clear();
+    for (String name : globalPerms.keySet()) {
+      for (Permission permission : globalPerms.get(name)) {
+        globalCache.put(name, (GlobalPermission) permission);
       }
-      globalCache = global;
-      mtime.incrementAndGet();
-    } catch (Exception e) {
-      // Never happens
-      LOG.error("Error occurred while updating the global cache", e);
     }
+    mtime.incrementAndGet();
   }
 
   /**
@@ -287,6 +253,9 @@ public final class AuthManager implements Closeable {
     if (user == null) {
       return false;
     }
+    if (Superusers.isSuperUser(user)) {
+      return true;
+    }
     if (authorizeGlobal(globalCache.get(user.getShortName()), action)) {
       return true;
     }
@@ -506,8 +475,8 @@ public final class AuthManager implements Closeable {
     try {
       List<Permission> perms = AccessControlLists.getCellPermissionsForUser(user, cell);
       if (LOG.isTraceEnabled()) {
-        LOG.trace("Perms for user " + user.getShortName() + " in cell " + cell + ": " +
-          (perms != null ? perms : ""));
+        LOG.trace("Perms for user {} in table {} in cell {}: {}",
+          user.getShortName(), table, cell, (perms != null ? perms : ""));
       }
       if (perms != null) {
         for (Permission p: perms) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java
index e392b3b..5e6e3fd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java
@@ -93,7 +93,11 @@ public class SecureTestUtil {
       sb.append(',');
       sb.append(currentUser); sb.append(".hfs."); sb.append(i);
     }
+    // Add a supergroup for improving test coverage.
+    sb.append(',').append("@supergroup");
     conf.set("hbase.superuser", sb.toString());
+    // hbase.group.service.for.test.only is used in test only.
+    conf.set(User.TestingGroups.TEST_CONF, "true");
   }
 
   public static void enableSecurity(Configuration conf) throws IOException {
@@ -383,6 +387,26 @@ public class SecureTestUtil {
   }
 
   /**
+   * Grant permissions globally to the given user. Will wait until all active
+   * AccessController instances have updated their permissions caches or will
+   * throw an exception upon timeout (10 seconds).
+   */
+  public static void grantGlobal(final User caller, final HBaseTestingUtility util,
+      final String user, final Permission.Action... actions) throws Exception {
+    SecureTestUtil.updateACLs(util, new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        Configuration conf = util.getConfiguration();
+        try (Connection connection = ConnectionFactory.createConnection(conf, caller)) {
+          connection.getAdmin().grant(new UserPermission(user, new GlobalPermission(actions)),
+            false);
+        }
+        return null;
+      }
+    });
+  }
+
+  /**
    * Revoke permissions globally from the given user. Will wait until all active
    * AccessController instances have updated their permissions caches or will
    * throw an exception upon timeout (10 seconds).
@@ -401,6 +425,25 @@ public class SecureTestUtil {
   }
 
   /**
+   * Revoke permissions globally from the given user. Will wait until all active
+   * AccessController instances have updated their permissions caches or will
+   * throw an exception upon timeout (10 seconds).
+   */
+  public static void revokeGlobal(final User caller, final HBaseTestingUtility util,
+      final String user, final Permission.Action... actions) throws Exception {
+    SecureTestUtil.updateACLs(util, new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        Configuration conf = util.getConfiguration();
+        try (Connection connection = ConnectionFactory.createConnection(conf, caller)) {
+          connection.getAdmin().revoke(new UserPermission(user, new GlobalPermission(actions)));
+        }
+        return null;
+      }
+    });
+  }
+
+  /**
    * Grant permissions on a namespace to the given user. Will wait until all active
    * AccessController instances have updated their permissions caches or will
    * throw an exception upon timeout (10 seconds).
@@ -420,6 +463,27 @@ public class SecureTestUtil {
   }
 
   /**
+   * Grant permissions on a namespace to the given user. Will wait until all active
+   * AccessController instances have updated their permissions caches or will
+   * throw an exception upon timeout (10 seconds).
+   */
+  public static void grantOnNamespace(final User caller, final HBaseTestingUtility util,
+      final String user, final String namespace,
+      final Permission.Action... actions) throws Exception {
+    SecureTestUtil.updateACLs(util, new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        Configuration conf = util.getConfiguration();
+        try (Connection connection = ConnectionFactory.createConnection(conf, caller)) {
+          connection.getAdmin()
+            .grant(new UserPermission(user, new NamespacePermission(namespace, actions)), false);
+        }
+        return null;
+      }
+    });
+  }
+
+  /**
    * Grant permissions on a namespace to the given user using AccessControl Client.
    * Will wait until all active AccessController instances have updated their permissions caches
    * or will throw an exception upon timeout (10 seconds).
@@ -481,6 +545,27 @@ public class SecureTestUtil {
   }
 
   /**
+   * Revoke permissions on a namespace from the given user. Will wait until all active
+   * AccessController instances have updated their permissions caches or will
+   * throw an exception upon timeout (10 seconds).
+   */
+  public static void revokeFromNamespace(final User caller, final HBaseTestingUtility util,
+      final String user, final String namespace,
+      final Permission.Action... actions) throws Exception {
+    SecureTestUtil.updateACLs(util, new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        Configuration conf = util.getConfiguration();
+        try (Connection connection = ConnectionFactory.createConnection(conf, caller)) {
+          connection.getAdmin()
+            .revoke(new UserPermission(user, new NamespacePermission(namespace, actions)));
+        }
+        return null;
+      }
+    });
+  }
+
+  /**
    * Grant permissions on a table to the given user. Will wait until all active
    * AccessController instances have updated their permissions caches or will
    * throw an exception upon timeout (10 seconds).
@@ -502,6 +587,28 @@ public class SecureTestUtil {
   }
 
   /**
+   * Grant permissions on a table to the given user. Will wait until all active
+   * AccessController instances have updated their permissions caches or will
+   * throw an exception upon timeout (10 seconds).
+   */
+  public static void grantOnTable(final User caller, final HBaseTestingUtility util,
+      final String user, final TableName table, final byte[] family, final byte[] qualifier,
+      final Permission.Action... actions) throws Exception {
+    SecureTestUtil.updateACLs(util, new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        Configuration conf = util.getConfiguration();
+        try (Connection connection = ConnectionFactory.createConnection(conf, caller)) {
+          connection.getAdmin().grant(
+            new UserPermission(user, new TablePermission(table, family, qualifier, actions)),
+            false);
+        }
+        return null;
+      }
+    });
+  }
+
+  /**
    * Grant permissions on a table to the given user using AccessControlClient. Will wait until all
    * active AccessController instances have updated their permissions caches or will
    * throw an exception upon timeout (10 seconds).
@@ -564,6 +671,27 @@ public class SecureTestUtil {
   }
 
   /**
+   * Revoke permissions on a table from the given user. Will wait until all active
+   * AccessController instances have updated their permissions caches or will
+   * throw an exception upon timeout (10 seconds).
+   */
+  public static void revokeFromTable(final User caller, final HBaseTestingUtility util,
+      final String user, final TableName table, final byte[] family, final byte[] qualifier,
+      final Permission.Action... actions) throws Exception {
+    SecureTestUtil.updateACLs(util, new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        Configuration conf = util.getConfiguration();
+        try (Connection connection = ConnectionFactory.createConnection(conf, caller)) {
+          connection.getAdmin().revoke(
+            new UserPermission(user, new TablePermission(table, family, qualifier, actions)));
+        }
+        return null;
+      }
+    });
+  }
+
+  /**
    * Revoke permissions on a table from the given user using AccessControlClient. Will wait until
    * all active AccessController instances have updated their permissions caches or will
    * throw an exception upon timeout (10 seconds).
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestRpcAccessChecks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestRpcAccessChecks.java
index 5aa9ed6..4fb5242 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestRpcAccessChecks.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestRpcAccessChecks.java
@@ -1,4 +1,3 @@
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -16,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.hadoop.hbase.security.access;
 
 import static org.apache.hadoop.hbase.AuthUtil.toGroupEntry;
@@ -32,6 +32,7 @@ import java.security.PrivilegedExceptionAction;
 import java.util.Collections;
 import java.util.HashMap;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.AuthUtil;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -103,6 +104,10 @@ public class TestRpcAccessChecks {
   private static User USER_ADMIN;
   // user without admin permissions
   private static User USER_NON_ADMIN;
+  // user in supergroup
+  private static User USER_IN_SUPERGROUPS;
+  // user with global permission but not a superuser
+  private static User USER_ADMIN_NOT_SUPER;
 
   private static final String GROUP_ADMIN = "admin_group";
   private static User USER_GROUP_ADMIN;
@@ -135,23 +140,26 @@ public class TestRpcAccessChecks {
 
     // Enable security
     enableSecurity(conf);
-    TEST_UTIL.startMiniCluster();
-
-    // Wait for the ACL table to become available
-    TEST_UTIL.waitUntilAllRegionsAssigned(AccessControlLists.ACL_TABLE_NAME);
 
     // Create users
+    // admin is superuser as well.
     USER_ADMIN = User.createUserForTesting(conf, "admin", new String[0]);
     USER_NON_ADMIN = User.createUserForTesting(conf, "non_admin", new String[0]);
     USER_GROUP_ADMIN =
         User.createUserForTesting(conf, "user_group_admin", new String[] { GROUP_ADMIN });
+    USER_IN_SUPERGROUPS =
+        User.createUserForTesting(conf, "user_in_supergroup", new String[] { "supergroup" });
+    USER_ADMIN_NOT_SUPER = User.createUserForTesting(conf, "normal_admin", new String[0]);
 
-    // Assign permissions to users and groups
-    SecureTestUtil.grantGlobal(TEST_UTIL, USER_ADMIN.getShortName(),
-      Permission.Action.ADMIN, Permission.Action.CREATE);
+    TEST_UTIL.startMiniCluster();
+    // Wait for the ACL table to become available
+    TEST_UTIL.waitUntilAllRegionsAssigned(AccessControlLists.ACL_TABLE_NAME);
+
+    // Assign permissions to groups
     SecureTestUtil.grantGlobal(TEST_UTIL, toGroupEntry(GROUP_ADMIN),
       Permission.Action.ADMIN, Permission.Action.CREATE);
-    // No permissions to USER_NON_ADMIN
+    SecureTestUtil.grantGlobal(TEST_UTIL, USER_ADMIN_NOT_SUPER.getShortName(),
+      Permission.Action.ADMIN);
   }
 
   interface Action {
@@ -361,4 +369,77 @@ public class TestRpcAccessChecks {
     };
     verifyAllowed(USER_NON_ADMIN, userAction);
   }
+
+  @Test
+  public void testGrantDeniedOnSuperUsersGroups() {
+    /** User */
+    try {
+      // Global
+      SecureTestUtil.grantGlobal(USER_ADMIN_NOT_SUPER, TEST_UTIL, USER_ADMIN.getShortName(),
+        Permission.Action.ADMIN, Permission.Action.CREATE);
+      fail("Granting superuser's global permissions is not allowed.");
+    } catch (Exception e) {
+    }
+    try {
+      // Namespace
+      SecureTestUtil.grantOnNamespace(USER_ADMIN_NOT_SUPER, TEST_UTIL, USER_ADMIN.getShortName(),
+        TEST_NAME.getMethodName(),
+        Permission.Action.ADMIN, Permission.Action.CREATE);
+      fail("Granting superuser's namespace permissions is not allowed.");
+    } catch (Exception e) {
+    }
+    try {
+      // Table
+      SecureTestUtil.grantOnTable(USER_ADMIN_NOT_SUPER, TEST_UTIL, USER_ADMIN.getName(),
+        TableName.valueOf(TEST_NAME.getMethodName()), null, null,
+        Permission.Action.ADMIN, Permission.Action.CREATE);
+      fail("Granting superuser's table permissions is not allowed.");
+    } catch (Exception e) {
+    }
+
+    /** Group */
+    try {
+      SecureTestUtil.grantGlobal(USER_ADMIN_NOT_SUPER, TEST_UTIL,
+        USER_IN_SUPERGROUPS.getShortName(), Permission.Action.ADMIN, Permission.Action.CREATE);
+      fail("Granting superuser's global permissions is not allowed.");
+    } catch (Exception e) {
+    }
+  }
+
+  @Test
+  public void testRevokeDeniedOnSuperUsersGroups() {
+    /** User */
+    try {
+      // Global
+      SecureTestUtil.revokeGlobal(USER_ADMIN_NOT_SUPER, TEST_UTIL, USER_ADMIN.getShortName(),
+        Permission.Action.ADMIN);
+      fail("Revoking superuser's global permissions is not allowed.");
+    } catch (Exception e) {
+    }
+    try {
+      // Namespace
+      SecureTestUtil.revokeFromNamespace(USER_ADMIN_NOT_SUPER, TEST_UTIL, USER_ADMIN.getShortName(),
+        TEST_NAME.getMethodName(), Permission.Action.ADMIN);
+      fail("Revoking superuser's namespace permissions is not allowed.");
+    } catch (Exception e) {
+    }
+    try {
+      // Table
+      SecureTestUtil.revokeFromTable(USER_ADMIN_NOT_SUPER, TEST_UTIL, USER_ADMIN.getName(),
+        TableName.valueOf(TEST_NAME.getMethodName()), null, null,
+        Permission.Action.ADMIN);
+      fail("Revoking superuser's table permissions is not allowed.");
+    } catch (Exception e) {
+    }
+
+    /** Group */
+    try {
+      // Global revoke
+      SecureTestUtil.revokeGlobal(USER_ADMIN_NOT_SUPER, TEST_UTIL,
+        AuthUtil.toGroupEntry("supergroup"),
+        Permission.Action.ADMIN, Permission.Action.CREATE);
+      fail("Revoking supergroup's permissions is not allowed.");
+    } catch (Exception e) {
+    }
+  }
 }