You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zy...@apache.org on 2018/07/10 04:38:40 UTC

[18/50] [abbrv] hbase git commit: HBASE-20357 AccessControlClient API Enhancement

http://git-wip-us.apache.org/repos/asf/hbase/blob/bb8826ca/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
index d2aa682..a0b5d9d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
@@ -38,6 +38,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -127,6 +128,9 @@ import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.security.GroupMappingServiceProvider;
+import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -212,6 +216,11 @@ public class TestAccessController extends SecureTestUtil {
     conf = TEST_UTIL.getConfiguration();
     // Up the handlers; this test needs more than usual.
     conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
+
+    conf.set(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
+      MyShellBasedUnixGroupsMapping.class.getName());
+    UserGroupInformation.setConfiguration(conf);
+
     // Enable security
     enableSecurity(conf);
     // In this particular test case, we can't use SecureBulkLoadEndpoint because its doAs will fail
@@ -2215,8 +2224,12 @@ public class TestAccessController extends SecureTestUtil {
   }
 
   private void createTestTable(TableName tname) throws Exception {
+    createTestTable(tname, TEST_FAMILY);
+  }
+
+  private void createTestTable(TableName tname, byte[] cf) throws Exception {
     HTableDescriptor htd = new HTableDescriptor(tname);
-    HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAMILY);
+    HColumnDescriptor hcd = new HColumnDescriptor(cf);
     hcd.setMaxVersions(100);
     htd.addFamily(hcd);
     htd.setOwner(USER_OWNER);
@@ -3105,4 +3118,470 @@ public class TestAccessController extends SecureTestUtil {
     verifyAllowed(action, SUPERUSER);
     verifyDenied(action, USER_CREATE, USER_RW, USER_RO, USER_NONE, USER_OWNER, USER_ADMIN);
   }
+
+  /*
+   * Exercises AccessControlClient.getUserPermissions at global, namespace and table scope.
+   * Creates two namespaces (testNS1, testNS2) with one table each, grants user and group
+   * permissions on them, delegates the per-scope checks to the validate*ForGetUserPermissions
+   * helpers, then checks that a null, empty or "@namespace" table name combined with a column
+   * family is rejected with IllegalArgumentException.
+   */
+  @Test(timeout = 180000)
+  public void testGetUserPermissions() throws Throwable {
+    try (Connection conn = ConnectionFactory.createConnection(conf)) {
+      User nSUser1 = User.createUserForTesting(conf, "nsuser1", new String[0]);
+      User nSUser2 = User.createUserForTesting(conf, "nsuser2", new String[0]);
+      User nSUser3 = User.createUserForTesting(conf, "nsuser3", new String[0]);
+
+      // Global access groups
+      User globalGroupUser1 =
+          User.createUserForTesting(conf, "globalGroupUser1", new String[] { "group_admin" });
+      User globalGroupUser2 = User.createUserForTesting(conf, "globalGroupUser2",
+        new String[] { "group_admin", "group_create" });
+      // Namespace access groups
+      User nsGroupUser1 =
+          User.createUserForTesting(conf, "nsGroupUser1", new String[] { "ns_group1" });
+      User nsGroupUser2 =
+          User.createUserForTesting(conf, "nsGroupUser2", new String[] { "ns_group2" });
+      // table Access groups
+      User tableGroupUser1 =
+          User.createUserForTesting(conf, "tableGroupUser1", new String[] { "table_group1" });
+      User tableGroupUser2 =
+          User.createUserForTesting(conf, "tableGroupUser2", new String[] { "table_group2" });
+
+      // Create namespaces
+      String nsPrefix = "testNS";
+      final String namespace1 = nsPrefix + "1";
+      NamespaceDescriptor desc1 = NamespaceDescriptor.create(namespace1).build();
+      createNamespace(TEST_UTIL, desc1);
+      String namespace2 = nsPrefix + "2";
+      NamespaceDescriptor desc2 = NamespaceDescriptor.create(namespace2).build();
+      createNamespace(TEST_UTIL, desc2);
+
+      // Grant namespace permission
+      grantOnNamespace(TEST_UTIL, nSUser1.getShortName(), namespace1, Permission.Action.ADMIN);
+      grantOnNamespace(TEST_UTIL, nSUser3.getShortName(), namespace1, Permission.Action.READ);
+      grantOnNamespace(TEST_UTIL, toGroupEntry("ns_group1"), namespace1, Permission.Action.ADMIN);
+      grantOnNamespace(TEST_UTIL, nSUser2.getShortName(), namespace2, Permission.Action.ADMIN);
+      grantOnNamespace(TEST_UTIL, nSUser3.getShortName(), namespace2, Permission.Action.ADMIN);
+      grantOnNamespace(TEST_UTIL, toGroupEntry("ns_group2"), namespace2, Permission.Action.READ,
+        Permission.Action.WRITE);
+
+      // Create tables
+      TableName table1 = TableName.valueOf(namespace1 + TableName.NAMESPACE_DELIM + "t1");
+      TableName table2 = TableName.valueOf(namespace2 + TableName.NAMESPACE_DELIM + "t2");
+      byte[] TEST_FAMILY2 = Bytes.toBytes("f2");
+      byte[] TEST_QUALIFIER2 = Bytes.toBytes("q2");
+      createTestTable(table1, TEST_FAMILY);
+      createTestTable(table2, TEST_FAMILY2);
+
+      // Grant table permissions
+      grantOnTable(TEST_UTIL, toGroupEntry("table_group1"), table1, null, null,
+        Permission.Action.ADMIN);
+      grantOnTable(TEST_UTIL, USER_ADMIN.getShortName(), table1, null, null,
+        Permission.Action.ADMIN);
+      grantOnTable(TEST_UTIL, USER_ADMIN_CF.getShortName(), table1, TEST_FAMILY, null,
+        Permission.Action.ADMIN);
+      grantOnTable(TEST_UTIL, USER_RW.getShortName(), table1, TEST_FAMILY, TEST_QUALIFIER,
+        Permission.Action.READ);
+      grantOnTable(TEST_UTIL, USER_RW.getShortName(), table1, TEST_FAMILY, TEST_QUALIFIER2,
+        Permission.Action.WRITE);
+
+      grantOnTable(TEST_UTIL, toGroupEntry("table_group2"), table2, null, null,
+        Permission.Action.ADMIN);
+      grantOnTable(TEST_UTIL, USER_ADMIN.getShortName(), table2, null, null,
+        Permission.Action.ADMIN);
+      grantOnTable(TEST_UTIL, USER_ADMIN_CF.getShortName(), table2, TEST_FAMILY2, null,
+        Permission.Action.ADMIN);
+      grantOnTable(TEST_UTIL, USER_RW.getShortName(), table2, TEST_FAMILY2, TEST_QUALIFIER,
+        Permission.Action.READ);
+      grantOnTable(TEST_UTIL, USER_RW.getShortName(), table2, TEST_FAMILY2, TEST_QUALIFIER2,
+        Permission.Action.WRITE);
+
+      Collection<String> superUsers = Superusers.getSuperUsers();
+      int superUserCount = superUsers.size();
+
+      // Global User ACL
+      validateGlobalUserACLForGetUserPermissions(conn, nSUser1, globalGroupUser1, globalGroupUser2,
+        superUsers, superUserCount);
+
+      // Namespace ACL
+      validateNamespaceUserACLForGetUserPermissions(conn, nSUser1, nSUser3, nsGroupUser1,
+        nsGroupUser2, nsPrefix, namespace1, namespace2);
+
+      // Table + Users
+      validateTableACLForGetUserPermissions(conn, nSUser1, tableGroupUser1, tableGroupUser2,
+        nsPrefix, table1, table2, TEST_QUALIFIER2, superUsers);
+
+      // exception scenarios
+
+      try {
+        // test case with table name as null
+        assertEquals(3, AccessControlClient.getUserPermissions(conn, null, TEST_FAMILY).size());
+        fail("this should have thrown IllegalArgumentException");
+      } catch (IllegalArgumentException ex) {
+        // expected
+      }
+      try {
+        // test case with table name as empty
+        assertEquals(3, AccessControlClient
+            .getUserPermissions(conn, HConstants.EMPTY_STRING, TEST_FAMILY).size());
+        fail("this should have thrown IllegalArgumentException");
+      } catch (IllegalArgumentException ex) {
+        // expected
+      }
+      try {
+        // test case with table name as namespace name
+        assertEquals(3,
+          AccessControlClient.getUserPermissions(conn, "@" + namespace2, TEST_FAMILY).size());
+        fail("this should have thrown IllegalArgumentException");
+      } catch (IllegalArgumentException ex) {
+        // expected
+      }
+
+      // Clean the table and namespace
+      deleteTable(TEST_UTIL, table1);
+      deleteTable(TEST_UTIL, table2);
+      deleteNamespace(TEST_UTIL, namespace1);
+      deleteNamespace(TEST_UTIL, namespace2);
+    }
+  }
+
+  @Test(timeout = 180000)
+  public void testHasPermission() throws Throwable {
+    Connection conn = null;
+    try {
+      conn = ConnectionFactory.createConnection(conf);
+      // Create user and set namespace ACL
+      User user1 = User.createUserForTesting(conf, "testHasPermissionUser1", new String[0]);
+      // Grant namespace permission
+      grantOnNamespaceUsingAccessControlClient(TEST_UTIL, conn, user1.getShortName(),
+        NamespaceDescriptor.DEFAULT_NAMESPACE.getName(), Permission.Action.ADMIN,
+        Permission.Action.CREATE, Permission.Action.READ);
+
+      // Create user and set table ACL
+      User user2 = User.createUserForTesting(conf, "testHasPermissionUser2", new String[0]);
+      // Grant namespace permission
+      grantOnTableUsingAccessControlClient(TEST_UTIL, conn, user2.getShortName(), TEST_TABLE,
+        TEST_FAMILY, TEST_QUALIFIER, Permission.Action.READ, Permission.Action.WRITE);
+
+      // Verify action privilege
+      AccessTestAction hasPermissionAction = new AccessTestAction() {
+        @Override
+        public Object run() throws Exception {
+          try (Connection conn = ConnectionFactory.createConnection(conf);
+              Table acl = conn.getTable(AccessControlLists.ACL_TABLE_NAME)) {
+            BlockingRpcChannel service = acl.coprocessorService(TEST_TABLE.getName());
+            AccessControlService.BlockingInterface protocol =
+                AccessControlService.newBlockingStub(service);
+            Permission.Action[] actions = { Permission.Action.READ, Permission.Action.WRITE };
+            AccessControlUtil.hasPermission(null, protocol, TEST_TABLE, TEST_FAMILY,
+              HConstants.EMPTY_BYTE_ARRAY, "dummy", actions);
+          }
+          return null;
+        }
+      };
+      verifyAllowed(hasPermissionAction, SUPERUSER, USER_ADMIN, USER_GROUP_ADMIN, USER_OWNER,
+        USER_ADMIN_CF, user1);
+      verifyDenied(hasPermissionAction, USER_CREATE, USER_RW, USER_RO, USER_NONE, user2);
+
+      // Check for global user
+      assertTrue(AccessControlClient.hasPermission(conn, TEST_TABLE.getNameAsString(),
+        HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, USER_ADMIN.getShortName(),
+        Permission.Action.READ, Permission.Action.WRITE, Permission.Action.CREATE,
+        Permission.Action.ADMIN));
+      assertFalse(AccessControlClient.hasPermission(conn, TEST_TABLE.getNameAsString(),
+        HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, USER_ADMIN.getShortName(),
+        Permission.Action.READ, Permission.Action.WRITE, Permission.Action.CREATE,
+        Permission.Action.ADMIN, Permission.Action.EXEC));
+
+      // Check for namespace access user
+      assertTrue(AccessControlClient.hasPermission(conn, TEST_TABLE.getNameAsString(),
+        HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, user1.getShortName(),
+        Permission.Action.ADMIN, Permission.Action.CREATE));
+      assertFalse(AccessControlClient.hasPermission(conn, TEST_TABLE.getNameAsString(),
+        HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, user1.getShortName(),
+        Permission.Action.ADMIN, Permission.Action.READ, Permission.Action.EXEC));
+
+      // Check for table owner
+      assertTrue(AccessControlClient.hasPermission(conn, TEST_TABLE.getNameAsString(),
+        HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, USER_OWNER.getShortName(),
+        Permission.Action.READ, Permission.Action.WRITE, Permission.Action.EXEC,
+        Permission.Action.CREATE, Permission.Action.ADMIN));
+
+      // Check for table user
+      assertTrue(AccessControlClient.hasPermission(conn, TEST_TABLE.getNameAsString(),
+        HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, USER_CREATE.getShortName(),
+        Permission.Action.READ, Permission.Action.WRITE));
+      assertFalse(AccessControlClient.hasPermission(conn, TEST_TABLE.getNameAsString(),
+        HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, USER_RO.getShortName(),
+        Permission.Action.READ, Permission.Action.WRITE));
+
+      // Check for family access user
+      assertTrue(AccessControlClient.hasPermission(conn, TEST_TABLE.getNameAsString(), TEST_FAMILY,
+        HConstants.EMPTY_BYTE_ARRAY, USER_RO.getShortName(), Permission.Action.READ));
+      assertTrue(AccessControlClient.hasPermission(conn, TEST_TABLE.getNameAsString(), TEST_FAMILY,
+        HConstants.EMPTY_BYTE_ARRAY, USER_RW.getShortName(), Permission.Action.READ,
+        Permission.Action.WRITE));
+      assertFalse(AccessControlClient.hasPermission(conn, TEST_TABLE.getNameAsString(),
+        HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, USER_ADMIN_CF.getShortName(),
+        Permission.Action.ADMIN, Permission.Action.CREATE));
+      assertTrue(AccessControlClient.hasPermission(conn, TEST_TABLE.getNameAsString(), TEST_FAMILY,
+        HConstants.EMPTY_BYTE_ARRAY, USER_ADMIN_CF.getShortName(), Permission.Action.ADMIN,
+        Permission.Action.CREATE));
+      assertFalse(AccessControlClient.hasPermission(conn, TEST_TABLE.getNameAsString(), TEST_FAMILY,
+        HConstants.EMPTY_BYTE_ARRAY, USER_ADMIN_CF.getShortName(), Permission.Action.READ));
+
+      // Check for qualifier access user
+      assertTrue(AccessControlClient.hasPermission(conn, TEST_TABLE.getNameAsString(), TEST_FAMILY,
+        TEST_QUALIFIER, user2.getShortName(), Permission.Action.READ, Permission.Action.WRITE));
+      assertFalse(AccessControlClient.hasPermission(conn, TEST_TABLE.getNameAsString(), TEST_FAMILY,
+        TEST_QUALIFIER, user2.getShortName(), Permission.Action.EXEC, Permission.Action.READ));
+      assertFalse(AccessControlClient.hasPermission(conn, TEST_TABLE.getNameAsString(),
+        HConstants.EMPTY_BYTE_ARRAY, TEST_QUALIFIER, USER_RW.getShortName(),
+        Permission.Action.WRITE, Permission.Action.READ));
+
+      // exception scenarios
+      try {
+        // test case with table name as null
+        assertTrue(AccessControlClient.hasPermission(conn, null, HConstants.EMPTY_BYTE_ARRAY,
+          HConstants.EMPTY_BYTE_ARRAY, null, Permission.Action.READ));
+        fail("this should have thrown IllegalArgumentException");
+      } catch (IllegalArgumentException ex) {
+        // expected
+      }
+      try {
+        // test case with username as null
+        assertTrue(AccessControlClient.hasPermission(conn, TEST_TABLE.getNameAsString(),
+          HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, null, Permission.Action.READ));
+        fail("this should have thrown IllegalArgumentException");
+      } catch (IllegalArgumentException ex) {
+        // expected
+      }
+
+      revokeFromNamespaceUsingAccessControlClient(TEST_UTIL, conn, user1.getShortName(),
+        NamespaceDescriptor.DEFAULT_NAMESPACE.getName(), Permission.Action.ADMIN,
+        Permission.Action.CREATE, Permission.Action.READ);
+      revokeFromTableUsingAccessControlClient(TEST_UTIL, conn, user2.getShortName(), TEST_TABLE,
+        TEST_FAMILY, TEST_QUALIFIER, Permission.Action.READ, Permission.Action.WRITE);
+    } finally {
+      if (conn != null) {
+        conn.close();
+      }
+    }
+  }
+
+  /*
+   * Validate Global User ACL: who may list global permissions, and the counts per user filter.
+   */
+  private void validateGlobalUserACLForGetUserPermissions(final Connection conn, User nSUser1,
+      User globalGroupUser1, User globalGroupUser2, Collection<String> superUsers,
+      int superUserCount) throws Throwable {
+    // Action that issues the global getUserPermissions call through AccessControlUtil
+    AccessTestAction globalUserPermissionAction = new AccessTestAction() {
+      @Override
+      public Object run() throws Exception {
+        try (Connection conn = ConnectionFactory.createConnection(conf);
+            Table acl = conn.getTable(AccessControlLists.ACL_TABLE_NAME)) {
+          BlockingRpcChannel service = acl.coprocessorService(TEST_TABLE.getName());
+          AccessControlService.BlockingInterface protocol =
+              AccessControlService.newBlockingStub(service);
+          AccessControlUtil.getUserPermissions(null, protocol, "dummy");
+        }
+        return null;
+      }
+    };
+    verifyAllowed(globalUserPermissionAction, SUPERUSER, USER_ADMIN, USER_GROUP_ADMIN);
+    verifyDenied(globalUserPermissionAction, USER_GROUP_CREATE, USER_GROUP_READ, USER_GROUP_WRITE);
+
+    // With a null or empty user filter, expect 5 entries plus superUserCount
+    List<UserPermission> userPermissions;
+    assertEquals(5 + superUserCount, AccessControlClient.getUserPermissions(conn, null).size());
+    assertEquals(5 + superUserCount,
+      AccessControlClient.getUserPermissions(conn, HConstants.EMPTY_STRING).size());
+    assertEquals(5 + superUserCount,
+      AccessControlClient.getUserPermissions(conn, null, HConstants.EMPTY_STRING).size());
+    userPermissions = AccessControlClient.getUserPermissions(conn, null, USER_ADMIN.getName());
+    verifyGetUserPermissionResult(userPermissions, 1, null, null, USER_ADMIN.getName(), superUsers);
+    assertEquals(0, AccessControlClient.getUserPermissions(conn, null, nSUser1.getName()).size());
+    // Global group user ACL
+    assertEquals(1,
+      AccessControlClient.getUserPermissions(conn, null, globalGroupUser1.getName()).size());
+    assertEquals(2,
+      AccessControlClient.getUserPermissions(conn, null, globalGroupUser2.getName()).size());
+  }
+
+  /*
+   * Validate Namespace User ACL: who may list namespace permissions, and the counts per filter.
+   */
+  private void validateNamespaceUserACLForGetUserPermissions(final Connection conn, User nSUser1,
+      User nSUser3, User nsGroupUser1, User nsGroupUser2, String nsPrefix, final String namespace1,
+      String namespace2) throws Throwable {
+    AccessTestAction namespaceUserPermissionAction = new AccessTestAction() {
+      @Override
+      public Object run() throws Exception {
+        try (Connection conn = ConnectionFactory.createConnection(conf);
+            Table acl = conn.getTable(AccessControlLists.ACL_TABLE_NAME)) {
+          BlockingRpcChannel service = acl.coprocessorService(TEST_TABLE.getName());
+          AccessControlService.BlockingInterface protocol =
+              AccessControlService.newBlockingStub(service);
+          AccessControlUtil.getUserPermissions(null, protocol, Bytes.toBytes(namespace1), "dummy");
+        }
+        return null;
+      }
+    };
+    verifyAllowed(namespaceUserPermissionAction, SUPERUSER, USER_GROUP_ADMIN, USER_ADMIN, nSUser1,
+      nsGroupUser1);
+    verifyDenied(namespaceUserPermissionAction, USER_GROUP_CREATE, USER_GROUP_READ,
+      USER_GROUP_WRITE, nSUser3, nsGroupUser2);
+
+    List<UserPermission> userPermissions;
+    assertEquals(6, AccessControlClient.getUserPermissions(conn, "@" + nsPrefix + ".*").size());
+    assertEquals(3, AccessControlClient.getUserPermissions(conn, "@" + namespace1).size());
+    assertEquals(3, AccessControlClient
+        .getUserPermissions(conn, "@" + namespace1, HConstants.EMPTY_STRING).size());
+    userPermissions =
+        AccessControlClient.getUserPermissions(conn, "@" + namespace1, nSUser1.getName());
+    verifyGetUserPermissionResult(userPermissions, 1, null, null, nSUser1.getName(), null);
+    userPermissions =
+        AccessControlClient.getUserPermissions(conn, "@" + namespace1, nSUser3.getName());
+    verifyGetUserPermissionResult(userPermissions, 1, null, null, nSUser3.getName(), null);
+    assertEquals(0,
+      AccessControlClient.getUserPermissions(conn, "@" + namespace1, USER_ADMIN.getName()).size());
+    // Namespace group user ACL
+    assertEquals(1, AccessControlClient
+        .getUserPermissions(conn, "@" + namespace1, nsGroupUser1.getName()).size());
+    assertEquals(1, AccessControlClient
+        .getUserPermissions(conn, "@" + namespace2, nsGroupUser2.getName()).size());
+  }
+
+  /*
+   * Validate Table User ACL: callers allowed to query table ACLs, then result counts per filter.
+   */
+  private void validateTableACLForGetUserPermissions(final Connection conn, User nSUser1,
+      User tableGroupUser1, User tableGroupUser2, String nsPrefix, TableName table1,
+      TableName table2, byte[] TEST_QUALIFIER2, Collection<String> superUsers) throws Throwable {
+    AccessTestAction tableUserPermissionAction = new AccessTestAction() {
+      @Override
+      public Object run() throws Exception {
+        try (Connection conn = ConnectionFactory.createConnection(conf);
+            Table acl = conn.getTable(AccessControlLists.ACL_TABLE_NAME)) {
+          BlockingRpcChannel service = acl.coprocessorService(TEST_TABLE.getName());
+          AccessControlService.BlockingInterface protocol =
+              AccessControlService.newBlockingStub(service);
+          AccessControlUtil.getUserPermissions(null, protocol, TEST_TABLE, TEST_FAMILY,
+            TEST_QUALIFIER, "dummy");
+        }
+        return null;
+      }
+    };
+    verifyAllowed(tableUserPermissionAction, SUPERUSER, USER_ADMIN, USER_OWNER, USER_ADMIN_CF);
+    verifyDenied(tableUserPermissionAction, USER_CREATE, USER_RW, USER_RO, USER_NONE);
+
+    List<UserPermission> userPermissions;
+    assertEquals(12, AccessControlClient.getUserPermissions(conn, nsPrefix + ".*").size());
+    assertEquals(6, AccessControlClient.getUserPermissions(conn, table1.getNameAsString()).size());
+    assertEquals(6, AccessControlClient
+        .getUserPermissions(conn, table1.getNameAsString(), HConstants.EMPTY_STRING).size());
+    userPermissions = AccessControlClient.getUserPermissions(conn, table1.getNameAsString(),
+      USER_ADMIN_CF.getName());
+    verifyGetUserPermissionResult(userPermissions, 1, null, null, USER_ADMIN_CF.getName(), null);
+    assertEquals(0, AccessControlClient
+        .getUserPermissions(conn, table1.getNameAsString(), nSUser1.getName()).size());
+    // Table group user ACL
+    assertEquals(1, AccessControlClient
+        .getUserPermissions(conn, table1.getNameAsString(), tableGroupUser1.getName()).size());
+    assertEquals(1, AccessControlClient
+        .getUserPermissions(conn, table2.getNameAsString(), tableGroupUser2.getName()).size());
+
+    // Table Users + CF
+    assertEquals(12, AccessControlClient
+        .getUserPermissions(conn, nsPrefix + ".*", HConstants.EMPTY_BYTE_ARRAY).size());
+    userPermissions = AccessControlClient.getUserPermissions(conn, nsPrefix + ".*", TEST_FAMILY);
+    verifyGetUserPermissionResult(userPermissions, 3, TEST_FAMILY, null, null, null);
+    assertEquals(0, AccessControlClient
+        .getUserPermissions(conn, table1.getNameAsString(), Bytes.toBytes("dummmyCF")).size());
+
+    // Table Users + CF + User
+    assertEquals(3,
+      AccessControlClient
+          .getUserPermissions(conn, table1.getNameAsString(), TEST_FAMILY, HConstants.EMPTY_STRING)
+          .size());
+    userPermissions = AccessControlClient.getUserPermissions(conn, table1.getNameAsString(),
+      TEST_FAMILY, USER_ADMIN_CF.getName());
+    verifyGetUserPermissionResult(userPermissions, 1, null, null, USER_ADMIN_CF.getName(),
+      superUsers);
+    assertEquals(0, AccessControlClient
+        .getUserPermissions(conn, table1.getNameAsString(), TEST_FAMILY, nSUser1.getName()).size());
+
+    // Table Users + CF + CQ
+    assertEquals(3, AccessControlClient.getUserPermissions(conn, table1.getNameAsString(),
+      TEST_FAMILY, HConstants.EMPTY_BYTE_ARRAY).size());
+    assertEquals(1, AccessControlClient
+        .getUserPermissions(conn, table1.getNameAsString(), TEST_FAMILY, TEST_QUALIFIER).size());
+    assertEquals(1, AccessControlClient
+        .getUserPermissions(conn, table1.getNameAsString(), TEST_FAMILY, TEST_QUALIFIER2).size());
+    assertEquals(2, AccessControlClient.getUserPermissions(conn, table1.getNameAsString(),
+      HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, USER_RW.getName()).size());
+    assertEquals(0, AccessControlClient
+        .getUserPermissions(conn, table1.getNameAsString(), TEST_FAMILY, Bytes.toBytes("dummmyCQ"))
+        .size());
+
+    // Table Users + CF + CQ + User
+    assertEquals(3, AccessControlClient.getUserPermissions(conn, table1.getNameAsString(),
+      TEST_FAMILY, HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_STRING).size());
+    assertEquals(1, AccessControlClient.getUserPermissions(conn, table1.getNameAsString(),
+      TEST_FAMILY, TEST_QUALIFIER, USER_RW.getName()).size());
+    assertEquals(1, AccessControlClient.getUserPermissions(conn, table1.getNameAsString(),
+      TEST_FAMILY, TEST_QUALIFIER2, USER_RW.getName()).size());
+    assertEquals(0, AccessControlClient.getUserPermissions(conn, table1.getNameAsString(),
+      TEST_FAMILY, TEST_QUALIFIER2, nSUser1.getName()).size());
+  }
+
+  /*
+   * Asserts that userPermissions holds exactly resultCount entries and that each entry matches
+   * the non-null cf, cq and userName; the name check is skipped for users listed in superUsers.
+   */
+  private void verifyGetUserPermissionResult(List<UserPermission> userPermissions, int resultCount,
+      byte[] cf, byte[] cq, String userName, Collection<String> superUsers) {
+    assertEquals(resultCount, userPermissions.size());
+
+    for (UserPermission perm : userPermissions) {
+      if (cf != null) {
+        assertTrue(Bytes.equals(cf, perm.getFamily()));
+      }
+      if (cq != null) {
+        assertTrue(Bytes.equals(cq, perm.getQualifier()));
+      }
+      if (userName != null
+          && (superUsers == null || !superUsers.contains(Bytes.toString(perm.getUser())))) {
+        assertEquals(userName, Bytes.toString(perm.getUser()));
+      }
+    }
+  }
+
+  /*
+   * Dummy ShellBasedUnixGroupsMapping class to retrieve the groups for the test users.
+   */
+  public static class MyShellBasedUnixGroupsMapping extends ShellBasedUnixGroupsMapping
+      implements GroupMappingServiceProvider {
+    @Override
+    public List<String> getGroups(String user) throws IOException {
+      if (user.equals("globalGroupUser1")) {
+        return Arrays.asList(new String[] { "group_admin" });
+      } else if (user.equals("globalGroupUser2")) {
+        return Arrays.asList(new String[] { "group_admin", "group_create" });
+      } else if (user.equals("nsGroupUser1")) {
+        return Arrays.asList(new String[] { "ns_group1" });
+      } else if (user.equals("nsGroupUser2")) {
+        return Arrays.asList(new String[] { "ns_group2" });
+      } else if (user.equals("tableGroupUser1")) {
+        return Arrays.asList(new String[] { "table_group1" });
+      } else if (user.equals("tableGroupUser2")) {
+        return Arrays.asList(new String[] { "table_group2" });
+      } else {
+        return super.getGroups(user);
+      }
+    }
+  }
 }