You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by re...@apache.org on 2018/11/15 03:37:31 UTC

[1/3] hbase git commit: HBASE-21255 [acl] Refactor TablePermission into three classes (Global, Namespace, Table)

Repository: hbase
Updated Branches:
  refs/heads/master 9e42a9e31 -> 130057f13


http://git-wip-us.apache.org/repos/asf/hbase/blob/130057f1/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotDescriptionUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotDescriptionUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotDescriptionUtils.java
index fb22ac0..0c1e761 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotDescriptionUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotDescriptionUtils.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.access.AccessControlLists;
 import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil;
-import org.apache.hadoop.hbase.security.access.TablePermission;
+import org.apache.hadoop.hbase.security.access.UserPermission;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -435,10 +435,10 @@ public final class SnapshotDescriptionUtils {
 
   private static SnapshotDescription writeAclToSnapshotDescription(SnapshotDescription snapshot,
       Configuration conf) throws IOException {
-    ListMultimap<String, TablePermission> perms =
-        User.runAsLoginUser(new PrivilegedExceptionAction<ListMultimap<String, TablePermission>>() {
+    ListMultimap<String, UserPermission> perms =
+        User.runAsLoginUser(new PrivilegedExceptionAction<ListMultimap<String, UserPermission>>() {
           @Override
-          public ListMultimap<String, TablePermission> run() throws Exception {
+          public ListMultimap<String, UserPermission> run() throws Exception {
             return AccessControlLists.getTablePermissions(conf,
               TableName.valueOf(snapshot.getTable()));
           }

http://git-wip-us.apache.org/repos/asf/hbase/blob/130057f1/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
index 78bb5f6..1b70054 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
@@ -274,7 +274,7 @@ public class TestAccessController extends SecureTestUtil {
   public static void tearDownAfterClass() throws Exception {
     cleanUp();
     TEST_UTIL.shutdownMiniCluster();
-    int total = TableAuthManager.getTotalRefCount();
+    int total = AuthManager.getTotalRefCount();
     assertTrue("Unexpected reference count: " + total, total == 0);
   }
 
@@ -1634,12 +1634,12 @@ public class TestAccessController extends SecureTestUtil {
       }
 
       UserPermission ownerperm =
-          new UserPermission(Bytes.toBytes(USER_OWNER.getName()), tableName, null, Action.values());
+          new UserPermission(USER_OWNER.getName(), tableName, Action.values());
       assertTrue("Owner should have all permissions on table",
         hasFoundUserPermission(ownerperm, perms));
 
       User user = User.createUserForTesting(TEST_UTIL.getConfiguration(), "user", new String[0]);
-      byte[] userName = Bytes.toBytes(user.getShortName());
+      String userName = user.getShortName();
 
       UserPermission up =
           new UserPermission(userName, tableName, family1, qualifier, Permission.Action.READ);
@@ -1725,7 +1725,7 @@ public class TestAccessController extends SecureTestUtil {
       }
 
       UserPermission newOwnerperm =
-          new UserPermission(Bytes.toBytes(newOwner.getName()), tableName, null, Action.values());
+          new UserPermission(newOwner.getName(), tableName, Action.values());
       assertTrue("New owner should have all permissions on table",
         hasFoundUserPermission(newOwnerperm, perms));
     } finally {
@@ -1749,12 +1749,10 @@ public class TestAccessController extends SecureTestUtil {
 
     Collection<String> superUsers = Superusers.getSuperUsers();
     List<UserPermission> adminPerms = new ArrayList<>(superUsers.size() + 1);
-    adminPerms.add(new UserPermission(Bytes.toBytes(USER_ADMIN.getShortName()),
-      AccessControlLists.ACL_TABLE_NAME, null, null, Bytes.toBytes("ACRW")));
-
+    adminPerms.add(new UserPermission(USER_ADMIN.getShortName(), Bytes.toBytes("ACRW")));
     for(String user: superUsers) {
-      adminPerms.add(new UserPermission(Bytes.toBytes(user), AccessControlLists.ACL_TABLE_NAME,
-          null, null, Action.values()));
+      // Global permission
+      adminPerms.add(new UserPermission(user, Action.values()));
     }
     assertTrue("Only super users, global users and user admin has permission on table hbase:acl " +
         "per setup", perms.size() == 5 + superUsers.size() &&
@@ -2432,7 +2430,7 @@ public class TestAccessController extends SecureTestUtil {
     verifyAllowed(getAction, testGrantRevoke);
     verifyDenied(putAction, testGrantRevoke);
 
-    // Grant global READ permissions to testGrantRevoke.
+    // Grant global WRITE permissions to testGrantRevoke.
     try {
       grantGlobalUsingAccessControlClient(TEST_UTIL, systemUserConnection, userName,
               Permission.Action.WRITE);
@@ -2757,8 +2755,11 @@ public class TestAccessController extends SecureTestUtil {
       assertTrue(namespacePermissions != null);
       assertEquals(expectedAmount, namespacePermissions.size());
       for (UserPermission namespacePermission : namespacePermissions) {
-        assertFalse(namespacePermission.isGlobal());  // Verify it is not a global user permission
-        assertEquals(expectedNamespace, namespacePermission.getNamespace());  // Verify namespace is set
+        // Verify it is not a global user permission
+        assertFalse(namespacePermission.getAccessScope() == Permission.Scope.GLOBAL);
+        // Verify namespace is set
+        NamespacePermission nsPerm = (NamespacePermission) namespacePermission.getPermission();
+        assertEquals(expectedNamespace, nsPerm.getNamespace());
       }
     } catch (Throwable thw) {
       throw new HBaseException(thw);
@@ -3125,8 +3126,8 @@ public class TestAccessController extends SecureTestUtil {
       Permission.Action[] expectedAction = { Action.READ };
       boolean userFound = false;
       for (UserPermission p : userPermissions) {
-        if (testUserPerms.getShortName().equals(Bytes.toString(p.getUser()))) {
-          assertArrayEquals(expectedAction, p.getActions());
+        if (testUserPerms.getShortName().equals(p.getUser())) {
+          assertArrayEquals(expectedAction, p.getPermission().getActions());
           userFound = true;
           break;
         }
@@ -3593,15 +3594,24 @@ public class TestAccessController extends SecureTestUtil {
     assertEquals(resultCount, userPermissions.size());
 
     for (UserPermission perm : userPermissions) {
-      if (cf != null) {
-        assertTrue(Bytes.equals(cf, perm.getFamily()));
-      }
-      if (cq != null) {
-        assertTrue(Bytes.equals(cq, perm.getQualifier()));
-      }
-      if (userName != null
-          && (superUsers == null || !superUsers.contains(Bytes.toString(perm.getUser())))) {
-        assertTrue(userName.equals(Bytes.toString(perm.getUser())));
+      if (perm.getPermission() instanceof TablePermission) {
+        TablePermission tablePerm = (TablePermission) perm.getPermission();
+        if (cf != null) {
+          assertTrue(Bytes.equals(cf, tablePerm.getFamily()));
+        }
+        if (cq != null) {
+          assertTrue(Bytes.equals(cq, tablePerm.getQualifier()));
+        }
+        if (userName != null
+          && (superUsers == null || !superUsers.contains(perm.getUser()))) {
+          assertTrue(userName.equals(perm.getUser()));
+        }
+      } else if (perm.getPermission() instanceof NamespacePermission ||
+          perm.getPermission() instanceof GlobalPermission) {
+        if (userName != null &&
+          (superUsers == null || !superUsers.contains(perm.getUser()))) {
+          assertTrue(userName.equals(perm.getUser()));
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/130057f1/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController2.java
index 21c1438..eb2a5ac 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController2.java
@@ -201,17 +201,17 @@ public class TestAccessController2 extends SecureTestUtil {
     TEST_UTIL.waitTableAvailable(TEST_TABLE.getTableName());
     // Verify that owner permissions have been granted to the test user on the
     // table just created
-    List<TablePermission> perms =
+    List<UserPermission> perms =
       AccessControlLists.getTablePermissions(conf, TEST_TABLE.getTableName())
        .get(testUser.getShortName());
     assertNotNull(perms);
     assertFalse(perms.isEmpty());
     // Should be RWXCA
-    assertTrue(perms.get(0).implies(Permission.Action.READ));
-    assertTrue(perms.get(0).implies(Permission.Action.WRITE));
-    assertTrue(perms.get(0).implies(Permission.Action.EXEC));
-    assertTrue(perms.get(0).implies(Permission.Action.CREATE));
-    assertTrue(perms.get(0).implies(Permission.Action.ADMIN));
+    assertTrue(perms.get(0).getPermission().implies(Permission.Action.READ));
+    assertTrue(perms.get(0).getPermission().implies(Permission.Action.WRITE));
+    assertTrue(perms.get(0).getPermission().implies(Permission.Action.EXEC));
+    assertTrue(perms.get(0).getPermission().implies(Permission.Action.CREATE));
+    assertTrue(perms.get(0).getPermission().implies(Permission.Action.ADMIN));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hbase/blob/130057f1/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController3.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController3.java
index 6ca2ef8..7b10e3f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController3.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController3.java
@@ -57,7 +57,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Performs checks for reference counting w.r.t. TableAuthManager which is used by
+ * Performs checks for reference counting w.r.t. AuthManager which is used by
  * AccessController.
  *
  * NOTE: Only one test in  here. In AMv2, there is problem deleting because

http://git-wip-us.apache.org/repos/asf/hbase/blob/130057f1/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestNamespaceCommands.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestNamespaceCommands.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestNamespaceCommands.java
index 66e37bc..d37794d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestNamespaceCommands.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestNamespaceCommands.java
@@ -206,11 +206,9 @@ public class TestNamespaceCommands extends SecureTestUtil {
     String userTestNamespace = "userTestNsp";
     Table acl = UTIL.getConnection().getTable(AccessControlLists.ACL_TABLE_NAME);
     try {
-      ListMultimap<String, TablePermission> perms =
-          AccessControlLists.getNamespacePermissions(conf, TEST_NAMESPACE);
-
-      perms = AccessControlLists.getNamespacePermissions(conf, TEST_NAMESPACE);
-      for (Map.Entry<String, TablePermission> entry : perms.entries()) {
+      ListMultimap<String, UserPermission> perms =
+        AccessControlLists.getNamespacePermissions(conf, TEST_NAMESPACE);
+      for (Map.Entry<String, UserPermission> entry : perms.entries()) {
         LOG.debug(Objects.toString(entry));
       }
       assertEquals(6, perms.size());
@@ -223,15 +221,13 @@ public class TestNamespaceCommands extends SecureTestUtil {
       assertTrue(result != null);
       perms = AccessControlLists.getNamespacePermissions(conf, TEST_NAMESPACE);
       assertEquals(7, perms.size());
-      List<TablePermission> namespacePerms = perms.get(userTestNamespace);
+      List<UserPermission> namespacePerms = perms.get(userTestNamespace);
       assertTrue(perms.containsKey(userTestNamespace));
       assertEquals(1, namespacePerms.size());
       assertEquals(TEST_NAMESPACE,
-        namespacePerms.get(0).getNamespace());
-      assertEquals(null, namespacePerms.get(0).getFamily());
-      assertEquals(null, namespacePerms.get(0).getQualifier());
-      assertEquals(1, namespacePerms.get(0).getActions().length);
-      assertEquals(Permission.Action.WRITE, namespacePerms.get(0).getActions()[0]);
+        ((NamespacePermission) namespacePerms.get(0).getPermission()).getNamespace());
+      assertEquals(1, namespacePerms.get(0).getPermission().getActions().length);
+      assertEquals(Permission.Action.WRITE, namespacePerms.get(0).getPermission().getActions()[0]);
 
       // Revoke and check state in ACL table
       revokeFromNamespace(UTIL, userTestNamespace, TEST_NAMESPACE,

http://git-wip-us.apache.org/repos/asf/hbase/blob/130057f1/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestRpcAccessChecks.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestRpcAccessChecks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestRpcAccessChecks.java
index 55873bb..5aa9ed6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestRpcAccessChecks.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestRpcAccessChecks.java
@@ -147,8 +147,10 @@ public class TestRpcAccessChecks {
         User.createUserForTesting(conf, "user_group_admin", new String[] { GROUP_ADMIN });
 
     // Assign permissions to users and groups
-    SecureTestUtil.grantGlobal(TEST_UTIL, USER_ADMIN.getShortName(), Permission.Action.ADMIN);
-    SecureTestUtil.grantGlobal(TEST_UTIL, toGroupEntry(GROUP_ADMIN), Permission.Action.ADMIN);
+    SecureTestUtil.grantGlobal(TEST_UTIL, USER_ADMIN.getShortName(),
+      Permission.Action.ADMIN, Permission.Action.CREATE);
+    SecureTestUtil.grantGlobal(TEST_UTIL, toGroupEntry(GROUP_ADMIN),
+      Permission.Action.ADMIN, Permission.Action.CREATE);
     // No permissions to USER_NON_ADMIN
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/130057f1/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestTablePermissions.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestTablePermissions.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestTablePermissions.java
index 7243690..1c478b2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestTablePermissions.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestTablePermissions.java
@@ -143,26 +143,24 @@ public class TestTablePermissions {
     try (Connection connection = ConnectionFactory.createConnection(conf)) {
       // add some permissions
       addUserPermission(conf,
-          new UserPermission(Bytes.toBytes("george"), TEST_TABLE, null, (byte[])null,
-              UserPermission.Action.READ, UserPermission.Action.WRITE),
-              connection.getTable(AccessControlLists.ACL_TABLE_NAME));
+        new UserPermission("george", TEST_TABLE, Permission.Action.READ, Permission.Action.WRITE),
+        connection.getTable(AccessControlLists.ACL_TABLE_NAME));
       addUserPermission(conf,
-          new UserPermission(Bytes.toBytes("hubert"), TEST_TABLE, null, (byte[])null,
-              UserPermission.Action.READ),
-          connection.getTable(AccessControlLists.ACL_TABLE_NAME));
+        new UserPermission("hubert", TEST_TABLE, Permission.Action.READ),
+        connection.getTable(AccessControlLists.ACL_TABLE_NAME));
       addUserPermission(conf,
-          new UserPermission(Bytes.toBytes("humphrey"),
-              TEST_TABLE, TEST_FAMILY, TEST_QUALIFIER,
-              UserPermission.Action.READ),
-          connection.getTable(AccessControlLists.ACL_TABLE_NAME));
+        new UserPermission("humphrey", TEST_TABLE, TEST_FAMILY, TEST_QUALIFIER,
+          Permission.Action.READ),
+        connection.getTable(AccessControlLists.ACL_TABLE_NAME));
     }
     // retrieve the same
-    ListMultimap<String,TablePermission> perms =
+    ListMultimap<String, UserPermission> perms =
         AccessControlLists.getTablePermissions(conf, TEST_TABLE);
-    List<TablePermission> userPerms = perms.get("george");
+    List<UserPermission> userPerms = perms.get("george");
     assertNotNull("Should have permissions for george", userPerms);
     assertEquals("Should have 1 permission for george", 1, userPerms.size());
-    TablePermission permission = userPerms.get(0);
+    assertEquals(Permission.Scope.TABLE, userPerms.get(0).getAccessScope());
+    TablePermission permission = (TablePermission) userPerms.get(0).getPermission();
     assertEquals("Permission should be for " + TEST_TABLE,
         TEST_TABLE, permission.getTableName());
     assertNull("Column family should be empty", permission.getFamily());
@@ -170,14 +168,15 @@ public class TestTablePermissions {
     // check actions
     assertNotNull(permission.getActions());
     assertEquals(2, permission.getActions().length);
-    List<TablePermission.Action> actions = Arrays.asList(permission.getActions());
+    List<Permission.Action> actions = Arrays.asList(permission.getActions());
     assertTrue(actions.contains(TablePermission.Action.READ));
     assertTrue(actions.contains(TablePermission.Action.WRITE));
 
     userPerms = perms.get("hubert");
     assertNotNull("Should have permissions for hubert", userPerms);
     assertEquals("Should have 1 permission for hubert", 1, userPerms.size());
-    permission = userPerms.get(0);
+    assertEquals(Permission.Scope.TABLE, userPerms.get(0).getAccessScope());
+    permission = (TablePermission) userPerms.get(0).getPermission();
     assertEquals("Permission should be for " + TEST_TABLE,
         TEST_TABLE, permission.getTableName());
     assertNull("Column family should be empty", permission.getFamily());
@@ -192,7 +191,8 @@ public class TestTablePermissions {
     userPerms = perms.get("humphrey");
     assertNotNull("Should have permissions for humphrey", userPerms);
     assertEquals("Should have 1 permission for humphrey", 1, userPerms.size());
-    permission = userPerms.get(0);
+    assertEquals(Permission.Scope.TABLE, userPerms.get(0).getAccessScope());
+    permission = (TablePermission) userPerms.get(0).getPermission();
     assertEquals("Permission should be for " + TEST_TABLE,
         TEST_TABLE, permission.getTableName());
     assertTrue("Permission should be for family " + Bytes.toString(TEST_FAMILY),
@@ -211,11 +211,11 @@ public class TestTablePermissions {
     try (Connection connection = ConnectionFactory.createConnection(conf);
         Table table = connection.getTable(AccessControlLists.ACL_TABLE_NAME)) {
       AccessControlLists.addUserPermission(conf,
-          new UserPermission(Bytes.toBytes("hubert"), TEST_TABLE2, null, (byte[])null,
-              TablePermission.Action.READ, TablePermission.Action.WRITE), table);
+        new UserPermission("hubert", TEST_TABLE2, Permission.Action.READ, Permission.Action.WRITE),
+        table);
     }
     // check full load
-    Map<byte[], ListMultimap<String,TablePermission>> allPerms =
+    Map<byte[], ListMultimap<String, UserPermission>> allPerms =
         AccessControlLists.loadAll(conf);
     assertEquals("Full permission map should have entries for both test tables",
         2, allPerms.size());
@@ -223,20 +223,22 @@ public class TestTablePermissions {
     userPerms = allPerms.get(TEST_TABLE.getName()).get("hubert");
     assertNotNull(userPerms);
     assertEquals(1, userPerms.size());
-    permission = userPerms.get(0);
+    assertEquals(Permission.Scope.TABLE, userPerms.get(0).getAccessScope());
+    permission = (TablePermission) userPerms.get(0).getPermission();
     assertEquals(TEST_TABLE, permission.getTableName());
     assertEquals(1, permission.getActions().length);
-    assertEquals(TablePermission.Action.READ, permission.getActions()[0]);
+    assertEquals(Permission.Action.READ, permission.getActions()[0]);
 
     userPerms = allPerms.get(TEST_TABLE2.getName()).get("hubert");
     assertNotNull(userPerms);
     assertEquals(1, userPerms.size());
-    permission = userPerms.get(0);
+    assertEquals(Permission.Scope.TABLE, userPerms.get(0).getAccessScope());
+    permission = (TablePermission) userPerms.get(0).getPermission();
     assertEquals(TEST_TABLE2, permission.getTableName());
     assertEquals(2, permission.getActions().length);
     actions = Arrays.asList(permission.getActions());
-    assertTrue(actions.contains(TablePermission.Action.READ));
-    assertTrue(actions.contains(TablePermission.Action.WRITE));
+    assertTrue(actions.contains(Permission.Action.READ));
+    assertTrue(actions.contains(Permission.Action.WRITE));
   }
 
   @Test
@@ -244,30 +246,27 @@ public class TestTablePermissions {
     Configuration conf = UTIL.getConfiguration();
     try (Connection connection = ConnectionFactory.createConnection(conf)) {
       addUserPermission(conf,
-          new UserPermission(Bytes.toBytes("albert"), TEST_TABLE, null,
-              (byte[])null, TablePermission.Action.READ), connection.getTable(AccessControlLists.ACL_TABLE_NAME));
+        new UserPermission("albert", TEST_TABLE, Permission.Action.READ),
+          connection.getTable(AccessControlLists.ACL_TABLE_NAME));
       addUserPermission(conf,
-          new UserPermission(Bytes.toBytes("betty"), TEST_TABLE, null,
-              (byte[])null, TablePermission.Action.READ,
-              TablePermission.Action.WRITE), connection.getTable(AccessControlLists.ACL_TABLE_NAME));
+        new UserPermission("betty", TEST_TABLE, Permission.Action.READ, Permission.Action.WRITE),
+          connection.getTable(AccessControlLists.ACL_TABLE_NAME));
       addUserPermission(conf,
-          new UserPermission(Bytes.toBytes("clark"),
-              TEST_TABLE, TEST_FAMILY,
-              TablePermission.Action.READ), connection.getTable(AccessControlLists.ACL_TABLE_NAME));
+        new UserPermission("clark", TEST_TABLE, TEST_FAMILY, Permission.Action.READ),
+          connection.getTable(AccessControlLists.ACL_TABLE_NAME));
       addUserPermission(conf,
-          new UserPermission(Bytes.toBytes("dwight"),
-              TEST_TABLE, TEST_FAMILY, TEST_QUALIFIER,
-              TablePermission.Action.WRITE), connection.getTable(AccessControlLists.ACL_TABLE_NAME));
+        new UserPermission("dwight", TEST_TABLE, TEST_FAMILY, TEST_QUALIFIER,
+          Permission.Action.WRITE), connection.getTable(AccessControlLists.ACL_TABLE_NAME));
     }
     // verify permissions survive changes in table metadata
-    ListMultimap<String,TablePermission> preperms =
+    ListMultimap<String, UserPermission> preperms =
         AccessControlLists.getTablePermissions(conf, TEST_TABLE);
 
     Table table = UTIL.getConnection().getTable(TEST_TABLE);
-    table.put(new Put(Bytes.toBytes("row1"))
-            .addColumn(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes("v1")));
-    table.put(new Put(Bytes.toBytes("row2"))
-            .addColumn(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes("v2")));
+    table.put(
+      new Put(Bytes.toBytes("row1")).addColumn(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes("v1")));
+    table.put(
+      new Put(Bytes.toBytes("row2")).addColumn(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes("v2")));
     Admin admin = UTIL.getAdmin();
     try {
       admin.split(TEST_TABLE);
@@ -283,7 +282,7 @@ public class TestTablePermissions {
     // wait for split
     Thread.sleep(10000);
 
-    ListMultimap<String,TablePermission> postperms =
+    ListMultimap<String, UserPermission> postperms =
         AccessControlLists.getTablePermissions(conf, TEST_TABLE);
 
     checkMultimapEqual(preperms, postperms);
@@ -292,41 +291,42 @@ public class TestTablePermissions {
   @Test
   public void testSerialization() throws Exception {
     Configuration conf = UTIL.getConfiguration();
-    ListMultimap<String,TablePermission> permissions = createPermissions();
+    ListMultimap<String, UserPermission> permissions = createPermissions();
     byte[] permsData = AccessControlLists.writePermissionsAsBytes(permissions, conf);
 
-    ListMultimap<String, TablePermission> copy =
-        AccessControlLists.readPermissions(permsData, conf);
+    ListMultimap<String, UserPermission> copy =
+        AccessControlLists.readUserPermission(permsData, conf);
 
     checkMultimapEqual(permissions, copy);
   }
 
-  private ListMultimap<String,TablePermission> createPermissions() {
-    ListMultimap<String,TablePermission> permissions = ArrayListMultimap.create();
-    permissions.put("george", new TablePermission(TEST_TABLE, null,
-        TablePermission.Action.READ));
-    permissions.put("george", new TablePermission(TEST_TABLE, TEST_FAMILY,
-        TablePermission.Action.WRITE));
-    permissions.put("george", new TablePermission(TEST_TABLE2, null,
-        TablePermission.Action.READ));
-    permissions.put("hubert", new TablePermission(TEST_TABLE2, null,
-        TablePermission.Action.READ, TablePermission.Action.WRITE));
-    permissions.put("bruce",new TablePermission(TEST_NAMESPACE,
-        TablePermission.Action.READ));
+  private ListMultimap<String, UserPermission> createPermissions() {
+    ListMultimap<String, UserPermission> permissions = ArrayListMultimap.create();
+    permissions.put("george",
+      new UserPermission("george", TEST_TABLE, Permission.Action.READ));
+    permissions.put("george",
+      new UserPermission("george", TEST_TABLE, TEST_FAMILY, Permission.Action.WRITE));
+    permissions.put("george",
+      new UserPermission("george", TEST_TABLE2, Permission.Action.READ));
+    permissions.put("hubert",
+      new UserPermission("hubert", TEST_TABLE2, Permission.Action.READ,
+        Permission.Action.WRITE));
+    permissions.put("bruce",
+      new UserPermission("bruce", TEST_NAMESPACE, Permission.Action.READ));
     return permissions;
   }
 
-  public void checkMultimapEqual(ListMultimap<String,TablePermission> first,
-      ListMultimap<String,TablePermission> second) {
+  public void checkMultimapEqual(ListMultimap<String, UserPermission> first,
+      ListMultimap<String, UserPermission> second) {
     assertEquals(first.size(), second.size());
     for (String key : first.keySet()) {
-      List<TablePermission> firstPerms = first.get(key);
-      List<TablePermission> secondPerms = second.get(key);
+      List<UserPermission> firstPerms = first.get(key);
+      List<UserPermission> secondPerms = second.get(key);
       assertNotNull(secondPerms);
       assertEquals(firstPerms.size(), secondPerms.size());
       LOG.info("First permissions: "+firstPerms.toString());
       LOG.info("Second permissions: "+secondPerms.toString());
-      for (TablePermission p : firstPerms) {
+      for (UserPermission p : firstPerms) {
         assertTrue("Permission "+p.toString()+" not found", secondPerms.contains(p));
       }
     }
@@ -334,13 +334,13 @@ public class TestTablePermissions {
 
   @Test
   public void testEquals() throws Exception {
-    TablePermission p1 = new TablePermission(TEST_TABLE, null, TablePermission.Action.READ);
-    TablePermission p2 = new TablePermission(TEST_TABLE, null, TablePermission.Action.READ);
+    Permission p1 = new TablePermission(TEST_TABLE, Permission.Action.READ);
+    Permission p2 = new TablePermission(TEST_TABLE, Permission.Action.READ);
     assertTrue(p1.equals(p2));
     assertTrue(p2.equals(p1));
 
-    p1 = new TablePermission(TEST_TABLE, null, TablePermission.Action.READ, TablePermission.Action.WRITE);
-    p2 = new TablePermission(TEST_TABLE, null, TablePermission.Action.WRITE, TablePermission.Action.READ);
+    p1 = new TablePermission(TEST_TABLE, TablePermission.Action.READ, TablePermission.Action.WRITE);
+    p2 = new TablePermission(TEST_TABLE, TablePermission.Action.WRITE, TablePermission.Action.READ);
     assertTrue(p1.equals(p2));
     assertTrue(p2.equals(p1));
 
@@ -354,34 +354,30 @@ public class TestTablePermissions {
     assertTrue(p1.equals(p2));
     assertTrue(p2.equals(p1));
 
-    p1 = new TablePermission(TEST_TABLE, null, TablePermission.Action.READ);
+    p1 = new TablePermission(TEST_TABLE, TablePermission.Action.READ);
     p2 = new TablePermission(TEST_TABLE, TEST_FAMILY, TablePermission.Action.READ);
     assertFalse(p1.equals(p2));
     assertFalse(p2.equals(p1));
 
-    p1 = new TablePermission(TEST_TABLE, null, TablePermission.Action.READ);
-    p2 = new TablePermission(TEST_TABLE, null, TablePermission.Action.WRITE);
+    p1 = new TablePermission(TEST_TABLE, TablePermission.Action.READ);
+    p2 = new TablePermission(TEST_TABLE, TablePermission.Action.WRITE);
     assertFalse(p1.equals(p2));
     assertFalse(p2.equals(p1));
-    p2 = new TablePermission(TEST_TABLE, null, TablePermission.Action.READ, TablePermission.Action.WRITE);
+    p2 = new TablePermission(TEST_TABLE, TablePermission.Action.READ, TablePermission.Action.WRITE);
     assertFalse(p1.equals(p2));
     assertFalse(p2.equals(p1));
 
-    p1 = new TablePermission(TEST_TABLE, null, TablePermission.Action.READ);
-    p2 = new TablePermission(TEST_TABLE2, null, TablePermission.Action.READ);
+    p1 = new TablePermission(TEST_TABLE, TablePermission.Action.READ);
+    p2 = new TablePermission(TEST_TABLE2, TablePermission.Action.READ);
     assertFalse(p1.equals(p2));
     assertFalse(p2.equals(p1));
 
-    p2 = new TablePermission(TEST_TABLE, null);
-    assertFalse(p1.equals(p2));
-    assertFalse(p2.equals(p1));
-
-    p1 = new TablePermission(TEST_NAMESPACE, TablePermission.Action.READ);
-    p2 = new TablePermission(TEST_NAMESPACE, TablePermission.Action.READ);
+    p1 = new NamespacePermission(TEST_NAMESPACE, TablePermission.Action.READ);
+    p2 = new NamespacePermission(TEST_NAMESPACE, TablePermission.Action.READ);
     assertEquals(p1, p2);
 
-    p1 = new TablePermission(TEST_NAMESPACE, TablePermission.Action.READ);
-    p2 = new TablePermission(TEST_NAMESPACE2, TablePermission.Action.READ);
+    p1 = new NamespacePermission(TEST_NAMESPACE, TablePermission.Action.READ);
+    p2 = new NamespacePermission(TEST_NAMESPACE2, TablePermission.Action.READ);
     assertFalse(p1.equals(p2));
     assertFalse(p2.equals(p1));
   }
@@ -393,56 +389,58 @@ public class TestTablePermissions {
     // add some permissions
     try (Connection connection = ConnectionFactory.createConnection(conf)) {
       addUserPermission(conf,
-          new UserPermission(Bytes.toBytes("user1"),
+          new UserPermission("user1",
               Permission.Action.READ, Permission.Action.WRITE), connection.getTable(AccessControlLists.ACL_TABLE_NAME));
       addUserPermission(conf,
-          new UserPermission(Bytes.toBytes("user2"),
+          new UserPermission("user2",
               Permission.Action.CREATE), connection.getTable(AccessControlLists.ACL_TABLE_NAME));
       addUserPermission(conf,
-          new UserPermission(Bytes.toBytes("user3"),
+          new UserPermission("user3",
               Permission.Action.ADMIN, Permission.Action.READ, Permission.Action.CREATE),
           connection.getTable(AccessControlLists.ACL_TABLE_NAME));
     }
-    ListMultimap<String,TablePermission> perms = AccessControlLists.getTablePermissions(conf, null);
-    List<TablePermission> user1Perms = perms.get("user1");
+    ListMultimap<String, UserPermission> perms =
+      AccessControlLists.getTablePermissions(conf, null);
+    List<UserPermission> user1Perms = perms.get("user1");
     assertEquals("Should have 1 permission for user1", 1, user1Perms.size());
     assertEquals("user1 should have WRITE permission",
                  new Permission.Action[] { Permission.Action.READ, Permission.Action.WRITE },
-                 user1Perms.get(0).getActions());
+                 user1Perms.get(0).getPermission().getActions());
 
-    List<TablePermission> user2Perms = perms.get("user2");
+    List<UserPermission> user2Perms = perms.get("user2");
     assertEquals("Should have 1 permission for user2", 1, user2Perms.size());
     assertEquals("user2 should have CREATE permission",
                  new Permission.Action[] { Permission.Action.CREATE },
-                 user2Perms.get(0).getActions());
+                 user2Perms.get(0).getPermission().getActions());
 
-    List<TablePermission> user3Perms = perms.get("user3");
+    List<UserPermission> user3Perms = perms.get("user3");
     assertEquals("Should have 1 permission for user3", 1, user3Perms.size());
     assertEquals("user3 should have ADMIN, READ, CREATE permission",
                  new Permission.Action[] {
                     Permission.Action.READ, Permission.Action.CREATE, Permission.Action.ADMIN
                  },
-                 user3Perms.get(0).getActions());
+                 user3Perms.get(0).getPermission().getActions());
   }
 
   @Test
   public void testAuthManager() throws Exception {
     Configuration conf = UTIL.getConfiguration();
-    /* test a race condition causing TableAuthManager to sometimes fail global permissions checks
+    /**
+     * test a race condition causing AuthManager to sometimes fail global permissions checks
      * when the global cache is being updated
      */
-    TableAuthManager authManager = TableAuthManager.getOrCreate(ZKW, conf);
+    AuthManager authManager = AuthManager.getOrCreate(ZKW, conf);
     // currently running user is the system user and should have global admin perms
     User currentUser = User.getCurrent();
-    assertTrue(authManager.authorize(currentUser, Permission.Action.ADMIN));
+    assertTrue(authManager.authorizeUserGlobal(currentUser, Permission.Action.ADMIN));
     try (Connection connection = ConnectionFactory.createConnection(conf)) {
       for (int i=1; i<=50; i++) {
-        addUserPermission(conf, new UserPermission(Bytes.toBytes("testauth"+i),
-            Permission.Action.ADMIN, Permission.Action.READ, Permission.Action.WRITE),
-            connection.getTable(AccessControlLists.ACL_TABLE_NAME));
+        addUserPermission(conf, new UserPermission("testauth"+i,
+          Permission.Action.ADMIN, Permission.Action.READ, Permission.Action.WRITE),
+          connection.getTable(AccessControlLists.ACL_TABLE_NAME));
         // make sure the system user still shows as authorized
         assertTrue("Failed current user auth check on iter "+i,
-            authManager.authorize(currentUser, Permission.Action.ADMIN));
+          authManager.authorizeUserGlobal(currentUser, Permission.Action.ADMIN));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/130057f1/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestZKPermissionWatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestZKPermissionWatcher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestZKPermissionWatcher.java
index c8ab863..cfd6512 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestZKPermissionWatcher.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestZKPermissionWatcher.java
@@ -41,6 +41,9 @@ import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
+import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap;
+
 /**
  * Test the reading and writing of access permissions to and from zookeeper.
  */
@@ -53,8 +56,8 @@ public class TestZKPermissionWatcher {
 
   private static final Logger LOG = LoggerFactory.getLogger(TestZKPermissionWatcher.class);
   private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
-  private static TableAuthManager AUTH_A;
-  private static TableAuthManager AUTH_B;
+  private static AuthManager AUTH_A;
+  private static AuthManager AUTH_B;
   private final static Abortable ABORTABLE = new Abortable() {
     private final AtomicBoolean abort = new AtomicBoolean(false);
 
@@ -81,9 +84,9 @@ public class TestZKPermissionWatcher {
 
     // start minicluster
     UTIL.startMiniCluster();
-    AUTH_A = TableAuthManager.getOrCreate(new ZKWatcher(conf,
+    AUTH_A = AuthManager.getOrCreate(new ZKWatcher(conf,
       "TestZKPermissionsWatcher_1", ABORTABLE), conf);
-    AUTH_B = TableAuthManager.getOrCreate(new ZKWatcher(conf,
+    AUTH_B = AuthManager.getOrCreate(new ZKWatcher(conf,
       "TestZKPermissionsWatcher_2", ABORTABLE), conf);
   }
 
@@ -98,30 +101,25 @@ public class TestZKPermissionWatcher {
     User george = User.createUserForTesting(conf, "george", new String[] { });
     User hubert = User.createUserForTesting(conf, "hubert", new String[] { });
 
-    assertFalse(AUTH_A.authorizeUser(george, TEST_TABLE, null,
-      TablePermission.Action.READ));
-    assertFalse(AUTH_A.authorizeUser(george, TEST_TABLE, null,
-      TablePermission.Action.WRITE));
-    assertFalse(AUTH_A.authorizeUser(hubert, TEST_TABLE, null,
-      TablePermission.Action.READ));
-    assertFalse(AUTH_A.authorizeUser(hubert, TEST_TABLE, null,
-      TablePermission.Action.WRITE));
-
-    assertFalse(AUTH_B.authorizeUser(george, TEST_TABLE, null,
-      TablePermission.Action.READ));
-    assertFalse(AUTH_B.authorizeUser(george, TEST_TABLE, null,
-      TablePermission.Action.WRITE));
-    assertFalse(AUTH_B.authorizeUser(hubert, TEST_TABLE, null,
-      TablePermission.Action.READ));
-    assertFalse(AUTH_B.authorizeUser(hubert, TEST_TABLE, null,
-      TablePermission.Action.WRITE));
+    assertFalse(AUTH_A.authorizeUserTable(george, TEST_TABLE, Permission.Action.READ));
+    assertFalse(AUTH_A.authorizeUserTable(george, TEST_TABLE, Permission.Action.WRITE));
+    assertFalse(AUTH_A.authorizeUserTable(hubert, TEST_TABLE, Permission.Action.READ));
+    assertFalse(AUTH_A.authorizeUserTable(hubert, TEST_TABLE, Permission.Action.WRITE));
+
+    assertFalse(AUTH_B.authorizeUserTable(george, TEST_TABLE, Permission.Action.READ));
+    assertFalse(AUTH_B.authorizeUserTable(george, TEST_TABLE, Permission.Action.WRITE));
+    assertFalse(AUTH_B.authorizeUserTable(hubert, TEST_TABLE, Permission.Action.READ));
+    assertFalse(AUTH_B.authorizeUserTable(hubert, TEST_TABLE, Permission.Action.WRITE));
 
     // update ACL: george RW
-    List<TablePermission> acl = new ArrayList<>(1);
-    acl.add(new TablePermission(TEST_TABLE, null, TablePermission.Action.READ,
-      TablePermission.Action.WRITE));
+    List<UserPermission> acl = new ArrayList<>(1);
+    acl.add(new UserPermission(george.getShortName(), TEST_TABLE,
+      Permission.Action.READ, Permission.Action.WRITE));
+    ListMultimap<String, UserPermission> multimap = ArrayListMultimap.create();
+    multimap.putAll(george.getShortName(), acl);
+    byte[] serialized = AccessControlLists.writePermissionsAsBytes(multimap, conf);
+    AUTH_A.getZKPermissionWatcher().writeToZookeeper(TEST_TABLE.getName(), serialized);
     final long mtimeB = AUTH_B.getMTime();
-    AUTH_A.setTableUserPermissions(george.getShortName(), TEST_TABLE, acl);
     // Wait for the update to propagate
     UTIL.waitFor(10000, 100, new Predicate<Exception>() {
       @Override
@@ -132,28 +130,22 @@ public class TestZKPermissionWatcher {
     Thread.sleep(1000);
 
     // check it
-    assertTrue(AUTH_A.authorizeUser(george, TEST_TABLE, null,
-      TablePermission.Action.READ));
-    assertTrue(AUTH_A.authorizeUser(george, TEST_TABLE, null,
-      TablePermission.Action.WRITE));
-    assertTrue(AUTH_B.authorizeUser(george, TEST_TABLE, null,
-      TablePermission.Action.READ));
-    assertTrue(AUTH_B.authorizeUser(george, TEST_TABLE, null,
-      TablePermission.Action.WRITE));
-    assertFalse(AUTH_A.authorizeUser(hubert, TEST_TABLE, null,
-      TablePermission.Action.READ));
-    assertFalse(AUTH_A.authorizeUser(hubert, TEST_TABLE, null,
-      TablePermission.Action.WRITE));
-    assertFalse(AUTH_B.authorizeUser(hubert, TEST_TABLE, null,
-      TablePermission.Action.READ));
-    assertFalse(AUTH_B.authorizeUser(hubert, TEST_TABLE, null,
-      TablePermission.Action.WRITE));
+    assertTrue(AUTH_A.authorizeUserTable(george, TEST_TABLE, Permission.Action.READ));
+    assertTrue(AUTH_A.authorizeUserTable(george, TEST_TABLE, Permission.Action.WRITE));
+    assertTrue(AUTH_B.authorizeUserTable(george, TEST_TABLE, Permission.Action.READ));
+    assertTrue(AUTH_B.authorizeUserTable(george, TEST_TABLE, Permission.Action.WRITE));
+    assertFalse(AUTH_A.authorizeUserTable(hubert, TEST_TABLE, Permission.Action.READ));
+    assertFalse(AUTH_A.authorizeUserTable(hubert, TEST_TABLE, Permission.Action.WRITE));
+    assertFalse(AUTH_B.authorizeUserTable(hubert, TEST_TABLE, Permission.Action.READ));
+    assertFalse(AUTH_B.authorizeUserTable(hubert, TEST_TABLE, Permission.Action.WRITE));
 
     // update ACL: hubert R
-    acl = new ArrayList<>(1);
-    acl.add(new TablePermission(TEST_TABLE, null, TablePermission.Action.READ));
+    List<UserPermission> acl2 = new ArrayList<>(1);
+    acl2.add(new UserPermission(hubert.getShortName(), TEST_TABLE, TablePermission.Action.READ));
     final long mtimeA = AUTH_A.getMTime();
-    AUTH_B.setTableUserPermissions("hubert", TEST_TABLE, acl);
+    multimap.putAll(hubert.getShortName(), acl2);
+    byte[] serialized2 = AccessControlLists.writePermissionsAsBytes(multimap, conf);
+    AUTH_B.getZKPermissionWatcher().writeToZookeeper(TEST_TABLE.getName(), serialized2);
     // Wait for the update to propagate
     UTIL.waitFor(10000, 100, new Predicate<Exception>() {
       @Override
@@ -164,21 +156,13 @@ public class TestZKPermissionWatcher {
     Thread.sleep(1000);
 
     // check it
-    assertTrue(AUTH_A.authorizeUser(george, TEST_TABLE, null,
-      TablePermission.Action.READ));
-    assertTrue(AUTH_A.authorizeUser(george, TEST_TABLE, null,
-      TablePermission.Action.WRITE));
-    assertTrue(AUTH_B.authorizeUser(george, TEST_TABLE, null,
-      TablePermission.Action.READ));
-    assertTrue(AUTH_B.authorizeUser(george, TEST_TABLE, null,
-      TablePermission.Action.WRITE));
-    assertTrue(AUTH_A.authorizeUser(hubert, TEST_TABLE, null,
-      TablePermission.Action.READ));
-    assertFalse(AUTH_A.authorizeUser(hubert, TEST_TABLE, null,
-      TablePermission.Action.WRITE));
-    assertTrue(AUTH_B.authorizeUser(hubert, TEST_TABLE, null,
-      TablePermission.Action.READ));
-    assertFalse(AUTH_B.authorizeUser(hubert, TEST_TABLE, null,
-      TablePermission.Action.WRITE));
+    assertTrue(AUTH_A.authorizeUserTable(george, TEST_TABLE, Permission.Action.READ));
+    assertTrue(AUTH_A.authorizeUserTable(george, TEST_TABLE, Permission.Action.WRITE));
+    assertTrue(AUTH_B.authorizeUserTable(george, TEST_TABLE, Permission.Action.READ));
+    assertTrue(AUTH_B.authorizeUserTable(george, TEST_TABLE, Permission.Action.WRITE));
+    assertTrue(AUTH_A.authorizeUserTable(hubert, TEST_TABLE, Permission.Action.READ));
+    assertFalse(AUTH_A.authorizeUserTable(hubert, TEST_TABLE, Permission.Action.WRITE));
+    assertTrue(AUTH_B.authorizeUserTable(hubert, TEST_TABLE, Permission.Action.READ));
+    assertFalse(AUTH_B.authorizeUserTable(hubert, TEST_TABLE, Permission.Action.WRITE));
   }
 }


[2/3] hbase git commit: HBASE-21255 [acl] Refactor TablePermission into three classes (Global, Namespace, Table)

Posted by re...@apache.org.
http://git-wip-us.apache.org/repos/asf/hbase/blob/130057f1/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java
index b6d8fe9..34480d3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java
@@ -131,14 +131,15 @@ public class AccessControlLists {
    * @throws IOException in the case of an error accessing the metadata table
    */
   static void addUserPermission(Configuration conf, UserPermission userPerm, Table t,
-                                boolean mergeExistingPermissions) throws IOException {
-    Permission.Action[] actions = userPerm.getActions();
-    byte[] rowKey = userPermissionRowKey(userPerm);
+      boolean mergeExistingPermissions) throws IOException {
+    Permission permission = userPerm.getPermission();
+    Permission.Action[] actions = permission.getActions();
+    byte[] rowKey = userPermissionRowKey(permission);
     Put p = new Put(rowKey);
     byte[] key = userPermissionKey(userPerm);
 
     if ((actions == null) || (actions.length == 0)) {
-      String msg = "No actions associated with user '" + Bytes.toString(userPerm.getUser()) + "'";
+      String msg = "No actions associated with user '" + userPerm.getUser() + "'";
       LOG.warn(msg);
       throw new IOException(msg);
     }
@@ -148,16 +149,14 @@ public class AccessControlLists {
       List<UserPermission> perms = getUserPermissions(conf, rowKey, null, null, null, false);
       UserPermission currentPerm = null;
       for (UserPermission perm : perms) {
-        if (Bytes.equals(perm.getUser(), userPerm.getUser())
-                && ((userPerm.isGlobal() && ACL_TABLE_NAME.equals(perm.getTableName()))
-                || perm.tableFieldsEqual(userPerm))) {
+        if (userPerm.equalsExceptActions(perm)) {
           currentPerm = perm;
           break;
         }
       }
 
-      if(currentPerm != null && currentPerm.getActions() != null){
-        actionSet.addAll(Arrays.asList(currentPerm.getActions()));
+      if (currentPerm != null && currentPerm.getPermission().getActions() != null){
+        actionSet.addAll(Arrays.asList(currentPerm.getPermission().getActions()));
       }
     }
 
@@ -225,24 +224,27 @@ public class AccessControlLists {
    */
   static void removeUserPermission(Configuration conf, UserPermission userPerm, Table t)
       throws IOException {
-    if (null == userPerm.getActions()) {
+    if (null == userPerm.getPermission().getActions() ||
+        userPerm.getPermission().getActions().length == 0) {
       removePermissionRecord(conf, userPerm, t);
     } else {
       // Get all the global user permissions from the acl table
       List<UserPermission> permsList =
-          getUserPermissions(conf, userPermissionRowKey(userPerm), null, null, null, false);
+        getUserPermissions(conf, userPermissionRowKey(userPerm.getPermission()),
+          null, null, null, false);
       List<Permission.Action> remainingActions = new ArrayList<>();
-      List<Permission.Action> dropActions = Arrays.asList(userPerm.getActions());
+      List<Permission.Action> dropActions = Arrays.asList(userPerm.getPermission().getActions());
       for (UserPermission perm : permsList) {
         // Find the user and remove only the requested permissions
-        if (Bytes.toString(perm.getUser()).equals(Bytes.toString(userPerm.getUser()))) {
-          for (Permission.Action oldAction : perm.getActions()) {
+        if (perm.getUser().equals(userPerm.getUser())) {
+          for (Permission.Action oldAction : perm.getPermission().getActions()) {
             if (!dropActions.contains(oldAction)) {
               remainingActions.add(oldAction);
             }
           }
           if (!remainingActions.isEmpty()) {
-            perm.setActions(remainingActions.toArray(new Permission.Action[remainingActions.size()]));
+            perm.getPermission().setActions(
+              remainingActions.toArray(new Permission.Action[remainingActions.size()]));
             addUserPermission(conf, perm, t);
           } else {
             removePermissionRecord(conf, userPerm, t);
@@ -258,7 +260,7 @@ public class AccessControlLists {
 
   private static void removePermissionRecord(Configuration conf, UserPermission userPerm, Table t)
       throws IOException {
-    Delete d = new Delete(userPermissionRowKey(userPerm));
+    Delete d = new Delete(userPermissionRowKey(userPerm.getPermission()));
     d.addColumns(ACL_LIST_FAMILY, userPermissionKey(userPerm));
     try {
       t.delete(d);
@@ -348,14 +350,17 @@ public class AccessControlLists {
     removeTablePermissions(tableName, column, t, true);
   }
 
-  static byte[] userPermissionRowKey(UserPermission userPerm) {
+  static byte[] userPermissionRowKey(Permission permission) {
     byte[] row;
-    if(userPerm.hasNamespace()) {
-      row = Bytes.toBytes(toNamespaceEntry(userPerm.getNamespace()));
-    } else if(userPerm.isGlobal()) {
-      row = ACL_GLOBAL_NAME;
+    if (permission instanceof TablePermission) {
+      TablePermission tablePerm = (TablePermission) permission;
+      row = tablePerm.getTableName().getName();
+    } else if (permission instanceof NamespacePermission) {
+      NamespacePermission nsPerm = (NamespacePermission) permission;
+      row = Bytes.toBytes(toNamespaceEntry(nsPerm.getNamespace()));
     } else {
-      row = userPerm.getTableName().getName();
+      // neither table- nor namespace-scoped, i.e. a global permission
+      row = ACL_GLOBAL_NAME;
     }
     return row;
   }
@@ -366,10 +371,15 @@ public class AccessControlLists {
    *  username,family
    *  username,family,qualifier
    */
-  static byte[] userPermissionKey(UserPermission userPerm) {
-    byte[] qualifier = userPerm.getQualifier();
-    byte[] family = userPerm.getFamily();
-    byte[] key = userPerm.getUser();
+  static byte[] userPermissionKey(UserPermission permission) {
+    byte[] key = Bytes.toBytes(permission.getUser());
+    byte[] qualifier = null;
+    byte[] family = null;
+    if (permission.getPermission().getAccessScope() == Permission.Scope.TABLE) {
+      TablePermission tablePermission = (TablePermission) permission.getPermission();
+      family = tablePermission.getFamily();
+      qualifier = tablePermission.getQualifier();
+    }
 
     if (family != null && family.length > 0) {
       key = Bytes.add(key, Bytes.add(new byte[]{ACL_KEY_DELIMITER}, family));
@@ -404,14 +414,15 @@ public class AccessControlLists {
    * @return a map of the permissions for this table.
    * @throws IOException
    */
-  static Map<byte[], ListMultimap<String,TablePermission>> loadAll(Region aclRegion)
+  static Map<byte[], ListMultimap<String, UserPermission>> loadAll(Region aclRegion)
       throws IOException {
 
     if (!isAclRegion(aclRegion)) {
       throw new IOException("Can only load permissions from "+ACL_TABLE_NAME);
     }
 
-    Map<byte[], ListMultimap<String, TablePermission>> allPerms = new TreeMap<>(Bytes.BYTES_RAWCOMPARATOR);
+    Map<byte[], ListMultimap<String, UserPermission>> allPerms =
+      new TreeMap<>(Bytes.BYTES_RAWCOMPARATOR);
 
     // do a full scan of _acl_ table
 
@@ -426,18 +437,18 @@ public class AccessControlLists {
         List<Cell> row = new ArrayList<>();
 
         boolean hasNext = iScanner.next(row);
-        ListMultimap<String,TablePermission> perms = ArrayListMultimap.create();
+        ListMultimap<String, UserPermission> perms = ArrayListMultimap.create();
         byte[] entry = null;
         for (Cell kv : row) {
           if (entry == null) {
             entry = CellUtil.cloneRow(kv);
           }
-          Pair<String, TablePermission> permissionsOfUserOnTable =
+          Pair<String, Permission> permissionsOfUserOnTable =
               parsePermissionRecord(entry, kv, null, null, false, null);
           if (permissionsOfUserOnTable != null) {
             String username = permissionsOfUserOnTable.getFirst();
-            TablePermission permissions = permissionsOfUserOnTable.getSecond();
-            perms.put(username, permissions);
+            Permission permission = permissionsOfUserOnTable.getSecond();
+            perms.put(username, new UserPermission(username, permission));
           }
         }
         if (entry != null) {
@@ -460,9 +471,10 @@ public class AccessControlLists {
    * Load all permissions from the region server holding {@code _acl_},
    * primarily intended for testing purposes.
    */
-  static Map<byte[], ListMultimap<String,TablePermission>> loadAll(
+  static Map<byte[], ListMultimap<String, UserPermission>> loadAll(
       Configuration conf) throws IOException {
-    Map<byte[], ListMultimap<String,TablePermission>> allPerms = new TreeMap<>(Bytes.BYTES_RAWCOMPARATOR);
+    Map<byte[], ListMultimap<String, UserPermission>> allPerms =
+      new TreeMap<>(Bytes.BYTES_RAWCOMPARATOR);
 
     // do a full scan of _acl_, filtering on only first table region rows
 
@@ -476,7 +488,7 @@ public class AccessControlLists {
         scanner = table.getScanner(scan);
         try {
           for (Result row : scanner) {
-            ListMultimap<String, TablePermission> resultPerms =
+            ListMultimap<String, UserPermission> resultPerms =
                 parsePermissions(row.getRow(), row, null, null, null, false);
             allPerms.put(row.getRow(), resultPerms);
           }
@@ -489,14 +501,14 @@ public class AccessControlLists {
     return allPerms;
   }
 
-  public static ListMultimap<String, TablePermission> getTablePermissions(Configuration conf,
+  public static ListMultimap<String, UserPermission> getTablePermissions(Configuration conf,
       TableName tableName) throws IOException {
     return getPermissions(conf, tableName != null ? tableName.getName() : null, null, null, null,
       null, false);
   }
 
   @VisibleForTesting
-  public static ListMultimap<String, TablePermission> getNamespacePermissions(Configuration conf,
+  public static ListMultimap<String, UserPermission> getNamespacePermissions(Configuration conf,
       String namespace) throws IOException {
     return getPermissions(conf, Bytes.toBytes(toNamespaceEntry(namespace)), null, null, null, null,
       false);
@@ -509,11 +521,11 @@ public class AccessControlLists {
    * See {@link AccessControlLists class documentation} for the key structure used for storage.
    * </p>
    */
-  static ListMultimap<String, TablePermission> getPermissions(Configuration conf, byte[] entryName,
+  static ListMultimap<String, UserPermission> getPermissions(Configuration conf, byte[] entryName,
       Table t, byte[] cf, byte[] cq, String user, boolean hasFilterUser) throws IOException {
     if (entryName == null) entryName = ACL_GLOBAL_NAME;
     // for normal user tables, we just read the table row from _acl_
-    ListMultimap<String, TablePermission> perms = ArrayListMultimap.create();
+    ListMultimap<String, UserPermission> perms = ArrayListMultimap.create();
     Get get = new Get(entryName);
     get.addFamily(ACL_LIST_FAMILY);
     Result row = null;
@@ -570,25 +582,12 @@ public class AccessControlLists {
    */
   static List<UserPermission> getUserPermissions(Configuration conf, byte[] entryName, byte[] cf,
       byte[] cq, String user, boolean hasFilterUser) throws IOException {
-    ListMultimap<String, TablePermission> allPerms =
+    ListMultimap<String, UserPermission> allPerms =
         getPermissions(conf, entryName, null, cf, cq, user, hasFilterUser);
-
     List<UserPermission> perms = new ArrayList<>();
-    if (isNamespaceEntry(entryName)) { // Namespace
-      for (Map.Entry<String, TablePermission> entry : allPerms.entries()) {
-        UserPermission up = new UserPermission(Bytes.toBytes(entry.getKey()),
-            entry.getValue().getNamespace(), entry.getValue().getActions());
-        perms.add(up);
-      }
-    } else { // Table
-      for (Map.Entry<String, TablePermission> entry : allPerms.entries()) {
-        UserPermission up = new UserPermission(Bytes.toBytes(entry.getKey()),
-            entry.getValue().getTableName(), entry.getValue().getFamily(),
-            entry.getValue().getQualifier(), entry.getValue().getActions());
-        perms.add(up);
-      }
+    for (Map.Entry<String, UserPermission> entry : allPerms.entries()) {
+      perms.add(entry.getValue());
     }
-
     return perms;
   }
 
@@ -596,25 +595,25 @@ public class AccessControlLists {
    * Parse and filter permission based on the specified column family, column qualifier and user
    * name.
    */
-  private static ListMultimap<String, TablePermission> parsePermissions(byte[] entryName,
+  private static ListMultimap<String, UserPermission> parsePermissions(byte[] entryName,
       Result result, byte[] cf, byte[] cq, String user, boolean hasFilterUser) {
-    ListMultimap<String, TablePermission> perms = ArrayListMultimap.create();
+    ListMultimap<String, UserPermission> perms = ArrayListMultimap.create();
     if (result != null && result.size() > 0) {
       for (Cell kv : result.rawCells()) {
-        Pair<String, TablePermission> permissionsOfUserOnTable =
+        Pair<String, Permission> permissionsOfUserOnTable =
             parsePermissionRecord(entryName, kv, cf, cq, hasFilterUser, user);
 
         if (permissionsOfUserOnTable != null) {
           String username = permissionsOfUserOnTable.getFirst();
-          TablePermission permissions = permissionsOfUserOnTable.getSecond();
-          perms.put(username, permissions);
+          Permission permission = permissionsOfUserOnTable.getSecond();
+          perms.put(username, new UserPermission(username, permission));
         }
       }
     }
     return perms;
   }
 
-  private static Pair<String, TablePermission> parsePermissionRecord(byte[] entryName, Cell kv,
+  private static Pair<String, Permission> parsePermissionRecord(byte[] entryName, Cell kv,
       byte[] cf, byte[] cq, boolean filterPerms, String filterUser) {
     // return X given a set of permissions encoded in the permissionRecord kv.
     byte[] family = CellUtil.cloneFamily(kv);
@@ -625,9 +624,10 @@ public class AccessControlLists {
     byte[] key = CellUtil.cloneQualifier(kv);
     byte[] value = CellUtil.cloneValue(kv);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Read acl: kv ["+
-          Bytes.toStringBinary(key)+": "+
-          Bytes.toStringBinary(value)+"]");
+      LOG.debug("Read acl: entry[" +
+        Bytes.toStringBinary(entryName) + "], kv [" +
+        Bytes.toStringBinary(key) + ": " +
+        Bytes.toStringBinary(value)+"]");
     }
 
     // check for a column family appended to the key
@@ -652,11 +652,20 @@ public class AccessControlLists {
       }
 
       return new Pair<>(username,
-          new TablePermission(Bytes.toString(fromNamespaceEntry(entryName)), value));
+        new NamespacePermission(Bytes.toString(fromNamespaceEntry(entryName)), value));
     }
 
-    //Handle table and global entry
-    //TODO global entry should be handled differently
+    // Handle global entry
+    if (isGlobalEntry(entryName)) {
+      // For a client query, skip this record when its user does not match the filter
+      if (filterPerms && !validateFilterUser(username, filterUser, filterUserGroups)) {
+        return null;
+      }
+
+      return new Pair<>(username, new GlobalPermission(value));
+    }
+
+    // Handle table entry
     int idx = username.indexOf(ACL_KEY_DELIMITER);
     byte[] permFamily = null;
     byte[] permQualifier = null;
@@ -738,7 +747,7 @@ public class AccessControlLists {
    * Writes a set of permissions as {@link org.apache.hadoop.io.Writable} instances and returns the
    * resulting byte array. Writes a set of permission [user: table permission]
    */
-  public static byte[] writePermissionsAsBytes(ListMultimap<String, TablePermission> perms,
+  public static byte[] writePermissionsAsBytes(ListMultimap<String, UserPermission> perms,
       Configuration conf) {
     return ProtobufUtil.prependPBMagic(AccessControlUtil.toUserTablePermissions(perms).toByteArray());
   }
@@ -750,11 +759,11 @@ public class AccessControlLists {
 
   private static final int WRITABLE_NOT_ENCODED = 0;
 
-  private static List<TablePermission> readWritablePermissions(DataInput in, Configuration conf)
-      throws IOException, ClassNotFoundException {
+  private static List<Permission> readWritableUserPermission(DataInput in,
+      Configuration conf) throws IOException, ClassNotFoundException {
     assert WritableUtils.readVInt(in) == LIST_CODE;
     int length = in.readInt();
-    List<TablePermission> list = new ArrayList<>(length);
+    List<Permission> list = new ArrayList<>(length);
     for (int i = 0; i < length; i++) {
       assert WritableUtils.readVInt(in) == WRITABLE_CODE;
       assert WritableUtils.readVInt(in) == WRITABLE_NOT_ENCODED;
@@ -762,38 +771,67 @@ public class AccessControlLists {
       Class<? extends Writable> clazz = conf.getClassByName(className).asSubclass(Writable.class);
       Writable instance = WritableFactories.newInstance(clazz, conf);
       instance.readFields(in);
-      list.add((TablePermission) instance);
+      list.add((Permission) instance);
     }
     return list;
   }
 
-  /**
-   * Reads a set of permissions as {@link org.apache.hadoop.io.Writable} instances from the input
-   * stream.
-   */
-  public static ListMultimap<String, TablePermission> readPermissions(byte[] data,
+  @VisibleForTesting
+  public static ListMultimap<String, UserPermission> readUserPermission(byte[] data,
       Configuration conf) throws DeserializationException {
     if (ProtobufUtil.isPBMagicPrefix(data)) {
       int pblen = ProtobufUtil.lengthOfPBMagic();
       try {
         AccessControlProtos.UsersAndPermissions.Builder builder =
-            AccessControlProtos.UsersAndPermissions.newBuilder();
+          AccessControlProtos.UsersAndPermissions.newBuilder();
         ProtobufUtil.mergeFrom(builder, data, pblen, data.length - pblen);
-        return AccessControlUtil.toUserTablePermissions(builder.build());
+        return AccessControlUtil.toUserPermission(builder.build());
       } catch (IOException e) {
         throw new DeserializationException(e);
       }
     } else {
       // TODO: We have to re-write non-PB data as PB encoded. Otherwise we will carry old Writables
       // forever (here and a couple of other places).
-      ListMultimap<String, TablePermission> perms = ArrayListMultimap.create();
+      ListMultimap<String, UserPermission> userPermission = ArrayListMultimap.create();
       try {
         DataInput in = new DataInputStream(new ByteArrayInputStream(data));
         int length = in.readInt();
         for (int i = 0; i < length; i++) {
           String user = Text.readString(in);
-          List<TablePermission> userPerms = readWritablePermissions(in, conf);
-          perms.putAll(user, userPerms);
+          List<Permission> perms = readWritableUserPermission(in, conf);
+          for (Permission p : perms) {
+            userPermission.put(user, new UserPermission(user, p));
+          }
+        }
+      } catch (IOException | ClassNotFoundException e) {
+        throw new DeserializationException(e);
+      }
+      return userPermission;
+    }
+  }
+
+  public static ListMultimap<String, Permission> readPermissions(byte[] data,
+      Configuration conf) throws DeserializationException {
+    if (ProtobufUtil.isPBMagicPrefix(data)) {
+      int pblen = ProtobufUtil.lengthOfPBMagic();
+      try {
+        AccessControlProtos.UsersAndPermissions.Builder builder =
+          AccessControlProtos.UsersAndPermissions.newBuilder();
+        ProtobufUtil.mergeFrom(builder, data, pblen, data.length - pblen);
+        return AccessControlUtil.toPermission(builder.build());
+      } catch (IOException e) {
+        throw new DeserializationException(e);
+      }
+    } else {
+      // TODO: We have to re-write non-PB data as PB encoded. Otherwise we will carry old Writables
+      // forever (here and a couple of other places).
+      ListMultimap<String, Permission> perms = ArrayListMultimap.create();
+      try {
+        DataInput in = new DataInputStream(new ByteArrayInputStream(data));
+        int length = in.readInt();
+        for (int i = 0; i < length; i++) {
+          String user = Text.readString(in);
+          perms.putAll(user, readWritableUserPermission(in, conf));
         }
       } catch (IOException | ClassNotFoundException e) {
         throw new DeserializationException(e);
@@ -802,6 +840,10 @@ public class AccessControlLists {
     }
   }
 
+  public static boolean isGlobalEntry(byte[] entryName) {
+    return entryName != null && TableName.valueOf(entryName).equals(ACL_TABLE_NAME);
+  }
+
   public static boolean isNamespaceEntry(String entryName) {
     return entryName != null && entryName.charAt(0) == NAMESPACE_PREFIX;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/130057f1/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
index 1100500..1a84bfd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
@@ -237,20 +237,20 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
     return regionEnv != null ? regionEnv.getRegion() : null;
   }
 
-  public TableAuthManager getAuthManager() {
+  public AuthManager getAuthManager() {
     return accessChecker.getAuthManager();
   }
 
   private void initialize(RegionCoprocessorEnvironment e) throws IOException {
     final Region region = e.getRegion();
     Configuration conf = e.getConfiguration();
-    Map<byte[], ListMultimap<String, TablePermission>> tables = AccessControlLists.loadAll(region);
+    Map<byte[], ListMultimap<String, UserPermission>> tables = AccessControlLists.loadAll(region);
     // For each table, write out the table's permissions to the respective
     // znode for that table.
-    for (Map.Entry<byte[], ListMultimap<String,TablePermission>> t:
+    for (Map.Entry<byte[], ListMultimap<String, UserPermission>> t:
       tables.entrySet()) {
       byte[] entry = t.getKey();
-      ListMultimap<String,TablePermission> perms = t.getValue();
+      ListMultimap<String, UserPermission> perms = t.getValue();
       byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
       getAuthManager().getZKPermissionWatcher().writeToZookeeper(entry, serialized);
     }
@@ -286,7 +286,7 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
     try (Table t = e.getConnection().getTable(AccessControlLists.ACL_TABLE_NAME)) {
       for (byte[] entry : entries) {
         currentEntry = entry;
-        ListMultimap<String, TablePermission> perms =
+        ListMultimap<String, UserPermission> perms =
             AccessControlLists.getPermissions(conf, entry, t, null, null, null, false);
         byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
         zkw.writeToZookeeper(entry, serialized);
@@ -330,7 +330,7 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
     }
 
     // 2. check for the table-level, if successful we can short-circuit
-    if (getAuthManager().authorize(user, tableName, (byte[])null, permRequest)) {
+    if (getAuthManager().authorizeUserTable(user, tableName, permRequest)) {
       return AuthResult.allow(request, "Table permission granted", user,
         permRequest, tableName, families);
     }
@@ -340,7 +340,7 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
       // all families must pass
       for (Map.Entry<byte [], ? extends Collection<?>> family : families.entrySet()) {
         // a) check for family level access
-        if (getAuthManager().authorize(user, tableName, family.getKey(),
+        if (getAuthManager().authorizeUserTable(user, tableName, family.getKey(),
             permRequest)) {
           continue;  // family-level permission overrides per-qualifier
         }
@@ -351,17 +351,17 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
             // for each qualifier of the family
             Set<byte[]> familySet = (Set<byte[]>)family.getValue();
             for (byte[] qualifier : familySet) {
-              if (!getAuthManager().authorize(user, tableName, family.getKey(),
-                                         qualifier, permRequest)) {
+              if (!getAuthManager().authorizeUserTable(user, tableName,
+                    family.getKey(), qualifier, permRequest)) {
                 return AuthResult.deny(request, "Failed qualifier check", user,
-                    permRequest, tableName, makeFamilyMap(family.getKey(), qualifier));
+                  permRequest, tableName, makeFamilyMap(family.getKey(), qualifier));
               }
             }
           } else if (family.getValue() instanceof List) { // List<Cell>
             List<Cell> cellList = (List<Cell>)family.getValue();
             for (Cell cell : cellList) {
-              if (!getAuthManager().authorize(user, tableName, family.getKey(),
-                CellUtil.cloneQualifier(cell), permRequest)) {
+              if (!getAuthManager().authorizeUserTable(user, tableName, family.getKey(),
+                  CellUtil.cloneQualifier(cell), permRequest)) {
                 return AuthResult.deny(request, "Failed qualifier check", user, permRequest,
                   tableName, makeFamilyMap(family.getKey(), CellUtil.cloneQualifier(cell)));
               }
@@ -370,7 +370,7 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
         } else {
           // no qualifiers and family-level check already failed
           return AuthResult.deny(request, "Failed family check", user, permRequest,
-              tableName, makeFamilyMap(family.getKey(), null));
+            tableName, makeFamilyMap(family.getKey(), null));
         }
       }
 
@@ -487,14 +487,13 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
           familyMap.entrySet()) {
         if (family.getValue() != null && !family.getValue().isEmpty()) {
           for (byte[] qualifier : family.getValue()) {
-            if (getAuthManager().matchPermission(user, tableName,
-                family.getKey(), qualifier, perm)) {
+            if (getAuthManager().authorizeUserTable(user, tableName,
+                  family.getKey(), qualifier, perm)) {
               return true;
             }
           }
         } else {
-          if (getAuthManager().matchPermission(user, tableName, family.getKey(),
-              perm)) {
+          if (getAuthManager().authorizeUserFamily(user, tableName, family.getKey(), perm)) {
             return true;
           }
         }
@@ -683,7 +682,7 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
           foundColumn = true;
           for (Action action: actions) {
             // Are there permissions for this user for the cell?
-            if (!getAuthManager().authorize(user, getTableName(e), cell, action)) {
+            if (!getAuthManager().authorizeCell(user, getTableName(e), cell, action)) {
               // We can stop if the cell ACL denies access
               return false;
             }
@@ -798,7 +797,7 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
 
     // set the user-provider.
     this.userProvider = UserProvider.instantiate(env.getConfiguration());
-    // Throws RuntimeException if fails to load TableAuthManager so that coprocessor is unloaded.
+    // Throws RuntimeException if fails to load AuthManager so that coprocessor is unloaded.
     accessChecker = new AccessChecker(env.getConfiguration(), zk);
     tableAcls = new MapMaker().weakValues().makeMap();
   }
@@ -886,8 +885,8 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
         // default the table owner to current user, if not specified.
         if (owner == null)
           owner = getActiveUser(c).getShortName();
-        final UserPermission userperm = new UserPermission(Bytes.toBytes(owner),
-            desc.getTableName(), null, Action.values());
+        final UserPermission userPermission = new UserPermission(owner,
+          desc.getTableName(), Action.values());
         // switch to the real hbase master user for doing the RPC on the ACL table
         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
           @Override
@@ -895,7 +894,7 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
             try (Table table = c.getEnvironment().getConnection().
                 getTable(AccessControlLists.ACL_TABLE_NAME)) {
               AccessControlLists.addUserPermission(c.getEnvironment().getConfiguration(),
-                  userperm, table);
+                userPermission, table);
             }
             return null;
           }
@@ -988,8 +987,8 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
       @Override
       public Void run() throws Exception {
-        UserPermission userperm = new UserPermission(Bytes.toBytes(owner),
-            currentDesc.getTableName(), null, Action.values());
+        UserPermission userperm = new UserPermission(owner,
+          currentDesc.getTableName(), Action.values());
         try (Table table = c.getEnvironment().getConnection().
             getTable(AccessControlLists.ACL_TABLE_NAME)) {
           AccessControlLists.addUserPermission(conf, userperm, table);
@@ -2044,19 +2043,24 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
         if (!initialized) {
           throw new CoprocessorException("AccessController not yet initialized");
         }
+        User caller = RpcServer.getRequestUser().orElse(null);
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Received request to grant access permission " + perm.toString());
+          LOG.debug("Received request from {} to grant access permission {}",
+            caller.getName(), perm.toString());
         }
-        User caller = RpcServer.getRequestUser().orElse(null);
 
         switch(request.getUserPermission().getPermission().getType()) {
           case Global :
+            accessChecker.requireGlobalPermission(caller, "grant", Action.ADMIN, "");
+            break;
           case Table :
-            accessChecker.requirePermission(caller, "grant", perm.getTableName(),
-                perm.getFamily(), perm.getQualifier(), null, Action.ADMIN);
+            TablePermission tablePerm = (TablePermission) perm.getPermission();
+            accessChecker.requirePermission(caller, "grant", tablePerm.getTableName(),
+              tablePerm.getFamily(), tablePerm.getQualifier(), null, Action.ADMIN);
             break;
           case Namespace :
-            accessChecker.requireNamespacePermission(caller, "grant", perm.getNamespace(),
+            NamespacePermission namespacePer = (NamespacePermission) perm.getPermission();
+            accessChecker.requireNamespacePermission(caller, "grant", namespacePer.getNamespace(),
                 null, Action.ADMIN);
            break;
         }
@@ -2102,20 +2106,25 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
         if (!initialized) {
           throw new CoprocessorException("AccessController not yet initialized");
         }
+        User caller = RpcServer.getRequestUser().orElse(null);
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Received request to revoke access permission " + perm.toString());
+          LOG.debug("Received request from {} to revoke access permission {}",
+            caller.getShortName(), perm.toString());
         }
-        User caller = RpcServer.getRequestUser().orElse(null);
 
         switch(request.getUserPermission().getPermission().getType()) {
           case Global :
+            accessChecker.requireGlobalPermission(caller, "revoke", Action.ADMIN, "");
+            break;
           case Table :
-            accessChecker.requirePermission(caller, "revoke", perm.getTableName(), perm.getFamily(),
-              perm.getQualifier(), null, Action.ADMIN);
+            TablePermission tablePerm = (TablePermission) perm.getPermission();
+            accessChecker.requirePermission(caller, "revoke", tablePerm.getTableName(),
+              tablePerm.getFamily(), tablePerm.getQualifier(), null, Action.ADMIN);
             break;
           case Namespace :
-            accessChecker.requireNamespacePermission(caller, "revoke", perm.getNamespace(),
-                null, Action.ADMIN);
+            NamespacePermission namespacePer = (NamespacePermission) perm.getPermission();
+            accessChecker.requireNamespacePermission(caller, "revoke",
+              namespacePer.getNamespace(), null, Action.ADMIN);
             break;
         }
 
@@ -2189,7 +2198,7 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
         } else if (request.getType() == AccessControlProtos.Permission.Type.Namespace) {
           final String namespace = request.getNamespaceName().toStringUtf8();
           accessChecker.requireNamespacePermission(caller, "userPermissions",
-            namespace,userName, Action.ADMIN);
+            namespace, userName, Action.ADMIN);
           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
             @Override
             public List<UserPermission> run() throws Exception {
@@ -2225,8 +2234,7 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
             // them. Also using acl as table name to be inline with the results of global admin and
             // will help in avoiding any leakage of information about being superusers.
             for (String user : Superusers.getSuperUsers()) {
-              perms.add(new UserPermission(Bytes.toBytes(user), AccessControlLists.ACL_TABLE_NAME,
-                  null, Action.values()));
+              perms.add(new UserPermission(user, Action.values()));
             }
           }
         }
@@ -2295,7 +2303,7 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
 
           for (Action action : permission.getActions()) {
             AuthResult result;
-            if (getAuthManager().authorize(user, action)) {
+            if (getAuthManager().authorizeUserGlobal(user, action)) {
               result = AuthResult.allow("checkPermissions", "Global action allowed", user,
                 action, null, null);
             } else {

http://git-wip-us.apache.org/repos/asf/hbase/blob/130057f1/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AuthManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AuthManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AuthManager.java
new file mode 100644
index 0000000..8da9a82
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AuthManager.java
@@ -0,0 +1,608 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.access;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.AuthUtil;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.security.Superusers;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
+/**
+ * Performs authorization checks for a given user's assigned permissions.
+ * <p>
+ *   There are the following scopes: <b>Global</b>, <b>Namespace</b>, <b>Table</b>, <b>Family</b>,
+ *   <b>Qualifier</b>, <b>Cell</b>.
+ *   Generally speaking, higher scopes override lower scopes,
+ *   except that a Cell permission can be granted even if a user has no permission on the
+ *   specified table, which means the user can get/scan only the granted cells.
+ * </p>
+ * e.g., if user A has global permission R(ead), he can
+ * read table T without checking table scope permission, so authorization checks always start from
+ * Global scope.
+ * <p>
+ *   For each scope, not only user but also groups he belongs to will be checked.
+ * </p>
+ */
+@InterfaceAudience.Private
+public final class AuthManager implements Closeable {
+
+  /**
+   * Cache of permissions, it is thread safe.
+   * @param <T> T extends Permission
+   */
+  private static class PermissionCache<T extends Permission> {
+    private final Object mutex = new Object();
+    private Map<String, Set<T>> cache = new HashMap<>();
+
+    void put(String name, T perm) {
+      synchronized (mutex) {
+        Set<T> perms = cache.getOrDefault(name, new HashSet<>());
+        perms.add(perm);
+        cache.put(name, perms);
+      }
+    }
+
+    Set<T> get(String name) {
+      synchronized (mutex) {
+        return cache.get(name);
+      }
+    }
+
+    void clear() {
+      synchronized (mutex) {
+        for (Map.Entry<String, Set<T>> entry : cache.entrySet()) {
+          entry.getValue().clear();
+        }
+        cache.clear();
+      }
+    }
+  }
+  PermissionCache<NamespacePermission> NS_NO_PERMISSION = new PermissionCache<>();
+  PermissionCache<TablePermission> TBL_NO_PERMISSION = new PermissionCache<>();
+
+  /**
+   * Cache for global permission.
+   * Since every user/group can only have one global permission, no need to use PermissionCache.
+   */
+  private volatile Map<String, GlobalPermission> globalCache;
+  /** Cache for namespace permission. */
+  private ConcurrentHashMap<String, PermissionCache<NamespacePermission>> namespaceCache =
+    new ConcurrentHashMap<>();
+  /** Cache for table permission. */
+  private ConcurrentHashMap<TableName, PermissionCache<TablePermission>> tableCache =
+    new ConcurrentHashMap<>();
+
+  private static final Logger LOG = LoggerFactory.getLogger(AuthManager.class);
+
+  private Configuration conf;
+  private ZKPermissionWatcher zkperms;
+  private final AtomicLong mtime = new AtomicLong(0L);
+
+  private AuthManager(ZKWatcher watcher, Configuration conf)
+      throws IOException {
+    this.conf = conf;
+    // initialize global permissions based on configuration
+    globalCache = initGlobal(conf);
+
+    this.zkperms = new ZKPermissionWatcher(watcher, this, conf);
+    try {
+      this.zkperms.start();
+    } catch (KeeperException ke) {
+      LOG.error("ZooKeeper initialization failed", ke);
+    }
+  }
+
+  @Override
+  public void close() {
+    this.zkperms.close();
+  }
+
+  /**
+   * Initialize with global permission assignments
+   * from the {@code hbase.superuser} configuration key.
+   */
+  private Map<String, GlobalPermission> initGlobal(Configuration conf) throws IOException {
+    UserProvider userProvider = UserProvider.instantiate(conf);
+    User user = userProvider.getCurrent();
+    if (user == null) {
+      throw new IOException("Unable to obtain the current user, " +
+        "authorization checks for internal operations will not work correctly!");
+    }
+    String currentUser = user.getShortName();
+
+    Map<String, GlobalPermission> global = new HashMap<>();
+    // the system user is always included
+    List<String> superusers = Lists.asList(currentUser, conf.getStrings(
+        Superusers.SUPERUSER_CONF_KEY, new String[0]));
+    for (String name : superusers) {
+      GlobalPermission globalPermission = new GlobalPermission(Permission.Action.values());
+      global.put(name, globalPermission);
+    }
+    return global;
+  }
+
+  public ZKPermissionWatcher getZKPermissionWatcher() {
+    return this.zkperms;
+  }
+
+  /**
+   * Update acl info for table.
+   * @param table name of table
+   * @param data updated acl data
+   * @throws IOException if the acl data cannot be deserialized
+   */
+  public void refreshTableCacheFromWritable(TableName table, byte[] data) throws IOException {
+    if (data != null && data.length > 0) {
+      try {
+        ListMultimap<String, Permission> perms =
+          AccessControlLists.readPermissions(data, conf);
+        if (perms != null) {
+          if (Bytes.equals(table.getName(), AccessControlLists.ACL_GLOBAL_NAME)) {
+            updateGlobalCache(perms);
+          } else {
+            updateTableCache(table, perms);
+          }
+        }
+      } catch (DeserializationException e) {
+        throw new IOException(e);
+      }
+    } else {
+      LOG.info("Skipping permission cache refresh because writable data is empty");
+    }
+  }
+
+  /**
+   * Update acl info for namespace.
+   * @param namespace namespace
+   * @param data updated acl data
+   * @throws IOException if the acl data cannot be deserialized
+   */
+  public void refreshNamespaceCacheFromWritable(String namespace, byte[] data) throws IOException {
+    if (data != null && data.length > 0) {
+      try {
+        ListMultimap<String, Permission> perms =
+          AccessControlLists.readPermissions(data, conf);
+        if (perms != null) {
+          updateNamespaceCache(namespace, perms);
+        }
+      } catch (DeserializationException e) {
+        throw new IOException(e);
+      }
+    } else {
+      LOG.debug("Skipping permission cache refresh because writable data is empty");
+    }
+  }
+
+  /**
+   * Updates the internal global permissions cache.
+   * @param globalPerms new global permissions
+   */
+  private void updateGlobalCache(ListMultimap<String, Permission> globalPerms) {
+    try {
+      Map<String, GlobalPermission> global = initGlobal(conf);
+      for (String name : globalPerms.keySet()) {
+        for (Permission permission : globalPerms.get(name)) {
+          global.put(name, (GlobalPermission) permission);
+        }
+      }
+      globalCache = global;
+      mtime.incrementAndGet();
+    } catch (Exception e) {
+      // Never happens
+      LOG.error("Error occurred while updating the global cache", e);
+    }
+  }
+
+  /**
+   * Updates the internal table permissions cache for specified table.
+   * @param table updated table name
+   * @param tablePerms new table permissions
+   */
+  private void updateTableCache(TableName table, ListMultimap<String, Permission> tablePerms) {
+    PermissionCache<TablePermission> cacheToUpdate =
+      tableCache.getOrDefault(table, new PermissionCache<>());
+    clearCache(cacheToUpdate);
+    updateCache(tablePerms, cacheToUpdate);
+    tableCache.put(table, cacheToUpdate);
+    mtime.incrementAndGet();
+  }
+
+  /**
+   * Updates the internal namespace permissions cache for specified namespace.
+   * @param namespace updated namespace
+   * @param nsPerms new namespace permissions
+   */
+  private void updateNamespaceCache(String namespace,
+      ListMultimap<String, Permission> nsPerms) {
+    PermissionCache<NamespacePermission> cacheToUpdate =
+      namespaceCache.getOrDefault(namespace, new PermissionCache<>());
+    clearCache(cacheToUpdate);
+    updateCache(nsPerms, cacheToUpdate);
+    namespaceCache.put(namespace, cacheToUpdate);
+    mtime.incrementAndGet();
+  }
+
+  private void clearCache(PermissionCache cacheToUpdate) {
+    cacheToUpdate.clear();
+  }
+
+  @SuppressWarnings("unchecked")
+  private void updateCache(ListMultimap<String, ? extends Permission> newPermissions,
+      PermissionCache cacheToUpdate) {
+    for (String name : newPermissions.keySet()) {
+      for (Permission permission : newPermissions.get(name)) {
+        cacheToUpdate.put(name, permission);
+      }
+    }
+  }
+
+  /**
+   * Check if user has given action privilege in global scope.
+   * @param user user name
+   * @param action one of action in [Read, Write, Create, Exec, Admin]
+   * @return true if user has, false otherwise
+   */
+  public boolean authorizeUserGlobal(User user, Permission.Action action) {
+    if (user == null) {
+      return false;
+    }
+    if (authorizeGlobal(globalCache.get(user.getShortName()), action)) {
+      return true;
+    }
+    for (String group : user.getGroupNames()) {
+      if (authorizeGlobal(globalCache.get(AuthUtil.toGroupEntry(group)), action)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private boolean authorizeGlobal(GlobalPermission permissions, Permission.Action action) {
+    return permissions != null && permissions.implies(action);
+  }
+
+  /**
+   * Check if user has given action privilege in namespace scope.
+   * @param user user name
+   * @param namespace namespace
+   * @param action one of action in [Read, Write, Create, Exec, Admin]
+   * @return true if user has, false otherwise
+   */
+  public boolean authorizeUserNamespace(User user, String namespace, Permission.Action action) {
+    if (user == null) {
+      return false;
+    }
+    if (authorizeUserGlobal(user, action)) {
+      return true;
+    }
+    PermissionCache<NamespacePermission> nsPermissions = namespaceCache.getOrDefault(namespace,
+      NS_NO_PERMISSION);
+    if (authorizeNamespace(nsPermissions.get(user.getShortName()), namespace, action)) {
+      return true;
+    }
+    for (String group : user.getGroupNames()) {
+      if (authorizeNamespace(nsPermissions.get(AuthUtil.toGroupEntry(group)), namespace, action)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private boolean authorizeNamespace(Set<NamespacePermission> permissions,
+      String namespace, Permission.Action action) {
+    if (permissions == null) {
+      return false;
+    }
+    for (NamespacePermission permission : permissions) {
+      if (permission.implies(namespace, action)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Checks if the user has access to the full table or at least a family/qualifier
+   * for the specified action.
+   * @param user user name
+   * @param table table name
+   * @param action action in one of [Read, Write, Create, Exec, Admin]
+   * @return true if the user has access to the table, false otherwise
+   */
+  public boolean accessUserTable(User user, TableName table, Permission.Action action) {
+    if (user == null) {
+      return false;
+    }
+    if (table == null) {
+      table = AccessControlLists.ACL_TABLE_NAME;
+    }
+    if (authorizeUserNamespace(user, table.getNamespaceAsString(), action)) {
+      return true;
+    }
+    PermissionCache<TablePermission> tblPermissions = tableCache.getOrDefault(table,
+      TBL_NO_PERMISSION);
+    if (hasAccessTable(tblPermissions.get(user.getShortName()), action)) {
+      return true;
+    }
+    for (String group : user.getGroupNames()) {
+      if (hasAccessTable(tblPermissions.get(AuthUtil.toGroupEntry(group)), action)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Scans a set of table permissions for one that implies the action, regardless of the
+   * family or qualifier the permission is scoped to.
+   * @param permissions permissions to scan, a null set grants nothing
+   * @param action requested action
+   * @return true as soon as one permission implies the action, false otherwise
+   */
+  private boolean hasAccessTable(Set<TablePermission> permissions, Permission.Action action) {
+    boolean implied = false;
+    if (permissions != null) {
+      for (TablePermission candidate : permissions) {
+        if (candidate.implies(action)) {
+          implied = true;
+          break;
+        }
+      }
+    }
+    return implied;
+  }
+
+  /**
+   * Check if user has the given action privilege in table scope.
+   * Delegates to the table:family:qualifier overload with a null family and a null qualifier.
+   * @param user user to check
+   * @param table table name
+   * @param action one of the actions in [Read, Write, Create, Exec, Admin]
+   * @return true if the user has the privilege, false otherwise
+   */
+  public boolean authorizeUserTable(User user, TableName table, Permission.Action action) {
+    return authorizeUserTable(user, table, null, null, action);
+  }
+
+  /**
+   * Check if user has the given action privilege in table:family scope.
+   * Delegates to the table:family:qualifier overload with a null qualifier.
+   * @param user user to check
+   * @param table table name
+   * @param family family name
+   * @param action one of the actions in [Read, Write, Create, Exec, Admin]
+   * @return true if the user has the privilege, false otherwise
+   */
+  public boolean authorizeUserTable(User user, TableName table, byte[] family,
+      Permission.Action action) {
+    return authorizeUserTable(user, table, family, null, action);
+  }
+
+  /**
+   * Check if user has the given action privilege in table:family:qualifier scope.
+   * A grant found by the namespace check (which itself covers global grants) is accepted
+   * before the table permissions of the user and of the user's groups are consulted.
+   * @param user user to check, a null user is never authorized
+   * @param table table name, null stands for the ACL table
+   * @param family family name
+   * @param qualifier qualifier name
+   * @param action one of the actions in [Read, Write, Create, Exec, Admin]
+   * @return true if the user has the privilege, false otherwise
+   */
+  public boolean authorizeUserTable(User user, TableName table, byte[] family,
+      byte[] qualifier, Permission.Action action) {
+    if (user == null) {
+      return false;
+    }
+    TableName target = (table != null) ? table : AccessControlLists.ACL_TABLE_NAME;
+    if (authorizeUserNamespace(user, target.getNamespaceAsString(), action)) {
+      return true;
+    }
+    PermissionCache<TablePermission> cached =
+        tableCache.getOrDefault(target, TBL_NO_PERMISSION);
+    boolean granted =
+        authorizeTable(cached.get(user.getShortName()), target, family, qualifier, action);
+    if (!granted) {
+      for (String groupName : user.getGroupNames()) {
+        Set<TablePermission> groupPerms = cached.get(AuthUtil.toGroupEntry(groupName));
+        if (authorizeTable(groupPerms, target, family, qualifier, action)) {
+          granted = true;
+          break;
+        }
+      }
+    }
+    return granted;
+  }
+
+  /**
+   * Scans a set of table permissions for one that implies the action on the given
+   * table:family:qualifier.
+   * @param permissions permissions to scan, a null set grants nothing
+   * @param table table being accessed
+   * @param family family being accessed
+   * @param qualifier qualifier being accessed
+   * @param action requested action
+   * @return true as soon as one permission implies the action, false otherwise
+   */
+  private boolean authorizeTable(Set<TablePermission> permissions,
+      TableName table, byte[] family, byte[] qualifier, Permission.Action action) {
+    boolean implied = false;
+    if (permissions != null) {
+      for (TablePermission candidate : permissions) {
+        if (candidate.implies(table, family, qualifier, action)) {
+          implied = true;
+          break;
+        }
+      }
+    }
+    return implied;
+  }
+
+  /**
+   * Check if user has the given action privilege in table:family scope.
+   * This method is for backward compatibility. Only the table permission cache is consulted
+   * here: first the entry of the user, then the entry of each of the user's groups.
+   * @param user user to check, a null user is never authorized
+   * @param table table name, null stands for the ACL table
+   * @param family family name
+   * @param action one of the actions in [Read, Write, Create, Exec, Admin]
+   * @return true if the user has the privilege, false otherwise
+   */
+  public boolean authorizeUserFamily(User user, TableName table,
+      byte[] family, Permission.Action action) {
+    // Same guards as accessUserTable/authorizeUserTable: without the first one a null user
+    // fails with a NullPointerException on getShortName() below.
+    if (user == null) {
+      return false;
+    }
+    if (table == null) {
+      table = AccessControlLists.ACL_TABLE_NAME;
+    }
+    PermissionCache<TablePermission> tblPermissions = tableCache.getOrDefault(table,
+      TBL_NO_PERMISSION);
+    if (authorizeFamily(tblPermissions.get(user.getShortName()), table, family, action)) {
+      return true;
+    }
+    for (String group : user.getGroupNames()) {
+      if (authorizeFamily(tblPermissions.get(AuthUtil.toGroupEntry(group)),
+          table, family, action)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Scans a set of table permissions for one that implies the action on the given
+   * table:family.
+   * @param permissions permissions to scan, a null set grants nothing
+   * @param table table being accessed
+   * @param family family being accessed
+   * @param action requested action
+   * @return true as soon as one permission implies the action, false otherwise
+   */
+  private boolean authorizeFamily(Set<TablePermission> permissions,
+      TableName table, byte[] family, Permission.Action action) {
+    boolean implied = false;
+    if (permissions != null) {
+      for (TablePermission candidate : permissions) {
+        if (candidate.implies(table, family, action)) {
+          implied = true;
+          break;
+        }
+      }
+    }
+    return implied;
+  }
+
+  /**
+   * Check if user has the given action privilege in cell scope.
+   * Only the permissions returned by {@code AccessControlLists.getCellPermissionsForUser}
+   * for this user and cell are evaluated; no table or family permission is consulted here.
+   * @param user user to check, a null user is never authorized
+   * @param table table name (not read by this check)
+   * @param cell cell to be checked
+   * @param action one of the actions in [Read, Write, Create, Exec, Admin]
+   * @return true if one of the cell permissions implies the action; false otherwise,
+   *   including when the permissions of the cell cannot be read
+   */
+  public boolean authorizeCell(User user, TableName table, Cell cell, Permission.Action action) {
+    // Consistent with the other authorize* methods: no user, no access.
+    if (user == null) {
+      return false;
+    }
+    try {
+      List<Permission> perms = AccessControlLists.getCellPermissionsForUser(user, cell);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Perms for user " + user.getShortName() + " in cell " + cell + ": " +
+          (perms != null ? perms : ""));
+      }
+      if (perms != null) {
+        for (Permission p : perms) {
+          if (p.implies(action)) {
+            return true;
+          }
+        }
+      }
+    } catch (IOException e) {
+      // We failed to parse the KV tag. Log the cause as well and deny: nothing else is
+      // checked after this point.
+      LOG.error("Failed parse of ACL tag in cell " + cell, e);
+    }
+    return false;
+  }
+
+  /**
+   * Remove given namespace from AuthManager's namespace cache.
+   * @param ns namespace name as bytes; converted with {@code Bytes.toString} to obtain the
+   *   cache key
+   */
+  public void removeNamespace(byte[] ns) {
+    namespaceCache.remove(Bytes.toString(ns));
+  }
+
+  /**
+   * Remove given table from AuthManager's table cache.
+   * @param table name of the table whose cache entry is removed
+   */
+  public void removeTable(TableName table) {
+    tableCache.remove(table);
+  }
+
+  /**
+   * Last modification logical time.
+   * @return the current value held by {@code mtime}
+   */
+  public long getMTime() {
+    return mtime.get();
+  }
+
+  // Shared AuthManager per ZKWatcher; entries are added by getOrCreate and removed by release.
+  private static Map<ZKWatcher, AuthManager> managerMap = new HashMap<>();
+
+  // Per instance, the number of getOrCreate calls not yet matched by a release call.
+  private static Map<AuthManager, Integer> refCount = new HashMap<>();
+
+  /**
+   * Hands out the AuthManager cached for the given watcher, constructing and caching a new
+   * one when none exists yet. Every call adds one reference to the returned instance, which
+   * the caller should give back through {@link #release(AuthManager)}.
+   * @param watcher zk watcher
+   * @param conf configuration
+   * @return an AuthManager
+   * @throws IOException zookeeper initialization failed
+   */
+  public static synchronized AuthManager getOrCreate(
+      ZKWatcher watcher, Configuration conf) throws IOException {
+    AuthManager cached = managerMap.get(watcher);
+    if (cached == null) {
+      cached = new AuthManager(watcher, conf);
+      managerMap.put(watcher, cached);
+    }
+    // Starts at 1 for an instance without a count, otherwise adds 1 to the stored count.
+    refCount.merge(cached, 1, Integer::sum);
+    return cached;
+  }
+
+  /**
+   * Sums the reference counts of all AuthManager instances currently tracked.
+   * Holds the class lock, like getOrCreate and release, so that the count map is not
+   * iterated while one of them is modifying it.
+   * @return total number of outstanding references
+   */
+  @VisibleForTesting
+  public static synchronized int getTotalRefCount() {
+    int total = 0;
+    for (int count : refCount.values()) {
+      total += count;
+    }
+    return total;
+  }
+
+  /**
+   * Gives back one reference to the given AuthManager. When the last reference is returned
+   * the instance is closed and dropped from the cache. A release without a matching
+   * reference is logged as fatal, the instance is closed and its watcher is aborted.
+   * @param instance AuthManager to be released
+   */
+  public static synchronized void release(AuthManager instance) {
+    Integer current = refCount.get(instance);
+    if (current == null || current < 1) {
+      // Reference counting is broken for this instance: report it and abort the watcher.
+      String msg = "Something wrong with the AuthManager reference counting: " + instance
+          + " whose count is " + current;
+      LOG.error(HBaseMarkers.FATAL, msg);
+      instance.close();
+      managerMap.remove(instance.getZKPermissionWatcher().getWatcher());
+      instance.getZKPermissionWatcher().getWatcher().abort(msg, null);
+      return;
+    }
+    int remaining = current - 1;
+    refCount.put(instance, remaining);
+    if (remaining == 0) {
+      // Last reference returned: close the instance and forget it.
+      instance.close();
+      managerMap.remove(instance.getZKPermissionWatcher().getWatcher());
+      refCount.remove(instance);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/130057f1/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java
deleted file mode 100644
index 76feff4..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java
+++ /dev/null
@@ -1,787 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.security.access;
-
-import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
-
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
-import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap;
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.AuthUtil;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.log.HBaseMarkers;
-import org.apache.hadoop.hbase.security.Superusers;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Performs authorization checks for a given user's assigned permissions
- */
-@InterfaceAudience.Private
-public class TableAuthManager implements Closeable {
-  private static class PermissionCache<T extends Permission> {
-    /** Cache of user permissions */
-    private ListMultimap<String,T> userCache = ArrayListMultimap.create();
-    /** Cache of group permissions */
-    private ListMultimap<String,T> groupCache = ArrayListMultimap.create();
-
-    public List<T> getUser(String user) {
-      return userCache.get(user);
-    }
-
-    public void putUser(String user, T perm) {
-      userCache.put(user, perm);
-    }
-
-    public List<T> replaceUser(String user, Iterable<? extends T> perms) {
-      return userCache.replaceValues(user, perms);
-    }
-
-    public List<T> getGroup(String group) {
-      return groupCache.get(group);
-    }
-
-    public void putGroup(String group, T perm) {
-      groupCache.put(group, perm);
-    }
-
-    public List<T> replaceGroup(String group, Iterable<? extends T> perms) {
-      return groupCache.replaceValues(group, perms);
-    }
-
-    /**
-     * Returns a combined map of user and group permissions, with group names
-     * distinguished according to {@link AuthUtil#isGroupPrincipal(String)}.
-     */
-    public ListMultimap<String,T> getAllPermissions() {
-      ListMultimap<String,T> tmp = ArrayListMultimap.create();
-      tmp.putAll(userCache);
-      for (String group : groupCache.keySet()) {
-        tmp.putAll(AuthUtil.toGroupEntry(group), groupCache.get(group));
-      }
-      return tmp;
-    }
-  }
-
-  private static final Logger LOG = LoggerFactory.getLogger(TableAuthManager.class);
-
-  /** Cache of global permissions */
-  private volatile PermissionCache<Permission> globalCache;
-
-  private ConcurrentSkipListMap<TableName, PermissionCache<TablePermission>> tableCache =
-      new ConcurrentSkipListMap<>();
-
-  private ConcurrentSkipListMap<String, PermissionCache<TablePermission>> nsCache =
-    new ConcurrentSkipListMap<>();
-
-  private Configuration conf;
-  private ZKPermissionWatcher zkperms;
-  private final AtomicLong mtime = new AtomicLong(0L);
-
-  private TableAuthManager(ZKWatcher watcher, Configuration conf)
-      throws IOException {
-    this.conf = conf;
-
-    // initialize global permissions based on configuration
-    globalCache = initGlobal(conf);
-
-    this.zkperms = new ZKPermissionWatcher(watcher, this, conf);
-    try {
-      this.zkperms.start();
-    } catch (KeeperException ke) {
-      LOG.error("ZooKeeper initialization failed", ke);
-    }
-  }
-
-  @Override
-  public void close() {
-    this.zkperms.close();
-  }
-
-  /**
-   * Returns a new {@code PermissionCache} initialized with permission assignments
-   * from the {@code hbase.superuser} configuration key.
-   */
-  private PermissionCache<Permission> initGlobal(Configuration conf) throws IOException {
-    UserProvider userProvider = UserProvider.instantiate(conf);
-    User user = userProvider.getCurrent();
-    if (user == null) {
-      throw new IOException("Unable to obtain the current user, " +
-          "authorization checks for internal operations will not work correctly!");
-    }
-    PermissionCache<Permission> newCache = new PermissionCache<>();
-    String currentUser = user.getShortName();
-
-    // the system user is always included
-    List<String> superusers = Lists.asList(currentUser, conf.getStrings(
-        Superusers.SUPERUSER_CONF_KEY, new String[0]));
-    if (superusers != null) {
-      for (String name : superusers) {
-        if (AuthUtil.isGroupPrincipal(name)) {
-          newCache.putGroup(AuthUtil.getGroupName(name),
-              new Permission(Permission.Action.values()));
-        } else {
-          newCache.putUser(name, new Permission(Permission.Action.values()));
-        }
-      }
-    }
-    return newCache;
-  }
-
-  public ZKPermissionWatcher getZKPermissionWatcher() {
-    return this.zkperms;
-  }
-
-  public void refreshTableCacheFromWritable(TableName table,
-                                       byte[] data) throws IOException {
-    if (data != null && data.length > 0) {
-      ListMultimap<String,TablePermission> perms;
-      try {
-        perms = AccessControlLists.readPermissions(data, conf);
-      } catch (DeserializationException e) {
-        throw new IOException(e);
-      }
-
-      if (perms != null) {
-        if (Bytes.equals(table.getName(), AccessControlLists.ACL_GLOBAL_NAME)) {
-          updateGlobalCache(perms);
-        } else {
-          updateTableCache(table, perms);
-        }
-      }
-    } else {
-      LOG.debug("Skipping permission cache refresh because writable data is empty");
-    }
-  }
-
-  public void refreshNamespaceCacheFromWritable(String namespace, byte[] data) throws IOException {
-    if (data != null && data.length > 0) {
-      ListMultimap<String,TablePermission> perms;
-      try {
-        perms = AccessControlLists.readPermissions(data, conf);
-      } catch (DeserializationException e) {
-        throw new IOException(e);
-      }
-      if (perms != null) {
-        updateNsCache(namespace, perms);
-      }
-    } else {
-      LOG.debug("Skipping permission cache refresh because writable data is empty");
-    }
-  }
-
-  /**
-   * Updates the internal global permissions cache
-   *
-   * @param userPerms
-   */
-  private void updateGlobalCache(ListMultimap<String,TablePermission> userPerms) {
-    PermissionCache<Permission> newCache = null;
-    try {
-      newCache = initGlobal(conf);
-      for (Map.Entry<String,TablePermission> entry : userPerms.entries()) {
-        if (AuthUtil.isGroupPrincipal(entry.getKey())) {
-          newCache.putGroup(AuthUtil.getGroupName(entry.getKey()),
-              new Permission(entry.getValue().getActions()));
-        } else {
-          newCache.putUser(entry.getKey(), new Permission(entry.getValue().getActions()));
-        }
-      }
-      globalCache = newCache;
-      mtime.incrementAndGet();
-    } catch (IOException e) {
-      // Never happens
-      LOG.error("Error occurred while updating the global cache", e);
-    }
-  }
-
-  /**
-   * Updates the internal permissions cache for a single table, splitting
-   * the permissions listed into separate caches for users and groups to optimize
-   * group lookups.
-   *
-   * @param table
-   * @param tablePerms
-   */
-  private void updateTableCache(TableName table,
-                                ListMultimap<String,TablePermission> tablePerms) {
-    PermissionCache<TablePermission> newTablePerms = new PermissionCache<>();
-
-    for (Map.Entry<String,TablePermission> entry : tablePerms.entries()) {
-      if (AuthUtil.isGroupPrincipal(entry.getKey())) {
-        newTablePerms.putGroup(AuthUtil.getGroupName(entry.getKey()), entry.getValue());
-      } else {
-        newTablePerms.putUser(entry.getKey(), entry.getValue());
-      }
-    }
-
-    tableCache.put(table, newTablePerms);
-    mtime.incrementAndGet();
-  }
-
-  /**
-   * Updates the internal permissions cache for a single table, splitting
-   * the permissions listed into separate caches for users and groups to optimize
-   * group lookups.
-   *
-   * @param namespace
-   * @param tablePerms
-   */
-  private void updateNsCache(String namespace,
-                             ListMultimap<String, TablePermission> tablePerms) {
-    PermissionCache<TablePermission> newTablePerms = new PermissionCache<>();
-
-    for (Map.Entry<String, TablePermission> entry : tablePerms.entries()) {
-      if (AuthUtil.isGroupPrincipal(entry.getKey())) {
-        newTablePerms.putGroup(AuthUtil.getGroupName(entry.getKey()), entry.getValue());
-      } else {
-        newTablePerms.putUser(entry.getKey(), entry.getValue());
-      }
-    }
-
-    nsCache.put(namespace, newTablePerms);
-    mtime.incrementAndGet();
-  }
-
-  private PermissionCache<TablePermission> getTablePermissions(TableName table) {
-    return computeIfAbsent(tableCache, table, PermissionCache::new);
-  }
-
-  private PermissionCache<TablePermission> getNamespacePermissions(String namespace) {
-    return computeIfAbsent(nsCache, namespace, PermissionCache::new);
-  }
-
-  /**
-   * Authorizes a global permission
-   * @param perms
-   * @param action
-   * @return true if authorized, false otherwise
-   */
-  private boolean authorize(List<Permission> perms, Permission.Action action) {
-    if (perms != null) {
-      for (Permission p : perms) {
-        if (p.implies(action)) {
-          return true;
-        }
-      }
-    } else if (LOG.isDebugEnabled()) {
-      LOG.debug("No permissions found for " + action);
-    }
-
-    return false;
-  }
-
-  /**
-   * Authorize a global permission based on ACLs for the given user and the
-   * user's groups.
-   * @param user
-   * @param action
-   * @return true if known and authorized, false otherwise
-   */
-  public boolean authorize(User user, Permission.Action action) {
-    if (user == null) {
-      return false;
-    }
-
-    if (authorize(globalCache.getUser(user.getShortName()), action)) {
-      return true;
-    }
-
-    String[] groups = user.getGroupNames();
-    if (groups != null) {
-      for (String group : groups) {
-        if (authorize(globalCache.getGroup(group), action)) {
-          return true;
-        }
-      }
-    }
-    return false;
-  }
-
-  private boolean authorize(List<TablePermission> perms,
-                            TableName table, byte[] family,
-                            byte[] qualifier, Permission.Action action) {
-    if (perms != null) {
-      for (TablePermission p : perms) {
-        if (p.implies(table, family, qualifier, action)) {
-          return true;
-        }
-      }
-    } else if (LOG.isDebugEnabled()) {
-      LOG.debug("No permissions found for table="+table);
-    }
-    return false;
-  }
-
-  private boolean hasAccess(List<TablePermission> perms,
-                            TableName table, Permission.Action action) {
-    if (perms != null) {
-      for (TablePermission p : perms) {
-        if (p.implies(action)) {
-          return true;
-        }
-      }
-    } else if (LOG.isDebugEnabled()) {
-      LOG.debug("No permissions found for table="+table);
-    }
-    return false;
-  }
-
-  /**
-   * Authorize a user for a given KV. This is called from AccessControlFilter.
-   */
-  public boolean authorize(User user, TableName table, Cell cell, Permission.Action action) {
-    try {
-      List<Permission> perms = AccessControlLists.getCellPermissionsForUser(user, cell);
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Perms for user " + user.getShortName() + " in cell " + cell + ": " +
-          (perms != null ? perms : ""));
-      }
-      if (perms != null) {
-        for (Permission p: perms) {
-          if (p.implies(action)) {
-            return true;
-          }
-        }
-      }
-    } catch (IOException e) {
-      // We failed to parse the KV tag
-      LOG.error("Failed parse of ACL tag in cell " + cell);
-      // Fall through to check with the table and CF perms we were able
-      // to collect regardless
-    }
-    return false;
-  }
-
-  public boolean authorize(User user, String namespace, Permission.Action action) {
-    // Global authorizations supercede namespace level
-    if (authorize(user, action)) {
-      return true;
-    }
-    // Check namespace permissions
-    PermissionCache<TablePermission> tablePerms = nsCache.get(namespace);
-    if (tablePerms != null) {
-      List<TablePermission> userPerms = tablePerms.getUser(user.getShortName());
-      if (authorize(userPerms, namespace, action)) {
-        return true;
-      }
-      String[] groupNames = user.getGroupNames();
-      if (groupNames != null) {
-        for (String group : groupNames) {
-          List<TablePermission> groupPerms = tablePerms.getGroup(group);
-          if (authorize(groupPerms, namespace, action)) {
-            return true;
-          }
-        }
-      }
-    }
-    return false;
-  }
-
-  private boolean authorize(List<TablePermission> perms, String namespace,
-                            Permission.Action action) {
-    if (perms != null) {
-      for (TablePermission p : perms) {
-        if (p.implies(namespace, action)) {
-          return true;
-        }
-      }
-    } else if (LOG.isDebugEnabled()) {
-      LOG.debug("No permissions for authorize() check, table=" + namespace);
-    }
-
-    return false;
-  }
-
-  /**
-   * Checks authorization to a given table and column family for a user, based on the
-   * stored user permissions.
-   *
-   * @param user
-   * @param table
-   * @param family
-   * @param action
-   * @return true if known and authorized, false otherwise
-   */
-  public boolean authorizeUser(User user, TableName table, byte[] family,
-      Permission.Action action) {
-    return authorizeUser(user, table, family, null, action);
-  }
-
-  public boolean authorizeUser(User user, TableName table, byte[] family,
-      byte[] qualifier, Permission.Action action) {
-    if (table == null) table = AccessControlLists.ACL_TABLE_NAME;
-    // Global and namespace authorizations supercede table level
-    if (authorize(user, table.getNamespaceAsString(), action)) {
-      return true;
-    }
-    // Check table permissions
-    return authorize(getTablePermissions(table).getUser(user.getShortName()), table, family,
-        qualifier, action);
-  }
-
-  /**
-   * Checks if the user has access to the full table or at least a family/qualifier
-   * for the specified action.
-   *
-   * @param user
-   * @param table
-   * @param action
-   * @return true if the user has access to the table, false otherwise
-   */
-  public boolean userHasAccess(User user, TableName table, Permission.Action action) {
-    if (table == null) table = AccessControlLists.ACL_TABLE_NAME;
-    // Global and namespace authorizations supercede table level
-    if (authorize(user, table.getNamespaceAsString(), action)) {
-      return true;
-    }
-    // Check table permissions
-    return hasAccess(getTablePermissions(table).getUser(user.getShortName()), table, action);
-  }
-
-  /**
-   * Checks global authorization for a given action for a group, based on the stored
-   * permissions.
-   */
-  public boolean authorizeGroup(String groupName, Permission.Action action) {
-    List<Permission> perms = globalCache.getGroup(groupName);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("authorizing " + (perms != null && !perms.isEmpty() ? perms.get(0) : "") +
-        " for " + action);
-    }
-    return authorize(perms, action);
-  }
-
-  /**
-   * Checks authorization to a given table, column family and column for a group, based
-   * on the stored permissions.
-   * @param groupName
-   * @param table
-   * @param family
-   * @param qualifier
-   * @param action
-   * @return true if known and authorized, false otherwise
-   */
-  public boolean authorizeGroup(String groupName, TableName table, byte[] family,
-      byte[] qualifier, Permission.Action action) {
-    // Global authorization supercedes table level
-    if (authorizeGroup(groupName, action)) {
-      return true;
-    }
-    if (table == null) table = AccessControlLists.ACL_TABLE_NAME;
-    // Namespace authorization supercedes table level
-    String namespace = table.getNamespaceAsString();
-    if (authorize(getNamespacePermissions(namespace).getGroup(groupName), namespace, action)) {
-      return true;
-    }
-    // Check table level
-    List<TablePermission> tblPerms = getTablePermissions(table).getGroup(groupName);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("authorizing " + (tblPerms != null && !tblPerms.isEmpty() ? tblPerms.get(0) : "") +
-        " for " +groupName + " on " + table + "." + Bytes.toString(family) + "." +
-        Bytes.toString(qualifier) + " with " + action);
-    }
-    return authorize(tblPerms, table, family, qualifier, action);
-  }
-
-  /**
-   * Checks if the user has access to the full table or at least a family/qualifier
-   * for the specified action.
-   * @param groupName
-   * @param table
-   * @param action
-   * @return true if the group has access to the table, false otherwise
-   */
-  public boolean groupHasAccess(String groupName, TableName table, Permission.Action action) {
-    // Global authorization supercedes table level
-    if (authorizeGroup(groupName, action)) {
-      return true;
-    }
-    if (table == null) table = AccessControlLists.ACL_TABLE_NAME;
-    // Namespace authorization supercedes table level
-    if (hasAccess(getNamespacePermissions(table.getNamespaceAsString()).getGroup(groupName),
-        table, action)) {
-      return true;
-    }
-    // Check table level
-    return hasAccess(getTablePermissions(table).getGroup(groupName), table, action);
-  }
-
-  public boolean authorize(User user, TableName table, byte[] family,
-      byte[] qualifier, Permission.Action action) {
-    if (authorizeUser(user, table, family, qualifier, action)) {
-      return true;
-    }
-
-    String[] groups = user.getGroupNames();
-    if (groups != null) {
-      for (String group : groups) {
-        if (authorizeGroup(group, table, family, qualifier, action)) {
-          return true;
-        }
-      }
-    }
-    return false;
-  }
-
-  public boolean hasAccess(User user, TableName table, Permission.Action action) {
-    if (userHasAccess(user, table, action)) {
-      return true;
-    }
-
-    String[] groups = user.getGroupNames();
-    if (groups != null) {
-      for (String group : groups) {
-        if (groupHasAccess(group, table, action)) {
-          return true;
-        }
-      }
-    }
-    return false;
-  }
-
-  public boolean authorize(User user, TableName table, byte[] family,
-      Permission.Action action) {
-    return authorize(user, table, family, null, action);
-  }
-
-  /**
-   * Returns true if the given user has a {@link TablePermission} matching up
-   * to the column family portion of a permission.  Note that this permission
-   * may be scoped to a given column qualifier and does not guarantee that
-   * authorize() on the same column family would return true.
-   */
-  public boolean matchPermission(User user,
-      TableName table, byte[] family, Permission.Action action) {
-    PermissionCache<TablePermission> tablePerms = tableCache.get(table);
-    if (tablePerms != null) {
-      List<TablePermission> userPerms = tablePerms.getUser(user.getShortName());
-      if (userPerms != null) {
-        for (TablePermission p : userPerms) {
-          if (p.matchesFamily(table, family, action)) {
-            return true;
-          }
-        }
-      }
-
-      String[] groups = user.getGroupNames();
-      if (groups != null) {
-        for (String group : groups) {
-          List<TablePermission> groupPerms = tablePerms.getGroup(group);
-          if (groupPerms != null) {
-            for (TablePermission p : groupPerms) {
-              if (p.matchesFamily(table, family, action)) {
-                return true;
-              }
-            }
-          }
-        }
-      }
-    }
-
-    return false;
-  }
-
-  public boolean matchPermission(User user,
-      TableName table, byte[] family, byte[] qualifier,
-      Permission.Action action) {
-    PermissionCache<TablePermission> tablePerms = tableCache.get(table);
-    if (tablePerms != null) {
-      List<TablePermission> userPerms = tablePerms.getUser(user.getShortName());
-      if (userPerms != null) {
-        for (TablePermission p : userPerms) {
-          if (p.matchesFamilyQualifier(table, family, qualifier, action)) {
-            return true;
-          }
-        }
-      }
-
-      String[] groups = user.getGroupNames();
-      if (groups != null) {
-        for (String group : groups) {
-          List<TablePermission> groupPerms = tablePerms.getGroup(group);
-          if (groupPerms != null) {
-            for (TablePermission p : groupPerms) {
-              if (p.matchesFamilyQualifier(table, family, qualifier, action)) {
-                return true;
-              }
-            }
-          }
-        }
-      }
-    }
-    return false;
-  }
-
-  public void removeNamespace(byte[] ns) {
-    nsCache.remove(Bytes.toString(ns));
-  }
-
-  public void removeTable(TableName table) {
-    tableCache.remove(table);
-  }
-
-  /**
-   * Overwrites the existing permission set for a given user for a table, and
-   * triggers an update for zookeeper synchronization.
-   * @param username
-   * @param table
-   * @param perms
-   */
-  public void setTableUserPermissions(String username, TableName table,
-      List<TablePermission> perms) {
-    PermissionCache<TablePermission> tablePerms = getTablePermissions(table);
-    tablePerms.replaceUser(username, perms);
-    writeTableToZooKeeper(table, tablePerms);
-  }
-
-  /**
-   * Overwrites the existing permission set for a group and triggers an update
-   * for zookeeper synchronization.
-   * @param group
-   * @param table
-   * @param perms
-   */
-  public void setTableGroupPermissions(String group, TableName table,
-      List<TablePermission> perms) {
-    PermissionCache<TablePermission> tablePerms = getTablePermissions(table);
-    tablePerms.replaceGroup(group, perms);
-    writeTableToZooKeeper(table, tablePerms);
-  }
-
-  /**
-   * Overwrites the existing permission set for a given user for a table, and
-   * triggers an update for zookeeper synchronization.
-   * @param username
-   * @param namespace
-   * @param perms
-   */
-  public void setNamespaceUserPermissions(String username, String namespace,
-      List<TablePermission> perms) {
-    PermissionCache<TablePermission> tablePerms = getNamespacePermissions(namespace);
-    tablePerms.replaceUser(username, perms);
-    writeNamespaceToZooKeeper(namespace, tablePerms);
-  }
-
-  /**
-   * Overwrites the existing permission set for a group and triggers an update
-   * for zookeeper synchronization.
-   * @param group
-   * @param namespace
-   * @param perms
-   */
-  public void setNamespaceGroupPermissions(String group, String namespace,
-      List<TablePermission> perms) {
-    PermissionCache<TablePermission> tablePerms = getNamespacePermissions(namespace);
-    tablePerms.replaceGroup(group, perms);
-    writeNamespaceToZooKeeper(namespace, tablePerms);
-  }
-
-  public void writeTableToZooKeeper(TableName table,
-      PermissionCache<TablePermission> tablePerms) {
-    byte[] serialized = new byte[0];
-    if (tablePerms != null) {
-      serialized = AccessControlLists.writePermissionsAsBytes(tablePerms.getAllPermissions(), conf);
-    }
-    zkperms.writeToZookeeper(table.getName(), serialized);
-  }
-
-  public void writeNamespaceToZooKeeper(String namespace,
-      PermissionCache<TablePermission> tablePerms) {
-    byte[] serialized = new byte[0];
-    if (tablePerms != null) {
-      serialized = AccessControlLists.writePermissionsAsBytes(tablePerms.getAllPermissions(), conf);
-    }
-    zkperms.writeToZookeeper(Bytes.toBytes(AccessControlLists.toNamespaceEntry(namespace)),
-        serialized);
-  }
-
-  public long getMTime() {
-    return mtime.get();
-  }
-
-  private static Map<ZKWatcher,TableAuthManager> managerMap = new HashMap<>();
-
-  private static Map<TableAuthManager, Integer> refCount = new HashMap<>();
-
-  /** Returns a TableAuthManager from the cache. If not cached, constructs a new one. Returned
-   * instance should be released back by calling {@link #release(TableAuthManager)}. */
-  public synchronized static TableAuthManager getOrCreate(
-          ZKWatcher watcher, Configuration conf) throws IOException {
-    TableAuthManager instance = managerMap.get(watcher);
-    if (instance == null) {
-      instance = new TableAuthManager(watcher, conf);
-      managerMap.put(watcher, instance);
-    }
-    int ref = refCount.get(instance) == null ? 0 : refCount.get(instance).intValue();
-    refCount.put(instance, ref + 1);
-    return instance;
-  }
-
-  @VisibleForTesting
-  public static int getTotalRefCount() {
-    int total = 0;
-    for (int count : refCount.values()) {
-      total += count;
-    }
-    return total;
-  }
-
-  /**
-   * Releases the resources for the given TableAuthManager if the reference count is down to 0.
-   * @param instance TableAuthManager to be released
-   */
-  public synchronized static void release(TableAuthManager instance) {
-    if (refCount.get(instance) == null || refCount.get(instance) < 1) {
-      String msg = "Something wrong with the TableAuthManager reference counting: " + instance
-          + " whose count is " + refCount.get(instance);
-      LOG.error(HBaseMarkers.FATAL, msg);
-      instance.close();
-      managerMap.remove(instance.getZKPermissionWatcher().getWatcher());
-      instance.getZKPermissionWatcher().getWatcher().abort(msg, null);
-    } else {
-      int ref = refCount.get(instance);
-      refCount.put(instance, ref-1);
-      if (ref-1 == 0) {
-        instance.close();
-        managerMap.remove(instance.getZKPermissionWatcher().getWatcher());
-        refCount.remove(instance);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/130057f1/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java
index 2266e86..fa3c30f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java
@@ -49,21 +49,21 @@ import java.util.concurrent.RejectedExecutionException;
  * {@code /hbase/acl/tablename}, with the znode data containing a serialized
  * list of the permissions granted for the table.  The {@code AccessController}
  * instances on all other cluster hosts watch the znodes for updates, which
- * trigger updates in the {@link TableAuthManager} permission cache.
+ * trigger updates in the {@link AuthManager} permission cache.
  */
 @InterfaceAudience.Private
 public class ZKPermissionWatcher extends ZKListener implements Closeable {
   private static final Logger LOG = LoggerFactory.getLogger(ZKPermissionWatcher.class);
   // parent node for permissions lists
   static final String ACL_NODE = "acl";
-  private final TableAuthManager authManager;
+  private final AuthManager authManager;
   private final String aclZNode;
   private final CountDownLatch initialized = new CountDownLatch(1);
   private final ExecutorService executor;
   private Future<?> childrenChangedFuture;
 
   public ZKPermissionWatcher(ZKWatcher watcher,
-      TableAuthManager authManager, Configuration conf) {
+      AuthManager authManager, Configuration conf) {
     super(watcher);
     this.authManager = authManager;
     String aclZnodeParent = conf.get("zookeeper.znode.acl.parent", ACL_NODE);

http://git-wip-us.apache.org/repos/asf/hbase/blob/130057f1/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java
index 7edf734..6aa378c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.security.access.AccessControlClient;
+import org.apache.hadoop.hbase.security.access.Permission;
 import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil;
 import org.apache.hadoop.hbase.security.access.TablePermission;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -858,15 +859,16 @@ public class RestoreSnapshotHelper {
       Configuration conf) throws IOException {
     if (snapshot.hasUsersAndPermissions() && snapshot.getUsersAndPermissions() != null) {
       LOG.info("Restore snapshot acl to table. snapshot: " + snapshot + ", table: " + newTableName);
-      ListMultimap<String, TablePermission> perms =
+      ListMultimap<String, Permission> perms =
           ShadedAccessControlUtil.toUserTablePermissions(snapshot.getUsersAndPermissions());
       try (Connection conn = ConnectionFactory.createConnection(conf)) {
-        for (Entry<String, TablePermission> e : perms.entries()) {
+        for (Entry<String, Permission> e : perms.entries()) {
           String user = e.getKey();
-          TablePermission perm = e.getValue();
-          perm.setTableName(newTableName);
-          AccessControlClient.grant(conn, perm.getTableName(), user, perm.getFamily(),
-            perm.getQualifier(), perm.getActions());
+          TablePermission tablePerm = (TablePermission) e.getValue();
+          TablePermission newPerm = new TablePermission(newTableName,
+            tablePerm.getFamily(), tablePerm.getQualifier(), tablePerm.getActions());
+          AccessControlClient.grant(conn, newPerm.getTableName(), user, newPerm.getFamily(),
+            newPerm.getQualifier(), newPerm.getActions());
         }
       } catch (Throwable e) {
         throw new IOException("Grant acl into newly creatd table failed. snapshot: " + snapshot


[3/3] hbase git commit: HBASE-21255 [acl] Refactor TablePermission into three classes (Global, Namespace, Table)

Posted by re...@apache.org.
HBASE-21255 [acl] Refactor TablePermission into three classes (Global, Namespace, Table)

Signed-off-by: Michael Stack <st...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/130057f1
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/130057f1
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/130057f1

Branch: refs/heads/master
Commit: 130057f13774f6b213cdb06952c805a29d59396e
Parents: 9e42a9e
Author: Reid Chan <re...@apache.org>
Authored: Wed Nov 14 11:12:14 2018 +0800
Committer: Reid Chan <re...@apache.org>
Committed: Thu Nov 15 11:34:16 2018 +0800

----------------------------------------------------------------------
 .../security/access/AccessControlUtil.java      | 202 +++--
 .../hbase/security/access/GlobalPermission.java |  67 ++
 .../security/access/NamespacePermission.java    | 121 +++
 .../hbase/security/access/Permission.java       | 166 ++--
 .../access/ShadedAccessControlUtil.java         | 136 ++--
 .../hbase/security/access/TablePermission.java  | 323 +++-----
 .../hbase/security/access/UserPermission.java   | 191 ++---
 .../java/org/apache/hadoop/hbase/AuthUtil.java  |   2 +-
 .../hbase/rsgroup/TestRSGroupsWithACL.java      |   4 +-
 .../hbase/security/access/AccessChecker.java    |  24 +-
 .../security/access/AccessControlFilter.java    |  16 +-
 .../security/access/AccessControlLists.java     | 206 +++--
 .../hbase/security/access/AccessController.java |  86 +-
 .../hbase/security/access/AuthManager.java      | 608 ++++++++++++++
 .../hbase/security/access/TableAuthManager.java | 787 -------------------
 .../security/access/ZKPermissionWatcher.java    |   6 +-
 .../hbase/snapshot/RestoreSnapshotHelper.java   |  14 +-
 .../snapshot/SnapshotDescriptionUtils.java      |   8 +-
 .../security/access/TestAccessController.java   |  56 +-
 .../security/access/TestAccessController2.java  |  12 +-
 .../security/access/TestAccessController3.java  |   2 +-
 .../security/access/TestNamespaceCommands.java  |  18 +-
 .../security/access/TestRpcAccessChecks.java    |   6 +-
 .../security/access/TestTablePermissions.java   | 190 +++--
 .../access/TestZKPermissionWatcher.java         | 104 ++-
 25 files changed, 1635 insertions(+), 1720 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/130057f1/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlUtil.java
index 1b5a70c..b37440c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlUtil.java
@@ -47,7 +47,7 @@ public class AccessControlUtil {
   private AccessControlUtil() {}
 
   /**
-   * Create a request to grant user permissions.
+   * Create a request to grant user table permissions.
    *
    * @param username the short user name who to grant permissions
    * @param tableName optional table name the permissions apply
@@ -88,7 +88,7 @@ public class AccessControlUtil {
   }
 
   /**
-   * Create a request to grant user permissions.
+   * Create a request to grant user namespace permissions.
    *
    * @param username the short user name who to grant permissions
    * @param namespace optional table name the permissions apply
@@ -119,7 +119,7 @@ public class AccessControlUtil {
   }
 
   /**
-   * Create a request to revoke user permissions.
+   * Create a request to revoke user global permissions.
    *
    * @param username the short user name whose permissions to be revoked
    * @param actions the permissions to be revoked
@@ -145,7 +145,7 @@ public class AccessControlUtil {
   }
 
   /**
-   * Create a request to revoke user permissions.
+   * Create a request to revoke user namespace permissions.
    *
    * @param username the short user name whose permissions to be revoked
    * @param namespace optional table name the permissions apply
@@ -176,7 +176,7 @@ public class AccessControlUtil {
   }
 
   /**
-   * Create a request to grant user permissions.
+   * Create a request to grant user global permissions.
    *
    * @param username the short user name who to grant permissions
    * @param actions the permissions to be granted
@@ -240,23 +240,6 @@ public class AccessControlUtil {
     return result;
   }
 
-
-  /**
-   * Converts a Permission proto to a client Permission object.
-   *
-   * @param proto the protobuf Permission
-   * @return the converted Permission
-   */
-  public static Permission toPermission(AccessControlProtos.Permission proto) {
-    if (proto.getType() != AccessControlProtos.Permission.Type.Global) {
-      return toTablePermission(proto);
-    } else {
-      List<Permission.Action> actions = toPermissionActions(
-          proto.getGlobalPermission().getActionList());
-      return new Permission(actions.toArray(new Permission.Action[actions.size()]));
-    }
-  }
-
   /**
    * Converts a TablePermission proto to a client TablePermission object.
    * @param proto the protobuf TablePermission
@@ -282,48 +265,45 @@ public class AccessControlUtil {
   }
 
   /**
-   * Converts a Permission proto to a client TablePermission object.
+   * Converts a Permission proto to a client Permission object.
    * @param proto the protobuf Permission
-   * @return the converted TablePermission
+   * @return the converted Permission
    */
-  public static TablePermission toTablePermission(AccessControlProtos.Permission proto) {
-    if(proto.getType() == AccessControlProtos.Permission.Type.Global) {
+  public static Permission toPermission(AccessControlProtos.Permission proto) {
+    if (proto.getType() == AccessControlProtos.Permission.Type.Global) {
       AccessControlProtos.GlobalPermission perm = proto.getGlobalPermission();
       List<Permission.Action> actions = toPermissionActions(perm.getActionList());
-
-      return new TablePermission(null, null, null,
-          actions.toArray(new Permission.Action[actions.size()]));
+      return new GlobalPermission(actions.toArray(new Permission.Action[actions.size()]));
     }
-    if(proto.getType() == AccessControlProtos.Permission.Type.Namespace) {
+    if (proto.getType() == AccessControlProtos.Permission.Type.Namespace) {
       AccessControlProtos.NamespacePermission perm = proto.getNamespacePermission();
       List<Permission.Action> actions = toPermissionActions(perm.getActionList());
-
-      if(!proto.hasNamespacePermission()) {
+      if (!proto.hasNamespacePermission()) {
         throw new IllegalStateException("Namespace must not be empty in NamespacePermission");
       }
-      String namespace = perm.getNamespaceName().toStringUtf8();
-      return new TablePermission(namespace, actions.toArray(new Permission.Action[actions.size()]));
+      return new NamespacePermission(perm.getNamespaceName().toStringUtf8(),
+        actions.toArray(new Permission.Action[actions.size()]));
     }
-    if(proto.getType() == AccessControlProtos.Permission.Type.Table) {
+    if (proto.getType() == AccessControlProtos.Permission.Type.Table) {
       AccessControlProtos.TablePermission perm = proto.getTablePermission();
       List<Permission.Action> actions = toPermissionActions(perm.getActionList());
-
       byte[] qualifier = null;
       byte[] family = null;
       TableName table = null;
-
       if (!perm.hasTableName()) {
         throw new IllegalStateException("TableName cannot be empty");
       }
       table = ProtobufUtil.toTableName(perm.getTableName());
-
-      if (perm.hasFamily()) family = perm.getFamily().toByteArray();
-      if (perm.hasQualifier()) qualifier = perm.getQualifier().toByteArray();
-
+      if (perm.hasFamily()) {
+        family = perm.getFamily().toByteArray();
+      }
+      if (perm.hasQualifier()) {
+        qualifier = perm.getQualifier().toByteArray();
+      }
       return new TablePermission(table, family, qualifier,
-          actions.toArray(new Permission.Action[actions.size()]));
+        actions.toArray(new Permission.Action[actions.size()]));
     }
-    throw new IllegalStateException("Unrecognize Perm Type: "+proto.getType());
+    throw new IllegalStateException("Unrecognize Perm Type: " + proto.getType());
   }
 
   /**
@@ -334,56 +314,51 @@ public class AccessControlUtil {
    */
   public static AccessControlProtos.Permission toPermission(Permission perm) {
     AccessControlProtos.Permission.Builder ret = AccessControlProtos.Permission.newBuilder();
-    if (perm instanceof TablePermission) {
-      TablePermission tablePerm = (TablePermission)perm;
-      if(tablePerm.hasNamespace()) {
-        ret.setType(AccessControlProtos.Permission.Type.Namespace);
-
-        AccessControlProtos.NamespacePermission.Builder builder =
-            AccessControlProtos.NamespacePermission.newBuilder();
-        builder.setNamespaceName(ByteString.copyFromUtf8(tablePerm.getNamespace()));
-        Permission.Action[] actions = perm.getActions();
-        if (actions != null) {
-          for (Permission.Action a : actions) {
-            builder.addAction(toPermissionAction(a));
-          }
-        }
-        ret.setNamespacePermission(builder);
-        return ret.build();
-      } else if (tablePerm.hasTable()) {
-        ret.setType(AccessControlProtos.Permission.Type.Table);
-
-        AccessControlProtos.TablePermission.Builder builder =
-            AccessControlProtos.TablePermission.newBuilder();
-        builder.setTableName(ProtobufUtil.toProtoTableName(tablePerm.getTableName()));
-        if (tablePerm.hasFamily()) {
-          builder.setFamily(ByteStringer.wrap(tablePerm.getFamily()));
-        }
-        if (tablePerm.hasQualifier()) {
-          builder.setQualifier(ByteStringer.wrap(tablePerm.getQualifier()));
+    if (perm instanceof NamespacePermission) {
+      NamespacePermission namespace = (NamespacePermission) perm;
+      ret.setType(AccessControlProtos.Permission.Type.Namespace);
+      AccessControlProtos.NamespacePermission.Builder builder =
+        AccessControlProtos.NamespacePermission.newBuilder();
+      builder.setNamespaceName(ByteString.copyFromUtf8(namespace.getNamespace()));
+      Permission.Action[] actions = perm.getActions();
+      if (actions != null) {
+        for (Permission.Action a : actions) {
+          builder.addAction(toPermissionAction(a));
         }
-        Permission.Action actions[] = perm.getActions();
-        if (actions != null) {
-          for (Permission.Action a : actions) {
-            builder.addAction(toPermissionAction(a));
-          }
+      }
+      ret.setNamespacePermission(builder);
+    } else if (perm instanceof TablePermission) {
+      TablePermission table = (TablePermission) perm;
+      ret.setType(AccessControlProtos.Permission.Type.Table);
+      AccessControlProtos.TablePermission.Builder builder =
+        AccessControlProtos.TablePermission.newBuilder();
+      builder.setTableName(ProtobufUtil.toProtoTableName(table.getTableName()));
+      if (table.hasFamily()) {
+        builder.setFamily(ByteStringer.wrap(table.getFamily()));
+      }
+      if (table.hasQualifier()) {
+        builder.setQualifier(ByteStringer.wrap(table.getQualifier()));
+      }
+      Permission.Action[] actions = perm.getActions();
+      if (actions != null) {
+        for (Permission.Action a : actions) {
+          builder.addAction(toPermissionAction(a));
         }
-        ret.setTablePermission(builder);
-        return ret.build();
       }
-    }
-
-    ret.setType(AccessControlProtos.Permission.Type.Global);
-
-    AccessControlProtos.GlobalPermission.Builder builder =
+      ret.setTablePermission(builder);
+    } else {
+      // perm instanceof GlobalPermission
+      ret.setType(AccessControlProtos.Permission.Type.Global);
+      AccessControlProtos.GlobalPermission.Builder builder =
         AccessControlProtos.GlobalPermission.newBuilder();
-    Permission.Action actions[] = perm.getActions();
-    if (actions != null) {
-      for (Permission.Action a: actions) {
-        builder.addAction(toPermissionAction(a));
+      Permission.Action[] actions = perm.getActions();
+      if (actions != null) {
+        for (Permission.Action a: actions) {
+          builder.addAction(toPermissionAction(a));
+        }
       }
+      ret.setGlobalPermission(builder);
     }
-    ret.setGlobalPermission(builder);
     return ret.build();
   }
 
@@ -456,8 +431,8 @@ public class AccessControlUtil {
    */
   public static AccessControlProtos.UserPermission toUserPermission(UserPermission perm) {
     return AccessControlProtos.UserPermission.newBuilder()
-        .setUser(ByteStringer.wrap(perm.getUser()))
-        .setPermission(toPermission(perm))
+        .setUser(ByteString.copyFromUtf8(perm.getUser()))
+        .setPermission(toPermission(perm.getPermission()))
         .build();
   }
 
@@ -480,8 +455,7 @@ public class AccessControlUtil {
    * @return the converted UserPermission
    */
   public static UserPermission toUserPermission(AccessControlProtos.UserPermission proto) {
-    return new UserPermission(proto.getUser().toByteArray(),
-        toTablePermission(proto.getPermission()));
+    return new UserPermission(proto.getUser().toStringUtf8(), toPermission(proto.getPermission()));
   }
 
   /**
@@ -492,15 +466,15 @@ public class AccessControlUtil {
    * @return the protobuf UserTablePermissions
    */
   public static AccessControlProtos.UsersAndPermissions toUserTablePermissions(
-      ListMultimap<String, TablePermission> perm) {
+      ListMultimap<String, UserPermission> perm) {
     AccessControlProtos.UsersAndPermissions.Builder builder =
         AccessControlProtos.UsersAndPermissions.newBuilder();
-    for (Map.Entry<String, Collection<TablePermission>> entry : perm.asMap().entrySet()) {
+    for (Map.Entry<String, Collection<UserPermission>> entry : perm.asMap().entrySet()) {
       AccessControlProtos.UsersAndPermissions.UserPermissions.Builder userPermBuilder =
           AccessControlProtos.UsersAndPermissions.UserPermissions.newBuilder();
       userPermBuilder.setUser(ByteString.copyFromUtf8(entry.getKey()));
-      for (TablePermission tablePerm: entry.getValue()) {
-        userPermBuilder.addPermissions(toPermission(tablePerm));
+      for (UserPermission userPerm: entry.getValue()) {
+        userPermBuilder.addPermissions(toPermission(userPerm.getPermission()));
       }
       builder.addUserPermissions(userPermBuilder.build());
     }
@@ -844,28 +818,46 @@ public class AccessControlUtil {
   }
 
   /**
-   * Convert a protobuf UserTablePermissions to a
-   * ListMultimap&lt;String, TablePermission&gt; where key is username.
-   *
-   * @param proto the protobuf UserPermission
-   * @return the converted UserPermission
+   * Convert a protobuf UserTablePermissions to a ListMultimap&lt;Username, UserPermission&gt;
+   * @param proto the proto UsersAndPermissions
+   * @return a ListMultimap with user and its permissions
+   */
+  public static ListMultimap<String, UserPermission> toUserPermission(
+      AccessControlProtos.UsersAndPermissions proto) {
+    ListMultimap<String, UserPermission> userPermission = ArrayListMultimap.create();
+    AccessControlProtos.UsersAndPermissions.UserPermissions userPerm;
+    for (int i = 0; i < proto.getUserPermissionsCount(); i++) {
+      userPerm = proto.getUserPermissions(i);
+      String username = userPerm.getUser().toStringUtf8();
+      for (int j = 0; j < userPerm.getPermissionsCount(); j++) {
+        userPermission.put(username,
+          new UserPermission(username, toPermission(userPerm.getPermissions(j))));
+      }
+    }
+    return userPermission;
+  }
+
+  /**
+   * Convert a protobuf UserTablePermissions to a ListMultimap&lt;Username, Permission&gt;
+   * @param proto the proto UsersAndPermissions
+   * @return a ListMultimap with user and its permissions
    */
-  public static ListMultimap<String, TablePermission> toUserTablePermissions(
+  public static ListMultimap<String, Permission> toPermission(
       AccessControlProtos.UsersAndPermissions proto) {
-    ListMultimap<String, TablePermission> perms = ArrayListMultimap.create();
+    ListMultimap<String, Permission> perms = ArrayListMultimap.create();
     AccessControlProtos.UsersAndPermissions.UserPermissions userPerm;
     for (int i = 0; i < proto.getUserPermissionsCount(); i++) {
       userPerm = proto.getUserPermissions(i);
+      String username = userPerm.getUser().toStringUtf8();
       for (int j = 0; j < userPerm.getPermissionsCount(); j++) {
-        TablePermission tablePerm = toTablePermission(userPerm.getPermissions(j));
-        perms.put(userPerm.getUser().toStringUtf8(), tablePerm);
+        perms.put(username, toPermission(userPerm.getPermissions(j)));
       }
     }
     return perms;
   }
 
   /**
-   * Create a request to revoke user permissions.
+   * Create a request to revoke user table permissions.
    *
    * @param username the short user name whose permissions to be revoked
    * @param tableName optional table name the permissions apply

http://git-wip-us.apache.org/repos/asf/hbase/blob/130057f1/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/GlobalPermission.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/GlobalPermission.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/GlobalPermission.java
new file mode 100644
index 0000000..b29317a
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/GlobalPermission.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.access;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Represents an authorization for access to the whole cluster.
+ */
+@InterfaceAudience.Private
+public class GlobalPermission extends Permission {
+
+  /** Default constructor for Writable, do not use */
+  public GlobalPermission() {
+    super();
+    this.scope = Scope.EMPTY;
+  }
+
+  /**
+   * Construct a global permission.
+   * @param assigned assigned actions
+   */
+  GlobalPermission(Action... assigned) {
+    super(assigned);
+    this.scope = Scope.GLOBAL;
+  }
+
+  /**
+   * Construct a global permission.
+   * @param actionCode assigned actions
+   */
+  GlobalPermission(byte[] actionCode) {
+    super(actionCode);
+    this.scope = Scope.GLOBAL;
+  }
+
+  @Override
+  public int hashCode() {
+    return super.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    return obj instanceof GlobalPermission && super.equals(obj);
+  }
+
+  @Override
+  public String toString() {
+    return "[GlobalPermission: " + rawExpression() + "]";
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/130057f1/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/NamespacePermission.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/NamespacePermission.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/NamespacePermission.java
new file mode 100644
index 0000000..c7ede96
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/NamespacePermission.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.access;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Objects;
+
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Represents an authorization for access for the given namespace.
+ */
+@InterfaceAudience.Private
+public class NamespacePermission extends Permission {
+
+  private String namespace = NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR;
+
+  /** Default constructor for Writable, do not use */
+  public NamespacePermission() {
+    super();
+    this.scope = Scope.EMPTY;
+  }
+
+  /**
+   * Construct a namespace permission.
+   * @param namespace namespace's name
+   * @param assigned assigned actions
+   */
+  public NamespacePermission(String namespace, Action... assigned) {
+    super(assigned);
+    this.namespace = namespace;
+    this.scope = Scope.NAMESPACE;
+  }
+
+  /**
+   * Construct a namespace permission.
+   * @param namespace namespace's name
+   * @param actionCode assigned actions
+   */
+  public NamespacePermission(String namespace, byte[] actionCode) {
+    super(actionCode);
+    this.namespace = namespace;
+    this.scope = Scope.NAMESPACE;
+  }
+
+  public String getNamespace() {
+    return namespace;
+  }
+
+  /**
+   * Check if the given action is granted in the given namespace.
+   * @param namespace namespace's name
+   * @param action action to be checked
+   * @return true if granted, false otherwise
+   */
+  public boolean implies(String namespace, Action action) {
+    return namespace.equals(this.namespace) && implies(action);
+  }
+
+  @Override
+  public boolean equalsExceptActions(Object obj) {
+    if (!(obj instanceof NamespacePermission)) {
+      return false;
+    }
+    NamespacePermission gp = (NamespacePermission) obj;
+    return namespace.equals(gp.namespace);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(namespace) + super.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    return equalsExceptActions(obj) && super.equals(obj);
+  }
+
+  @Override
+  public String toString() {
+    return "[NamespacePermission: " + rawExpression() + "]";
+  }
+
+  @Override
+  protected String rawExpression() {
+    StringBuilder raw = new StringBuilder("namespace=").append(namespace).append(", ");
+    return raw.toString() + super.rawExpression();
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    namespace = Bytes.toString(Bytes.readByteArray(in));
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    Bytes.writeByteArray(out, Bytes.toBytes(namespace));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/130057f1/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/Permission.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/Permission.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/Permission.java
index 1e9e60c..d448d3a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/Permission.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/Permission.java
@@ -22,6 +22,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.EnumSet;
 import java.util.Map;
 
 import org.apache.yetus.audience.InterfaceAudience;
@@ -30,7 +31,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.VersionedWritable;
 
-import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
 
 /**
  * Base permissions instance representing the ability to perform a given set
@@ -48,21 +49,49 @@ public class Permission extends VersionedWritable {
 
     private final byte code;
     Action(char code) {
-      this.code = (byte)code;
+      this.code = (byte) code;
     }
 
     public byte code() { return code; }
   }
 
+  @InterfaceAudience.Private
+  protected enum Scope {
+    GLOBAL('G'), NAMESPACE('N'), TABLE('T'), EMPTY('E');
+
+    private final byte code;
+    Scope(char code) {
+      this.code = (byte) code;
+    }
+
+    public byte code() {
+      return code;
+    }
+  }
+
   private static final Logger LOG = LoggerFactory.getLogger(Permission.class);
-  protected static final Map<Byte,Action> ACTION_BY_CODE = Maps.newHashMap();
 
-  protected Action[] actions;
+  protected static final Map<Byte, Action> ACTION_BY_CODE;
+  protected static final Map<Byte, Scope> SCOPE_BY_CODE;
+
+  protected EnumSet<Action> actions = EnumSet.noneOf(Action.class);
+  protected Scope scope = Scope.EMPTY;
 
   static {
-    for (Action a : Action.values()) {
-      ACTION_BY_CODE.put(a.code(), a);
-    }
+    ACTION_BY_CODE = ImmutableMap.of(
+      Action.READ.code, Action.READ,
+      Action.WRITE.code, Action.WRITE,
+      Action.EXEC.code, Action.EXEC,
+      Action.CREATE.code, Action.CREATE,
+      Action.ADMIN.code, Action.ADMIN
+    );
+
+    SCOPE_BY_CODE = ImmutableMap.of(
+      Scope.GLOBAL.code, Scope.GLOBAL,
+      Scope.NAMESPACE.code, Scope.NAMESPACE,
+      Scope.TABLE.code, Scope.TABLE,
+      Scope.EMPTY.code, Scope.EMPTY
+    );
   }
 
   /** Empty constructor for Writable implementation.  <b>Do not use.</b> */
@@ -72,75 +101,72 @@ public class Permission extends VersionedWritable {
 
   public Permission(Action... assigned) {
     if (assigned != null && assigned.length > 0) {
-      actions = Arrays.copyOf(assigned, assigned.length);
+      actions.addAll(Arrays.asList(assigned));
     }
   }
 
   public Permission(byte[] actionCodes) {
     if (actionCodes != null) {
-      Action acts[] = new Action[actionCodes.length];
-      int j = 0;
-      for (int i=0; i<actionCodes.length; i++) {
-        byte b = actionCodes[i];
-        Action a = ACTION_BY_CODE.get(b);
-        if (a == null) {
-          LOG.error("Ignoring unknown action code '"+
-              Bytes.toStringBinary(new byte[]{b})+"'");
+      for (byte code : actionCodes) {
+        Action action = ACTION_BY_CODE.get(code);
+        if (action == null) {
+          LOG.error("Ignoring unknown action code '" +
+            Bytes.toStringBinary(new byte[] { code }) + "'");
           continue;
         }
-        acts[j++] = a;
+        actions.add(action);
       }
-      this.actions = Arrays.copyOf(acts, j);
     }
   }
 
   public Action[] getActions() {
-    return actions;
+    return actions.toArray(new Action[actions.size()]);
   }
 
+  /**
+   * Checks whether the given action is granted.
+   * @param action action to be checked
+   * @return true if granted, false otherwise
+   */
   public boolean implies(Action action) {
-    if (this.actions != null) {
-      for (Action a : this.actions) {
-        if (a == action) {
-          return true;
-        }
-      }
-    }
-
-    return false;
+    return actions.contains(action);
   }
 
   public void setActions(Action[] assigned) {
     if (assigned != null && assigned.length > 0) {
-      actions = Arrays.copyOf(assigned, assigned.length);
+      // setActions must replace the previously assigned actions rather than
+      // merge with them, so clear the existing set first.
+      actions.clear();
+      actions.addAll(Arrays.asList(assigned));
     }
   }
 
+  /**
+   * Checks whether two permissions are equal when their actions are ignored. This is useful
+   * when merging a new permission into an existing one, which requires comparing every field
+   * of the two permissions other than the actions.
+   * @param obj instance
+   * @return true if equals, false otherwise
+   */
+  public boolean equalsExceptActions(Object obj) {
+    return obj instanceof Permission;
+  }
+
   @Override
   public boolean equals(Object obj) {
     if (!(obj instanceof Permission)) {
       return false;
     }
-    Permission other = (Permission)obj;
-    // check actions
-    if (actions == null && other.getActions() == null) {
-      return true;
-    } else if (actions != null && other.getActions() != null) {
-      Action[] otherActions = other.getActions();
-      if (actions.length != otherActions.length) {
-        return false;
-      }
 
-      outer:
-      for (Action a : actions) {
-        for (Action oa : otherActions) {
-          if (a == oa) continue outer;
-        }
+    Permission other = (Permission) obj;
+    if (actions.isEmpty() && other.actions.isEmpty()) {
+      return true;
+    } else if (!actions.isEmpty() && !other.actions.isEmpty()) {
+      if (actions.size() != other.actions.size()) {
         return false;
       }
-      return true;
+      return actions.containsAll(other.actions);
     }
-
     return false;
   }
 
@@ -151,26 +177,28 @@ public class Permission extends VersionedWritable {
     for (Action a : actions) {
       result = prime * result + a.code();
     }
+    result = prime * result + scope.code();
     return result;
   }
 
   @Override
   public String toString() {
-    StringBuilder str = new StringBuilder("[Permission: ")
-        .append("actions=");
+    return "[Permission: " + rawExpression() + "]";
+  }
+
+  protected String rawExpression() {
+    StringBuilder raw = new StringBuilder("actions=");
     if (actions != null) {
-      for (int i=0; i<actions.length; i++) {
-        if (i > 0)
-          str.append(",");
-        if (actions[i] != null)
-          str.append(actions[i].toString());
-        else
-          str.append("NULL");
+      int i = 0;
+      for (Action action : actions) {
+        if (i > 0) {
+          raw.append(",");
+        }
+        raw.append(action != null ? action.toString() : "NULL");
+        i++;
       }
     }
-    str.append("]");
-
-    return str.toString();
+    return raw.toString();
   }
 
   /** @return the object version number */
@@ -182,31 +210,35 @@ public class Permission extends VersionedWritable {
   @Override
   public void readFields(DataInput in) throws IOException {
     super.readFields(in);
-    int length = (int)in.readByte();
+    int length = (int) in.readByte();
+    actions = EnumSet.noneOf(Action.class);
     if (length > 0) {
-      actions = new Action[length];
       for (int i = 0; i < length; i++) {
         byte b = in.readByte();
-        Action a = ACTION_BY_CODE.get(b);
-        if (a == null) {
-          throw new IOException("Unknown action code '"+
-              Bytes.toStringBinary(new byte[]{b})+"' in input");
+        Action action = ACTION_BY_CODE.get(b);
+        if (action == null) {
+          throw new IOException("Unknown action code '" +
+            Bytes.toStringBinary(new byte[] { b }) + "' in input");
         }
-        this.actions[i] = a;
+        actions.add(action);
       }
-    } else {
-      actions = new Action[0];
     }
+    scope = SCOPE_BY_CODE.get(in.readByte());
   }
 
   @Override
   public void write(DataOutput out) throws IOException {
     super.write(out);
-    out.writeByte(actions != null ? actions.length : 0);
+    out.writeByte(actions != null ? actions.size() : 0);
     if (actions != null) {
       for (Action a: actions) {
         out.writeByte(a.code());
       }
     }
+    out.writeByte(scope.code());
+  }
+
+  public Scope getAccessScope() {
+    return scope;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/130057f1/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/ShadedAccessControlUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/ShadedAccessControlUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/ShadedAccessControlUtil.java
index 5a94805..7e36656 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/ShadedAccessControlUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/ShadedAccessControlUtil.java
@@ -119,14 +119,13 @@ public class ShadedAccessControlUtil {
    * @param proto the protobuf Permission
    * @return the converted TablePermission
    */
-  public static TablePermission toTablePermission(AccessControlProtos.Permission proto) {
+  public static Permission toPermission(AccessControlProtos.Permission proto) {
 
     if (proto.getType() == AccessControlProtos.Permission.Type.Global) {
       AccessControlProtos.GlobalPermission perm = proto.getGlobalPermission();
       List<Action> actions = toPermissionActions(perm.getActionList());
 
-      return new TablePermission(null, null, null,
-          actions.toArray(new Permission.Action[actions.size()]));
+      return new GlobalPermission(actions.toArray(new Permission.Action[actions.size()]));
     }
     if (proto.getType() == AccessControlProtos.Permission.Type.Namespace) {
       AccessControlProtos.NamespacePermission perm = proto.getNamespacePermission();
@@ -135,8 +134,8 @@ public class ShadedAccessControlUtil {
       if (!proto.hasNamespacePermission()) {
         throw new IllegalStateException("Namespace must not be empty in NamespacePermission");
       }
-      String namespace = perm.getNamespaceName().toStringUtf8();
-      return new TablePermission(namespace, actions.toArray(new Permission.Action[actions.size()]));
+      String ns = perm.getNamespaceName().toStringUtf8();
+      return new NamespacePermission(ns, actions.toArray(new Permission.Action[actions.size()]));
     }
     if (proto.getType() == AccessControlProtos.Permission.Type.Table) {
       AccessControlProtos.TablePermission perm = proto.getTablePermission();
@@ -144,12 +143,11 @@ public class ShadedAccessControlUtil {
 
       byte[] qualifier = null;
       byte[] family = null;
-      TableName table = null;
 
       if (!perm.hasTableName()) {
         throw new IllegalStateException("TableName cannot be empty");
       }
-      table = toTableName(perm.getTableName());
+      TableName table = toTableName(perm.getTableName());
 
       if (perm.hasFamily()) family = perm.getFamily().toByteArray();
       if (perm.hasQualifier()) qualifier = perm.getQualifier().toByteArray();
@@ -170,63 +168,58 @@ public class ShadedAccessControlUtil {
     org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.Permission.Builder ret =
         org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.Permission
             .newBuilder();
-    if (perm instanceof TablePermission) {
-      TablePermission tablePerm = (TablePermission) perm;
-      if (tablePerm.hasNamespace()) {
-        ret.setType(
-          org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.Permission.Type.Namespace);
-
-        org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.NamespacePermission.Builder builder =
-            org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.NamespacePermission
-                .newBuilder();
-        builder.setNamespaceName(org.apache.hbase.thirdparty.com.google.protobuf.ByteString
-            .copyFromUtf8(tablePerm.getNamespace()));
-        Permission.Action[] actions = perm.getActions();
-        if (actions != null) {
-          for (Permission.Action a : actions) {
-            builder.addAction(toPermissionAction(a));
-          }
-        }
-        ret.setNamespacePermission(builder);
-        return ret.build();
-      } else if (tablePerm.hasTable()) {
-        ret.setType(
-          org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.Permission.Type.Table);
-
-        org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.TablePermission.Builder builder =
-            org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.TablePermission
-                .newBuilder();
-        builder.setTableName(toProtoTableName(tablePerm.getTableName()));
-        if (tablePerm.hasFamily()) {
-          builder.setFamily(ByteString.copyFrom(tablePerm.getFamily()));
-        }
-        if (tablePerm.hasQualifier()) {
-          builder.setQualifier(ByteString.copyFrom(tablePerm.getQualifier()));
+    if (perm instanceof NamespacePermission) {
+      NamespacePermission nsPerm = (NamespacePermission) perm;
+      ret.setType(
+        org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.Permission.Type.Namespace);
+      org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.NamespacePermission.Builder builder =
+        org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.NamespacePermission
+          .newBuilder();
+      builder.setNamespaceName(org.apache.hbase.thirdparty.com.google.protobuf.ByteString
+        .copyFromUtf8(nsPerm.getNamespace()));
+      Permission.Action[] actions = perm.getActions();
+      if (actions != null) {
+        for (Permission.Action a : actions) {
+          builder.addAction(toPermissionAction(a));
         }
-        Permission.Action actions[] = perm.getActions();
-        if (actions != null) {
-          for (Permission.Action a : actions) {
-            builder.addAction(toPermissionAction(a));
-          }
+      }
+      ret.setNamespacePermission(builder);
+    } else if (perm instanceof TablePermission) {
+      TablePermission tablePerm = (TablePermission) perm;
+      ret.setType(
+        org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.Permission.Type.Table);
+      org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.TablePermission.Builder builder =
+        org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.TablePermission
+          .newBuilder();
+      builder.setTableName(toProtoTableName(tablePerm.getTableName()));
+      if (tablePerm.hasFamily()) {
+        builder.setFamily(ByteString.copyFrom(tablePerm.getFamily()));
+      }
+      if (tablePerm.hasQualifier()) {
+        builder.setQualifier(ByteString.copyFrom(tablePerm.getQualifier()));
+      }
+      Permission.Action[] actions = perm.getActions();
+      if (actions != null) {
+        for (Permission.Action a : actions) {
+          builder.addAction(toPermissionAction(a));
         }
-        ret.setTablePermission(builder);
-        return ret.build();
       }
-    }
-
-    ret.setType(
-      org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.Permission.Type.Global);
-
-    org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GlobalPermission.Builder builder =
+      ret.setTablePermission(builder);
+    } else {
+      // Neither a namespace nor a table permission: serialize it as a global permission.
+      ret.setType(
+        org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.Permission.Type.Global);
+      org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GlobalPermission.Builder builder =
         org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GlobalPermission
-            .newBuilder();
-    Permission.Action actions[] = perm.getActions();
-    if (actions != null) {
-      for (Permission.Action a : actions) {
-        builder.addAction(toPermissionAction(a));
+          .newBuilder();
+      Permission.Action[] actions = perm.getActions();
+      if (actions != null) {
+        for (Permission.Action a : actions) {
+          builder.addAction(toPermissionAction(a));
+        }
       }
+      ret.setGlobalPermission(builder);
     }
-    ret.setGlobalPermission(builder);
     return ret.build();
   }
 
@@ -236,15 +229,15 @@ public class ShadedAccessControlUtil {
    * @param proto the protobuf UserPermission
    * @return the converted UserPermission
    */
-  public static ListMultimap<String, TablePermission> toUserTablePermissions(
+  public static ListMultimap<String, Permission> toUserTablePermissions(
       org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.UsersAndPermissions proto) {
-    ListMultimap<String, TablePermission> perms = ArrayListMultimap.create();
+    ListMultimap<String, Permission> perms = ArrayListMultimap.create();
     org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.UsersAndPermissions.UserPermissions userPerm;
     for (int i = 0; i < proto.getUserPermissionsCount(); i++) {
       userPerm = proto.getUserPermissions(i);
       for (int j = 0; j < userPerm.getPermissionsCount(); j++) {
-        TablePermission tablePerm = toTablePermission(userPerm.getPermissions(j));
-        perms.put(userPerm.getUser().toStringUtf8(), tablePerm);
+        Permission perm = toPermission(userPerm.getPermissions(j));
+        perms.put(userPerm.getUser().toStringUtf8(), perm);
       }
     }
     return perms;
@@ -258,31 +251,20 @@ public class ShadedAccessControlUtil {
    */
   public static
       org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.UsersAndPermissions
-      toUserTablePermissions(ListMultimap<String, TablePermission> perm) {
+      toUserTablePermissions(ListMultimap<String, UserPermission> perm) {
     org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.UsersAndPermissions.Builder builder =
         org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.UsersAndPermissions
             .newBuilder();
-    for (Map.Entry<String, Collection<TablePermission>> entry : perm.asMap().entrySet()) {
+    for (Map.Entry<String, Collection<UserPermission>> entry : perm.asMap().entrySet()) {
       org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.UsersAndPermissions.UserPermissions.Builder userPermBuilder =
           org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.UsersAndPermissions.UserPermissions
               .newBuilder();
       userPermBuilder.setUser(ByteString.copyFromUtf8(entry.getKey()));
-      for (TablePermission tablePerm : entry.getValue()) {
-        userPermBuilder.addPermissions(toPermission(tablePerm));
+      for (UserPermission userPerm : entry.getValue()) {
+        userPermBuilder.addPermissions(toPermission(userPerm.getPermission()));
       }
       builder.addUserPermissions(userPermBuilder.build());
     }
     return builder.build();
   }
-
-  /**
-   * Converts a user permission proto to a client user permission object.
-   *
-   * @param proto the protobuf UserPermission
-   * @return the converted UserPermission
-   */
-  public static UserPermission toUserPermission(org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.UserPermission proto) {
-    return new UserPermission(proto.getUser().toByteArray(),
-        toTablePermission(proto.getPermission()));
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/130057f1/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/TablePermission.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/TablePermission.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/TablePermission.java
index dd0e71d..36ed8e4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/TablePermission.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/TablePermission.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 /**
  * Represents an authorization for access for the given actions, optionally
  * restricted to the given column family or column qualifier, over the
- * given table.  If the family property is <code>null</code>, it implies
+ * given table. If the family property is <code>null</code>, it implies
  * full table access.
  */
 @InterfaceAudience.Private
@@ -41,114 +41,78 @@ public class TablePermission extends Permission {
   private byte[] family;
   private byte[] qualifier;
 
-  //TODO refactor this class
-  //we need to refacting this into three classes (Global, Table, Namespace)
-  private String namespace;
-
   /** Nullary constructor for Writable, do not use */
   public TablePermission() {
     super();
+    this.scope = Scope.EMPTY;
+  }
+
+  /**
+   * Construct a table permission.
+   * @param table table name
+   * @param assigned assigned actions
+   */
+  public TablePermission(TableName table, Action... assigned) {
+    this(table, null, null, assigned);
   }
 
   /**
-   * Create a new permission for the given table and (optionally) column family,
-   * allowing the given actions.
-   * @param table the table
-   * @param family the family, can be null if a global permission on the table
-   * @param assigned the list of allowed actions
+   * Construct a table:family permission.
+   * @param table table name
+   * @param family family name
+   * @param assigned assigned actions
    */
   public TablePermission(TableName table, byte[] family, Action... assigned) {
     this(table, family, null, assigned);
   }
 
   /**
-   * Creates a new permission for the given table, restricted to the given
-   * column family and qualifier, allowing the assigned actions to be performed.
-   * @param table the table
-   * @param family the family, can be null if a global permission on the table
-   * @param assigned the list of allowed actions
+   * Construct a table:family:qualifier permission.
+   * @param table table name
+   * @param family family name
+   * @param qualifier qualifier name
+   * @param assigned assigned actions
    */
-  public TablePermission(TableName table, byte[] family, byte[] qualifier,
-      Action... assigned) {
+  public TablePermission(TableName table, byte[] family, byte[] qualifier, Action... assigned) {
     super(assigned);
     this.table = table;
     this.family = family;
     this.qualifier = qualifier;
+    this.scope = Scope.TABLE;
   }
 
   /**
-   * Creates a new permission for the given table, family and column qualifier,
-   * allowing the actions matching the provided byte codes to be performed.
-   * @param table the table
-   * @param family the family, can be null if a global permission on the table
-   * @param actionCodes the list of allowed action codes
+   * Construct a table permission.
+   * @param table table name
+   * @param actionCodes assigned actions
    */
-  public TablePermission(TableName table, byte[] family, byte[] qualifier,
-      byte[] actionCodes) {
-    super(actionCodes);
-    this.table = table;
-    this.family = family;
-    this.qualifier = qualifier;
+  public TablePermission(TableName table, byte[] actionCodes) {
+    this(table, null, null, actionCodes);
   }
 
   /**
-   * Creates a new permission for the given namespace or table, restricted to the given
-   * column family and qualifier, allowing the assigned actions to be performed.
-   * @param namespace
-   * @param table the table
-   * @param family the family, can be null if a global permission on the table
-   * @param assigned the list of allowed actions
+   * Construct a table:family permission.
+   * @param table table name
+   * @param family family name
+   * @param actionCodes assigned actions
    */
-  public TablePermission(String namespace, TableName table, byte[] family, byte[] qualifier,
-      Action... assigned) {
-    super(assigned);
-    this.namespace = namespace;
-    this.table = table;
-    this.family = family;
-    this.qualifier = qualifier;
+  public TablePermission(TableName table, byte[] family, byte[] actionCodes) {
+    this(table, family, null, actionCodes);
   }
 
   /**
-   * Creates a new permission for the given namespace or table, family and column qualifier,
-   * allowing the actions matching the provided byte codes to be performed.
-   * @param namespace
-   * @param table the table
-   * @param family the family, can be null if a global permission on the table
-   * @param actionCodes the list of allowed action codes
+   * Construct a table:family:qualifier permission.
+   * @param table table name
+   * @param family family name
+   * @param qualifier qualifier name
+   * @param actionCodes assigned actions
    */
-  public TablePermission(String namespace, TableName table, byte[] family, byte[] qualifier,
-      byte[] actionCodes) {
+  public TablePermission(TableName table, byte[] family, byte[] qualifier, byte[] actionCodes) {
     super(actionCodes);
-    this.namespace = namespace;
     this.table = table;
     this.family = family;
     this.qualifier = qualifier;
-  }
-
-  /**
-   * Creates a new permission for the given namespace,
-   * allowing the actions matching the provided byte codes to be performed.
-   * @param namespace
-   * @param actionCodes the list of allowed action codes
-   */
-  public TablePermission(String namespace, byte[] actionCodes) {
-    super(actionCodes);
-    this.namespace = namespace;
-  }
-
-  /**
-   * Create a new permission for the given namespace,
-   * allowing the given actions.
-   * @param namespace
-   * @param assigned the list of allowed actions
-   */
-  public TablePermission(String namespace, Action... assigned) {
-    super(assigned);
-    this.namespace = namespace;
-  }
-
-  public boolean hasTable() {
-    return table != null;
+    this.scope = Scope.TABLE;
   }
 
   public TableName getTableName() {
@@ -175,65 +139,58 @@ public class TablePermission extends Permission {
     return qualifier;
   }
 
-  public boolean hasNamespace() {
-    return namespace != null;
-  }
-
   public String getNamespace() {
-    return namespace;
+    return table.getNamespaceAsString();
   }
 
   /**
-   * Checks that a given table operation is authorized by this permission
-   * instance.
-   *
-   * @param namespace the namespace where the operation is being performed
-   * @param action the action being requested
-   * @return <code>true</code> if the action within the given scope is allowed
-   *   by this permission, <code>false</code>
+   * Checks whether the given action can be performed on the given table:family:qualifier.
+   * @param table table name
+   * @param family family name
+   * @param qualifier qualifier name
+   * @param action one of [Read, Write, Create, Exec, Admin]
+   * @return true if the action is permitted, false otherwise
    */
-  public boolean implies(String namespace, Action action) {
-    if (this.namespace == null || !this.namespace.equals(namespace)) {
+  public boolean implies(TableName table, byte[] family, byte[] qualifier, Action action) {
+    if (failCheckTable(table)) {
       return false;
     }
-
-    // check actions
-    return super.implies(action);
+    if (failCheckFamily(family)) {
+      return false;
+    }
+    if (failCheckQualifier(qualifier)) {
+      return false;
+    }
+    return implies(action);
   }
 
   /**
-   * Checks that a given table operation is authorized by this permission
-   * instance.
-   *
-   * @param table the table where the operation is being performed
-   * @param family the column family to which the operation is restricted,
-   *   if <code>null</code> implies "all"
-   * @param qualifier the column qualifier to which the action is restricted,
-   *   if <code>null</code> implies "all"
-   * @param action the action being requested
-   * @return <code>true</code> if the action within the given scope is allowed
-   *   by this permission, <code>false</code>
+   * Checks whether the given action can be performed on the given table:family.
+   * @param table table name
+   * @param family family name
+   * @param action one of [Read, Write, Create, Exec, Admin]
+   * @return true if the action is permitted, false otherwise
    */
-  public boolean implies(TableName table, byte[] family, byte[] qualifier,
-      Action action) {
-    if (this.table == null || !this.table.equals(table)) {
+  public boolean implies(TableName table, byte[] family, Action action) {
+    if (failCheckTable(table)) {
       return false;
     }
-
-    if (this.family != null &&
-        (family == null ||
-         !Bytes.equals(this.family, family))) {
+    if (failCheckFamily(family)) {
       return false;
     }
+    return implies(action);
+  }
 
-    if (this.qualifier != null &&
-        (qualifier == null ||
-         !Bytes.equals(this.qualifier, qualifier))) {
-      return false;
-    }
+  private boolean failCheckTable(TableName table) {
+    return this.table == null || !this.table.equals(table);
+  }
 
-    // check actions
-    return super.implies(action);
+  private boolean failCheckFamily(byte[] family) {
+    return this.family != null && (family == null || !Bytes.equals(this.family, family));
+  }
+
+  private boolean failCheckQualifier(byte[] qual) {
+    return this.qualifier != null && (qual == null || !Bytes.equals(this.qualifier, qual));
   }
 
   /**
@@ -246,7 +203,7 @@ public class TablePermission extends Permission {
    *   by this permission, otherwise <code>false</code>
    */
   public boolean implies(TableName table, KeyValue kv, Action action) {
-    if (this.table == null || !this.table.equals(table)) {
+    if (failCheckTable(table)) {
       return false;
     }
 
@@ -263,82 +220,34 @@ public class TablePermission extends Permission {
   }
 
   /**
-   * Returns <code>true</code> if this permission matches the given column
-   * family at least.  This only indicates a partial match against the table
-   * and column family, however, and does not guarantee that implies() for the
-   * column same family would return <code>true</code>.  In the case of a
-   * column-qualifier specific permission, for example, implies() would still
-   * return false.
+   * Checks whether this permission's table, family and qualifier equal those of {@code tp}.
+   * @param tp the table permission to compare against
+   * @return true if all three fields are equal, false otherwise
    */
-  public boolean matchesFamily(TableName table, byte[] family, Action action) {
-    if (this.table == null || !this.table.equals(table)) {
+  public boolean tableFieldsEqual(TablePermission tp) {
+    if (tp == null) {
       return false;
     }
 
-    if (this.family != null &&
-        (family == null ||
-         !Bytes.equals(this.family, family))) {
-      return false;
-    }
-
-    // ignore qualifier
-    // check actions
-    return super.implies(action);
+    boolean tEq = (table == null && tp.table == null) || (table != null && table.equals(tp.table));
+    boolean fEq = (family == null && tp.family == null) || Bytes.equals(family, tp.family);
+    boolean qEq = (qualifier == null && tp.qualifier == null) ||
+                   Bytes.equals(qualifier, tp.qualifier);
+    return tEq && fEq && qEq;
   }
 
-  /**
-   * Returns if the given permission matches the given qualifier.
-   * @param table the table name to match
-   * @param family the column family to match
-   * @param qualifier the qualifier name to match
-   * @param action the action requested
-   * @return <code>true</code> if the table, family and qualifier match,
-   *   otherwise <code>false</code>
-   */
-  public boolean matchesFamilyQualifier(TableName table, byte[] family, byte[] qualifier,
-                                Action action) {
-    if (!matchesFamily(table, family, action)) {
-      return false;
-    } else {
-      if (this.qualifier != null &&
-          (qualifier == null ||
-           !Bytes.equals(this.qualifier, qualifier))) {
-        return false;
-      }
-    }
-    return super.implies(action);
-  }
-
-  public boolean tableFieldsEqual(TablePermission other){
-    if (!(((table == null && other.getTableName() == null) ||
-           (table != null && table.equals(other.getTableName()))) &&
-         ((family == null && other.getFamily() == null) ||
-           Bytes.equals(family, other.getFamily())) &&
-         ((qualifier == null && other.getQualifier() == null) ||
-          Bytes.equals(qualifier, other.getQualifier())) &&
-         ((namespace == null && other.getNamespace() == null) ||
-          (namespace != null && namespace.equals(other.getNamespace())))
-    )) {
+  @Override
+  public boolean equalsExceptActions(Object obj) {
+    if (!(obj instanceof TablePermission)) {
       return false;
     }
-    return true;
+    TablePermission other = (TablePermission) obj;
+    return tableFieldsEqual(other);
   }
 
   @Override
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
-    justification="Passed on construction except on constructor not to be used")
   public boolean equals(Object obj) {
-    if (!(obj instanceof TablePermission)) {
-      return false;
-    }
-    TablePermission other = (TablePermission)obj;
-
-    if(!this.tableFieldsEqual(other)){
-      return false;
-    }
-
-    // check actions
-    return super.equals(other);
+    return equalsExceptActions(obj) && super.equals(obj);
   }
 
   @Override
@@ -354,41 +263,24 @@ public class TablePermission extends Permission {
     if (qualifier != null) {
       result = prime * result + Bytes.hashCode(qualifier);
     }
-    if (namespace != null) {
-      result = prime * result + namespace.hashCode();
-    }
     return result;
   }
 
   @Override
   public String toString() {
-    StringBuilder str = new StringBuilder("[TablePermission: ");
-    if(namespace != null) {
-      str.append("namespace=").append(namespace)
+    return "[TablePermission: " + rawExpression() + "]";
+  }
+
+  @Override
+  protected String rawExpression() {
+    StringBuilder raw = new StringBuilder();
+    if (table != null) {
+      raw.append("table=").append(table)
+         .append(", family=").append(family == null ? null : Bytes.toString(family))
+         .append(", qualifier=").append(qualifier == null ? null : Bytes.toString(qualifier))
          .append(", ");
     }
-    if(table != null) {
-       str.append("table=").append(table)
-          .append(", family=")
-          .append(family == null ? null : Bytes.toString(family))
-          .append(", qualifier=")
-          .append(qualifier == null ? null : Bytes.toString(qualifier))
-          .append(", ");
-    }
-    if (actions != null) {
-      str.append("actions=");
-      for (int i=0; i<actions.length; i++) {
-        if (i > 0)
-          str.append(",");
-        if (actions[i] != null)
-          str.append(actions[i].toString());
-        else
-          str.append("NULL");
-      }
-    }
-    str.append("]");
-
-    return str.toString();
+    return raw.toString() + super.rawExpression();
   }
 
   @Override
@@ -404,16 +296,13 @@ public class TablePermission extends Permission {
     if (in.readBoolean()) {
       qualifier = Bytes.readByteArray(in);
     }
-    if(in.readBoolean()) {
-      namespace = Bytes.toString(Bytes.readByteArray(in));
-    }
   }
 
   @Override
   public void write(DataOutput out) throws IOException {
     super.write(out);
     // Explicitly writing null to maintain se/deserialize backward compatibility.
-    Bytes.writeByteArray(out, (table == null) ? null : table.getName());
+    Bytes.writeByteArray(out, table == null ? null : table.getName());
     out.writeBoolean(family != null);
     if (family != null) {
       Bytes.writeByteArray(out, family);
@@ -422,9 +311,5 @@ public class TablePermission extends Permission {
     if (qualifier != null) {
       Bytes.writeByteArray(out, qualifier);
     }
-    out.writeBoolean(namespace != null);
-    if(namespace != null) {
-      Bytes.writeByteArray(out, Bytes.toBytes(namespace));
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/130057f1/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/UserPermission.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/UserPermission.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/UserPermission.java
index 72bd69f..2a9a109 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/UserPermission.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/UserPermission.java
@@ -18,167 +18,152 @@
 
 package org.apache.hadoop.hbase.security.access;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
+import java.util.Objects;
 
 import org.apache.hadoop.hbase.TableName;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.util.Bytes;
 
 /**
- * Represents an authorization for access over the given table, column family
- * plus qualifier, for the given user.
+ * UserPermission consists of a user name and a permission.
+ * Permission can be one of [Global, Namespace, Table] permission.
  */
 @InterfaceAudience.Private
-public class UserPermission extends TablePermission {
-  private static final Logger LOG = LoggerFactory.getLogger(UserPermission.class);
+public class UserPermission {
 
-  private byte[] user;
+  private String user;
+  private Permission permission;
 
-  /** Nullary constructor for Writable, do not use */
-  public UserPermission() {
-    super();
+  /**
+   * Construct a global user permission.
+   * @param user user name
+   * @param assigned assigned actions
+   */
+  public UserPermission(String user, Permission.Action... assigned) {
+    this.user = user;
+    this.permission = new GlobalPermission(assigned);
   }
 
   /**
-   * Creates a new instance for the given user.
-   * @param user the user
-   * @param assigned the list of allowed actions
+   * Construct a global user permission.
+   * @param user user name
+   * @param actionCode action codes
    */
-  public UserPermission(byte[] user, Action... assigned) {
-    super(null, null, null, assigned);
+  public UserPermission(String user, byte[] actionCode) {
     this.user = user;
+    this.permission = new GlobalPermission(actionCode);
   }
 
   /**
-   * Creates a new instance for the given user,
-   * matching the actions with the given codes.
-   * @param user the user
-   * @param actionCodes the list of allowed action codes
+   * Construct a namespace user permission.
+   * @param user user name
+   * @param namespace namespace
+   * @param assigned assigned actions
    */
-  public UserPermission(byte[] user, byte[] actionCodes) {
-    super(null, null, null, actionCodes);
+  public UserPermission(String user, String namespace, Permission.Action... assigned) {
     this.user = user;
+    this.permission = new NamespacePermission(namespace, assigned);
   }
 
   /**
-   * Creates a new instance for the given user.
-   * @param user the user
-   * @param namespace
-   * @param assigned the list of allowed actions
+   * Construct a table user permission.
+   * @param user user name
+   * @param tableName table name
+   * @param assigned assigned actions
    */
-  public UserPermission(byte[] user, String namespace, Action... assigned) {
-    super(namespace, assigned);
+  public UserPermission(String user, TableName tableName, Permission.Action... assigned) {
     this.user = user;
+    this.permission = new TablePermission(tableName, assigned);
   }
 
   /**
-   * Creates a new instance for the given user,
-   * matching the actions with the given codes.
-   * @param user the user
-   * @param namespace
-   * @param actionCodes the list of allowed action codes
+   * Construct a table:family user permission.
+   * @param user user name
+   * @param tableName table name
+   * @param family family name of table
+   * @param assigned assigned actions
    */
-  public UserPermission(byte[] user, String namespace, byte[] actionCodes) {
-    super(namespace, actionCodes);
-    this.user = user;
+  public UserPermission(String user, TableName tableName, byte[] family,
+    Permission.Action... assigned) {
+    this(user, tableName, family, null, assigned);
   }
 
   /**
-   * Creates a new instance for the given user, table and column family.
-   * @param user the user
-   * @param table the table
-   * @param family the family, can be null if action is allowed over the entire
-   *   table
-   * @param assigned the list of allowed actions
+   * Construct a table:family:qualifier user permission.
+   * @param user user name
+   * @param tableName table name
+   * @param family family name of table
+   * @param qualifier qualifier name of table
+   * @param assigned assigned actions
    */
-  public UserPermission(byte[] user, TableName table, byte[] family,
-                        Action... assigned) {
-    super(table, family, assigned);
+  public UserPermission(String user, TableName tableName, byte[] family, byte[] qualifier,
+      Permission.Action... assigned) {
     this.user = user;
+    this.permission = new TablePermission(tableName, family, qualifier, assigned);
   }
 
   /**
-   * Creates a new permission for the given user, table, column family and
-   * column qualifier.
-   * @param user the user
-   * @param table the table
-   * @param family the family, can be null if action is allowed over the entire
-   *   table
-   * @param qualifier the column qualifier, can be null if action is allowed
-   *   over the entire column family
-   * @param assigned the list of allowed actions
+   * Construct a table:family:qualifier user permission.
+   * @param user user name
+   * @param tableName table name
+   * @param family family name of table
+   * @param qualifier qualifier name of table
+   * @param actionCodes assigned actions
    */
-  public UserPermission(byte[] user, TableName table, byte[] family,
-                        byte[] qualifier, Action... assigned) {
-    super(table, family, qualifier, assigned);
+  public UserPermission(String user, TableName tableName, byte[] family, byte[] qualifier,
+      byte[] actionCodes) {
     this.user = user;
+    this.permission = new TablePermission(tableName, family, qualifier, actionCodes);
   }
 
   /**
-   * Creates a new instance for the given user, table, column family and
-   * qualifier, matching the actions with the given codes.
-   * @param user the user
-   * @param table the table
-   * @param family the family, can be null if action is allowed over the entire
-   *   table
-   * @param qualifier the column qualifier, can be null if action is allowed
-   *   over the entire column family
-   * @param actionCodes the list of allowed action codes
+   * Construct a user permission given permission.
+   * @param user user name
+   * @param permission one of [Global, Namespace, Table] permission
    */
-  public UserPermission(byte[] user, TableName table, byte[] family,
-                        byte[] qualifier, byte[] actionCodes) {
-    super(table, family, qualifier, actionCodes);
+  public UserPermission(String user, Permission permission) {
     this.user = user;
+    this.permission = permission;
   }
 
   /**
-   * Creates a new instance for the given user, table, column family and
-   * qualifier, matching the actions with the given codes.
-   * @param user the user
-   * @param perm a TablePermission
+   * Get this permission access scope.
+   * @return access scope
    */
-  public UserPermission(byte[] user, TablePermission perm) {
-    super(perm.getNamespace(), perm.getTableName(), perm.getFamily(), perm.getQualifier(),
-        perm.actions);
-    this.user = user;
+  public Permission.Scope getAccessScope() {
+    return permission.getAccessScope();
   }
 
-  public byte[] getUser() {
+  public String getUser() {
     return user;
   }
 
-  /**
-   * Returns true if this permission describes a global user permission.
-   */
-  public boolean isGlobal() {
-    return(!hasTable() && !hasNamespace());
+  public Permission getPermission() {
+    return permission;
   }
 
-  @Override
-  public boolean equals(Object obj) {
+  public boolean equalsExceptActions(Object obj) {
     if (!(obj instanceof UserPermission)) {
       return false;
     }
-    UserPermission other = (UserPermission)obj;
+    UserPermission other = (UserPermission) obj;
+    return user.equals(other.user) && permission.equalsExceptActions(other.permission);
+  }
 
-    if ((Bytes.equals(user, other.getUser()) &&
-        super.equals(obj))) {
-      return true;
-    } else {
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof UserPermission)) {
       return false;
     }
+    UserPermission other = (UserPermission) obj;
+    return user.equals(other.user) && permission.equals(other.permission);
   }
 
   @Override
   public int hashCode() {
     final int prime = 37;
-    int result = super.hashCode();
+    int result = permission.hashCode();
     if (user != null) {
-      result = prime * result + Bytes.hashCode(user);
+      result = prime * result + Objects.hashCode(user);
     }
     return result;
   }
@@ -186,20 +171,8 @@ public class UserPermission extends TablePermission {
   @Override
   public String toString() {
     StringBuilder str = new StringBuilder("UserPermission: ")
-        .append("user=").append(Bytes.toString(user))
-        .append(", ").append(super.toString());
+        .append("user=").append(user)
+        .append(", ").append(permission.toString());
     return str.toString();
   }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    super.readFields(in);
-    user = Bytes.readByteArray(in);
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    super.write(out);
-    Bytes.writeByteArray(out, user);
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/130057f1/hbase-common/src/main/java/org/apache/hadoop/hbase/AuthUtil.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/AuthUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/AuthUtil.java
index 78da55d..1cf43e1 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/AuthUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/AuthUtil.java
@@ -68,7 +68,7 @@ import org.slf4j.LoggerFactory;
  * an example of configuring a user of this Auth Chore to run on a secure cluster.
  * <pre>
  * </pre>
- * This class will be internal use only from 2.2.0 version, and will transparently work
+ * This class will be for internal use only from version 2.2.0, and will transparently work
  * for kerberized applications. For more, please refer
  * <a href="http://hbase.apache.org/book.html#hbase.secure.configuration">Client-side Configuration for Secure Operation</a>
  *

http://git-wip-us.apache.org/repos/asf/hbase/blob/130057f1/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsWithACL.java
----------------------------------------------------------------------
diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsWithACL.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsWithACL.java
index b6f6463..59e5601 100644
--- a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsWithACL.java
+++ b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsWithACL.java
@@ -35,9 +35,9 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.access.AccessControlClient;
 import org.apache.hadoop.hbase.security.access.AccessControlLists;
+import org.apache.hadoop.hbase.security.access.AuthManager;
 import org.apache.hadoop.hbase.security.access.Permission;
 import org.apache.hadoop.hbase.security.access.SecureTestUtil;
-import org.apache.hadoop.hbase.security.access.TableAuthManager;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.SecurityTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -203,7 +203,7 @@ public class TestRSGroupsWithACL extends SecureTestUtil{
   public static void tearDownAfterClass() throws Exception {
     cleanUp();
     TEST_UTIL.shutdownMiniCluster();
-    int total = TableAuthManager.getTotalRefCount();
+    int total = AuthManager.getTotalRefCount();
     assertTrue("Unexpected reference count: " + total, total == 0);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/130057f1/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessChecker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessChecker.java
index c31658f..986efd7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessChecker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessChecker.java
@@ -50,7 +50,7 @@ public final class AccessChecker {
   // TODO: we should move to a design where we don't even instantiate an AccessChecker if
   // authorization is not enabled (like in RSRpcServices), instead of always instantiating one and
   // calling requireXXX() only to do nothing (since authorizationEnabled will be false).
-  private TableAuthManager authManager;
+  private AuthManager authManager;
 
   /** Group service to retrieve the user group information */
   private static Groups groupService;
@@ -75,7 +75,7 @@ public final class AccessChecker {
       throws RuntimeException {
     if (zkw != null) {
       try {
-        this.authManager = TableAuthManager.getOrCreate(zkw, conf);
+        this.authManager = AuthManager.getOrCreate(zkw, conf);
       } catch (IOException ioe) {
         throw new RuntimeException("Error obtaining AccessChecker", ioe);
       }
@@ -87,13 +87,13 @@ public final class AccessChecker {
   }
 
   /**
-   * Releases {@link TableAuthManager}'s reference.
+   * Releases {@link AuthManager}'s reference.
    */
   public void stop() {
-    TableAuthManager.release(authManager);
+    AuthManager.release(authManager);
   }
 
-  public TableAuthManager getAuthManager() {
+  public AuthManager getAuthManager() {
     return authManager;
   }
 
@@ -115,7 +115,7 @@ public final class AccessChecker {
     AuthResult result = null;
 
     for (Action permission : permissions) {
-      if (authManager.hasAccess(user, tableName, permission)) {
+      if (authManager.accessUserTable(user, tableName, permission)) {
         result = AuthResult.allow(request, "Table permission granted",
             user, permission, tableName, null, null);
         break;
@@ -164,7 +164,7 @@ public final class AccessChecker {
       return;
     }
     AuthResult result;
-    if (authManager.authorize(user, perm)) {
+    if (authManager.authorizeUserGlobal(user, perm)) {
       result = AuthResult.allow(request, "Global check allowed", user, perm, tableName, familyMap);
     } else {
       result = AuthResult.deny(request, "Global check failed", user, perm, tableName, familyMap);
@@ -195,7 +195,7 @@ public final class AccessChecker {
       return;
     }
     AuthResult authResult;
-    if (authManager.authorize(user, perm)) {
+    if (authManager.authorizeUserGlobal(user, perm)) {
       authResult = AuthResult.allow(request, "Global check allowed", user, perm, null);
       authResult.getParams().setNamespace(namespace);
       logResult(authResult);
@@ -225,7 +225,7 @@ public final class AccessChecker {
     AuthResult result = null;
 
     for (Action permission : permissions) {
-      if (authManager.authorize(user, namespace, permission)) {
+      if (authManager.authorizeUserNamespace(user, namespace, permission)) {
         result =
             AuthResult.allow(request, "Namespace permission granted", user, permission, namespace);
         break;
@@ -260,7 +260,7 @@ public final class AccessChecker {
     AuthResult result = null;
 
     for (Action permission : permissions) {
-      if (authManager.authorize(user, namespace, permission)) {
+      if (authManager.authorizeUserNamespace(user, namespace, permission)) {
         result =
             AuthResult.allow(request, "Namespace permission granted", user, permission, namespace);
         result.getParams().setTableName(tableName).setFamilies(familyMap);
@@ -299,7 +299,7 @@ public final class AccessChecker {
     AuthResult result = null;
 
     for (Action permission : permissions) {
-      if (authManager.authorize(user, tableName, family, qualifier, permission)) {
+      if (authManager.authorizeUserTable(user, tableName, family, qualifier, permission)) {
         result = AuthResult.allow(request, "Table permission granted",
             user, permission, tableName, family, qualifier);
         break;
@@ -337,7 +337,7 @@ public final class AccessChecker {
     AuthResult result = null;
 
     for (Action permission : permissions) {
-      if (authManager.authorize(user, tableName, null, null, permission)) {
+      if (authManager.authorizeUserTable(user, tableName, permission)) {
         result = AuthResult.allow(request, "Table permission granted",
             user, permission, tableName, null, null);
         result.getParams().setFamily(family).setQualifier(qualifier);

http://git-wip-us.apache.org/repos/asf/hbase/blob/130057f1/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlFilter.java
index fd48641..79233df 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlFilter.java
@@ -38,7 +38,7 @@ import org.apache.hadoop.hbase.util.SimpleMutableByteRange;
  *
  * <p>
  * TODO: There is room for further performance optimization here.
- * Calling TableAuthManager.authorize() per KeyValue imposes a fair amount of
+ * Calling AuthManager.authorize() per KeyValue imposes a fair amount of
  * overhead.  A more optimized solution might look at the qualifiers where
  * permissions are actually granted and explicitly limit the scan to those.
  * </p>
@@ -58,7 +58,7 @@ class AccessControlFilter extends FilterBase {
     CHECK_CELL_DEFAULT,
   }
 
-  private TableAuthManager authManager;
+  private AuthManager authManager;
   private TableName table;
   private User user;
   private boolean isSystemTable;
@@ -75,7 +75,7 @@ class AccessControlFilter extends FilterBase {
   AccessControlFilter() {
   }
 
-  AccessControlFilter(TableAuthManager mgr, User ugi, TableName tableName,
+  AccessControlFilter(AuthManager mgr, User ugi, TableName tableName,
       Strategy strategy, Map<ByteRange, Integer> cfVsMaxVersions) {
     authManager = mgr;
     table = tableName;
@@ -119,20 +119,20 @@ class AccessControlFilter extends FilterBase {
       return ReturnCode.SKIP;
     }
     // XXX: Compare in place, don't clone
-    byte[] family = CellUtil.cloneFamily(cell);
-    byte[] qualifier = CellUtil.cloneQualifier(cell);
+    byte[] f = CellUtil.cloneFamily(cell);
+    byte[] q = CellUtil.cloneQualifier(cell);
     switch (strategy) {
       // Filter only by checking the table or CF permissions
       case CHECK_TABLE_AND_CF_ONLY: {
-        if (authManager.authorize(user, table, family, qualifier, Permission.Action.READ)) {
+        if (authManager.authorizeUserTable(user, table, f, q, Permission.Action.READ)) {
           return ReturnCode.INCLUDE;
         }
       }
       break;
       // Cell permissions can override table or CF permissions
       case CHECK_CELL_DEFAULT: {
-        if (authManager.authorize(user, table, family, qualifier, Permission.Action.READ) ||
-            authManager.authorize(user, table, cell, Permission.Action.READ)) {
+        if (authManager.authorizeUserTable(user, table, f, q, Permission.Action.READ) ||
+            authManager.authorizeCell(user, table, cell, Permission.Action.READ)) {
           return ReturnCode.INCLUDE;
         }
       }