You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2012/06/19 04:24:33 UTC

svn commit: r1351556 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/security/access/ test/java/org/apache/hadoop/hbase/security/access/

Author: apurtell
Date: Tue Jun 19 02:24:33 2012
New Revision: 1351556

URL: http://svn.apache.org/viewvc?rev=1351556&view=rev
Log:
HBASE-6188. Remove the concept of table owner

Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java?rev=1351556&r1=1351555&r2=1351556&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java Tue Jun 19 02:24:33 2012
@@ -1195,11 +1195,13 @@ public class HTableDescriptor implements
               .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
       });
 
+  @Deprecated
   public void setOwner(User owner) {
     setOwnerString(owner != null ? owner.getShortName() : null);
   }
 
   // used by admin.rb:alter(table_name,*args) to update owner.
+  @Deprecated
   public void setOwnerString(String ownerString) {
     if (ownerString != null) {
       setValue(OWNER_KEY, Bytes.toBytes(ownerString));
@@ -1208,12 +1210,14 @@ public class HTableDescriptor implements
     }
   }
 
+  @Deprecated
   public String getOwnerString() {
     if (getValue(OWNER_KEY) != null) {
       return Bytes.toString(getValue(OWNER_KEY));
     }
     // Note that every table should have an owner (i.e. should have OWNER_KEY set).
-    // .META. and -ROOT- should return system user as owner, not null (see MasterFileSystem.java:bootstrap()).
+    // .META. and -ROOT- should return system user as owner, not null (see
+    // MasterFileSystem.java:bootstrap()).
     return null;
   }
 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java?rev=1351556&r1=1351555&r2=1351556&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java Tue Jun 19 02:24:33 2012
@@ -288,6 +288,13 @@ public class AccessControlLists {
   }
 
   /**
+   * Returns {@code true} if the given table is {@code _acl_} metadata table.
+   */
+  static boolean isAclTable(HTableDescriptor desc) {
+    return Bytes.equals(ACL_TABLE_NAME, desc.getName());
+  }
+
+  /**
    * Loads all of the permission grants stored in a region of the {@code _acl_}
    * table.
    *

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java?rev=1351556&r1=1351555&r2=1351556&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java Tue Jun 19 02:24:33 2012
@@ -14,14 +14,14 @@
 
 package org.apache.hadoop.hbase.security.access;
 
-import java.io.*;
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.TreeSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -54,13 +54,12 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.security.AccessDeniedException;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.access.Permission.Action;
 import org.apache.hadoop.hbase.util.Bytes;
 
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.MapMaker;
@@ -250,7 +249,6 @@ public class AccessController extends Ba
       RegionCoprocessorEnvironment e,
       Map<byte [], ? extends Collection<?>> families) {
     HRegionInfo hri = e.getRegion().getRegionInfo();
-    HTableDescriptor htd = e.getRegion().getTableDesc();
     byte[] tableName = hri.getTableName();
 
     // 1. All users need read access to .META. and -ROOT- tables.
@@ -279,19 +277,12 @@ public class AccessController extends Ba
        return AuthResult.allow("Table permission granted", user, permRequest, tableName);
     }
 
-    // 2. The table owner has full privileges
-    String owner = htd.getOwnerString();
-    if (user.getShortName().equals(owner)) {
-      // owner of the table has full access
-      return AuthResult.allow("User is table owner", user, permRequest, tableName);
-    }
-
-    // 3. check for the table-level, if successful we can short-circuit
+    // 2. check for the table-level, if successful we can short-circuit
     if (authManager.authorize(user, tableName, (byte[])null, permRequest)) {
       return AuthResult.allow("Table permission granted", user, permRequest, tableName);
     }
 
-    // 4. check permissions against the requested families
+    // 3. check permissions against the requested families
     if (families != null && families.size() > 0) {
       // all families must pass
       for (Map.Entry<byte [], ? extends Collection<?>> family : families.entrySet()) {
@@ -335,7 +326,7 @@ public class AccessController extends Ba
           tableName);
     }
 
-    // 5. no families to check and table level access failed
+    // 4. no families to check and table level access failed
     return AuthResult.deny("No families to check and table permission failed",
         user, permRequest, tableName);
   }
@@ -365,38 +356,23 @@ public class AccessController extends Ba
   }
 
   /**
-   * Authorizes that the current user has "admin" privileges for the given table.
-   * that means he/she can edit/modify/delete the table.
-   * If current user is the table owner, and has CREATE permission,
-   * then he/she has table admin permission. otherwise ADMIN rights are checked.
-   * @param e Coprocessor environment
+   * Authorizes that the current user has any of the given permissions for the given table.
    * @param tableName Table requested
    * @throws IOException if obtaining the current user fails
-   * @throws AccessDeniedException if authorization is denied
+   * @throws AccessDeniedException if user has no authorization
    */
-  private void requireTableAdminPermission(CoprocessorEnvironment e, byte[] tableName)
-      throws IOException {
+  private void requireTablePermission(byte[] tableName, Action... permissions) throws IOException {
     User user = getActiveUser();
     AuthResult result = null;
 
-    // Table admins are allowed to perform DDL
-    if (authManager.authorize(user, tableName, (byte[]) null, TablePermission.Action.ADMIN)) {
-      result = AuthResult.allow("Table permission granted", user, TablePermission.Action.ADMIN,
-          tableName);
-    } else if (isActiveUserTableOwner(e, tableName)) {
-      // Table owners with Create permission are allowed to perform DDL
-      if (authManager.authorize(user, tableName, (byte[]) null, TablePermission.Action.CREATE)) {
-        result = AuthResult.allow("Owner has table permission", user,
-            TablePermission.Action.CREATE, tableName);
+    for (Action permission : permissions) {
+      if (authManager.authorize(user, tableName, (byte[]) null, permission)) {
+        result = AuthResult.allow("Table permission granted", user, permission, tableName);
+        break;
       } else {
-        // Table owners without Create permission cannot perform DDL
-        result = AuthResult.deny("Insufficient permissions", user, TablePermission.Action.CREATE,
-            tableName);
+        // rest of the world
+        result = AuthResult.deny("Insufficient permissions", user, permission, tableName);
       }
-    } else {
-      // rest of the world
-      result = AuthResult.deny("Insufficient permissions", user, TablePermission.Action.ADMIN,
-          tableName);
     }
     logResult(result);
     if (!result.isAllowed()) {
@@ -540,21 +516,25 @@ public class AccessController extends Ba
   public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> c,
       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
     requirePermission(Permission.Action.CREATE);
-
-    // default the table owner if not specified
-    User owner = getActiveUser();
-    if (desc.getOwnerString() == null ||
-        desc.getOwnerString().equals("")) {
-      desc.setOwner(owner);
-    }
   }
+
   @Override
   public void preCreateTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {}
 
   @Override
   public void postCreateTable(ObserverContext<MasterCoprocessorEnvironment> c,
-      HTableDescriptor desc, HRegionInfo[] regions) throws IOException {}
+      HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
+    if (!AccessControlLists.isAclTable(desc)) {
+      String owner = desc.getOwnerString();
+      // default the table owner to current user, if not specified.
+      if (owner == null) owner = getActiveUser().getShortName();
+      UserPermission userperm = new UserPermission(Bytes.toBytes(owner), desc.getName(), null,
+          Action.values());
+      AccessControlLists.addUserPermission(c.getEnvironment().getConfiguration(), userperm);
+    }
+  }
+
   @Override
   public void postCreateTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {}
@@ -562,7 +542,7 @@ public class AccessController extends Ba
   @Override
   public void preDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c,
       byte[] tableName) throws IOException {
-    requireTableAdminPermission(c.getEnvironment(), tableName);
+    requireTablePermission(tableName, Action.ADMIN, Action.CREATE);
   }
   @Override
   public void preDeleteTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
@@ -579,14 +559,23 @@ public class AccessController extends Ba
   @Override
   public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> c,
       byte[] tableName, HTableDescriptor htd) throws IOException {
-    requireTableAdminPermission(c.getEnvironment(), tableName);
+    requireTablePermission(tableName, Action.ADMIN, Action.CREATE);
   }
   @Override
   public void preModifyTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
       byte[] tableName, HTableDescriptor htd) throws IOException {}
+
   @Override
   public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment> c,
-      byte[] tableName, HTableDescriptor htd) throws IOException {}
+      byte[] tableName, HTableDescriptor htd) throws IOException {
+    String owner = htd.getOwnerString();
+    // default the table owner to current user, if not specified.
+    if (owner == null) owner = getActiveUser().getShortName();
+    UserPermission userperm = new UserPermission(Bytes.toBytes(owner), htd.getName(), null,
+        Action.values());
+    AccessControlLists.addUserPermission(c.getEnvironment().getConfiguration(), userperm);
+  }
+
   @Override
   public void postModifyTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
       byte[] tableName, HTableDescriptor htd) throws IOException {}
@@ -595,7 +584,7 @@ public class AccessController extends Ba
   @Override
   public void preAddColumn(ObserverContext<MasterCoprocessorEnvironment> c,
       byte[] tableName, HColumnDescriptor column) throws IOException {
-    requireTableAdminPermission(c.getEnvironment(), tableName);
+    requireTablePermission(tableName, Action.ADMIN, Action.CREATE);
   }
   @Override
   public void preAddColumnHandler(ObserverContext<MasterCoprocessorEnvironment> c,
@@ -610,7 +599,7 @@ public class AccessController extends Ba
   @Override
   public void preModifyColumn(ObserverContext<MasterCoprocessorEnvironment> c,
       byte[] tableName, HColumnDescriptor descriptor) throws IOException {
-    requireTableAdminPermission(c.getEnvironment(), tableName);
+    requireTablePermission(tableName, Action.ADMIN, Action.CREATE);
   }
   @Override
   public void preModifyColumnHandler(ObserverContext<MasterCoprocessorEnvironment> c,
@@ -626,7 +615,7 @@ public class AccessController extends Ba
   @Override
   public void preDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> c,
       byte[] tableName, byte[] col) throws IOException {
-    requireTableAdminPermission(c.getEnvironment(), tableName);
+    requireTablePermission(tableName, Action.ADMIN, Action.CREATE);
   }
   @Override
   public void preDeleteColumnHandler(ObserverContext<MasterCoprocessorEnvironment> c,
@@ -644,7 +633,7 @@ public class AccessController extends Ba
   @Override
   public void preEnableTable(ObserverContext<MasterCoprocessorEnvironment> c,
       byte[] tableName) throws IOException {
-    requireTableAdminPermission(c.getEnvironment(), tableName);
+    requireTablePermission(tableName, Action.ADMIN, Action.CREATE);
   }
   @Override
   public void preEnableTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
@@ -659,7 +648,7 @@ public class AccessController extends Ba
   @Override
   public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> c,
       byte[] tableName) throws IOException {
-    requireTableAdminPermission(c.getEnvironment(), tableName);
+    requireTablePermission(tableName, Action.ADMIN, Action.CREATE);
   }
   @Override
   public void preDisableTableHandler(ObserverContext<MasterCoprocessorEnvironment> c,
@@ -773,18 +762,18 @@ public class AccessController extends Ba
 
   @Override
   public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
-    requireTableAdminPermission(e.getEnvironment(), getTableName(e.getEnvironment()));
+    requireTablePermission(getTableName(e.getEnvironment()), Action.ADMIN);
   }
 
   @Override
   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
-    requireTableAdminPermission(e.getEnvironment(), getTableName(e.getEnvironment()));
+    requireTablePermission(getTableName(e.getEnvironment()), Action.ADMIN);
   }
 
   @Override
   public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
       final Store store, final InternalScanner scanner) throws IOException {
-    requireTableAdminPermission(e.getEnvironment(), getTableName(e.getEnvironment()));
+    requireTablePermission(getTableName(e.getEnvironment()), Action.ADMIN);
     return scanner;
   }
 
@@ -1155,15 +1144,4 @@ public class AccessController extends Ba
     }
     return tableName;
   }
-
-  private String getTableOwner(CoprocessorEnvironment e, byte[] tableName) throws IOException {
-    HTableDescriptor htd = e.getTable(tableName).getTableDescriptor();
-    return htd.getOwnerString();
-  }
-
-  private boolean isActiveUserTableOwner(CoprocessorEnvironment e, byte[] tableName)
-      throws IOException {
-    String activeUser = getActiveUser().getShortName();
-    return activeUser.equals(getTableOwner(e, tableName));
-  }
 }

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java?rev=1351556&r1=1351555&r2=1351556&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java Tue Jun 19 02:24:33 2012
@@ -27,11 +27,8 @@ import java.security.PrivilegedException
 import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Coprocessor;
-import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -58,6 +55,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
 import org.apache.hadoop.hbase.security.AccessDeniedException;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.access.Permission.Action;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -69,8 +67,8 @@ import org.junit.experimental.categories
  * levels of authorized users.
  */
 @Category(LargeTests.class)
+@SuppressWarnings("rawtypes")
 public class TestAccessController {
-  private static Log LOG = LogFactory.getLog(TestAccessController.class);
   private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   private static Configuration conf;
 
@@ -78,14 +76,14 @@ public class TestAccessController {
   private static User SUPERUSER;
   // user granted with all global permission
   private static User USER_ADMIN;
-  // table owner user
-  private static User USER_OWNER;
   // user with rw permissions
   private static User USER_RW;
   // user with read-only permissions
   private static User USER_RO;
-  // user with table admin permissions
-  private static User USER_TBLADM;
+  // user is table owner. will have all permissions on table
+  private static User USER_OWNER;
+  // user with create table permissions alone
+  private static User USER_CREATE;
   // user with no permissions
   private static User USER_NONE;
 
@@ -103,51 +101,52 @@ public class TestAccessController {
     SecureTestUtil.enableSecurity(conf);
 
     TEST_UTIL.startMiniCluster();
-    MasterCoprocessorHost cpHost = TEST_UTIL.getMiniHBaseCluster()
-        .getMaster().getCoprocessorHost();
+    MasterCoprocessorHost cpHost = TEST_UTIL.getMiniHBaseCluster().getMaster().getCoprocessorHost();
     cpHost.load(AccessController.class, Coprocessor.PRIORITY_HIGHEST, conf);
-    ACCESS_CONTROLLER = (AccessController)cpHost.findCoprocessor(
-        AccessController.class.getName());
+    ACCESS_CONTROLLER = (AccessController) cpHost.findCoprocessor(AccessController.class.getName());
     CP_ENV = cpHost.createEnvironment(AccessController.class, ACCESS_CONTROLLER,
-        Coprocessor.PRIORITY_HIGHEST, 1, conf);
+      Coprocessor.PRIORITY_HIGHEST, 1, conf);
+
+    // Wait for the ACL table to become available
+    TEST_UTIL.waitTableAvailable(AccessControlLists.ACL_TABLE_NAME, 5000);
 
     // create a set of test users
-    SUPERUSER = User.createUserForTesting(conf, "admin", new String[]{"supergroup"});
+    SUPERUSER = User.createUserForTesting(conf, "admin", new String[] { "supergroup" });
     USER_ADMIN = User.createUserForTesting(conf, "admin2", new String[0]);
-    USER_OWNER = User.createUserForTesting(conf, "owner", new String[0]);
     USER_RW = User.createUserForTesting(conf, "rwuser", new String[0]);
     USER_RO = User.createUserForTesting(conf, "rouser", new String[0]);
-    USER_TBLADM = User.createUserForTesting(conf, "tbladm", new String[0]);
+    USER_OWNER = User.createUserForTesting(conf, "owner", new String[0]);
+    USER_CREATE = User.createUserForTesting(conf, "tbl_create", new String[0]);
     USER_NONE = User.createUserForTesting(conf, "nouser", new String[0]);
 
     HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
     HTableDescriptor htd = new HTableDescriptor(TEST_TABLE);
     htd.addFamily(new HColumnDescriptor(TEST_FAMILY));
-    htd.setOwnerString(USER_OWNER.getShortName());
+    htd.setOwner(USER_OWNER);
     admin.createTable(htd);
 
     // initilize access control
     HTable meta = new HTable(conf, AccessControlLists.ACL_TABLE_NAME);
-    AccessControllerProtocol protocol =
-        meta.coprocessorProxy(AccessControllerProtocol.class, TEST_TABLE);
+    AccessControllerProtocol protocol = meta.coprocessorProxy(AccessControllerProtocol.class,
+      TEST_TABLE);
 
     HRegion region = TEST_UTIL.getHBaseCluster().getRegions(TEST_TABLE).get(0);
     RegionCoprocessorHost rcpHost = region.getCoprocessorHost();
     RCP_ENV = rcpHost.createEnvironment(AccessController.class, ACCESS_CONTROLLER,
-        Coprocessor.PRIORITY_HIGHEST, 1, conf);
+      Coprocessor.PRIORITY_HIGHEST, 1, conf);
 
     protocol.grant(new UserPermission(Bytes.toBytes(USER_ADMIN.getShortName()),
-                      Permission.Action.ADMIN, Permission.Action.CREATE,
-                      Permission.Action.READ, Permission.Action.WRITE));
+        Permission.Action.ADMIN, Permission.Action.CREATE, Permission.Action.READ,
+        Permission.Action.WRITE));
 
-    protocol.grant(new UserPermission(Bytes.toBytes(USER_RW.getShortName()),
-        TEST_TABLE, TEST_FAMILY, Permission.Action.READ, Permission.Action.WRITE));
+    protocol.grant(new UserPermission(Bytes.toBytes(USER_RW.getShortName()), TEST_TABLE,
+        TEST_FAMILY, Permission.Action.READ, Permission.Action.WRITE));
 
-    protocol.grant(new UserPermission(Bytes.toBytes(USER_RO.getShortName()),
-                   TEST_TABLE, TEST_FAMILY, Permission.Action.READ));
+    protocol.grant(new UserPermission(Bytes.toBytes(USER_RO.getShortName()), TEST_TABLE,
+        TEST_FAMILY, Permission.Action.READ));
 
-    protocol.grant(new UserPermission(Bytes.toBytes(USER_TBLADM.getShortName()),
-      TEST_TABLE, null, Permission.Action.ADMIN));    
+    protocol.grant(new UserPermission(Bytes.toBytes(USER_CREATE.getShortName()), TEST_TABLE, null,
+        Permission.Action.CREATE));
   }
 
   @AfterClass
@@ -155,8 +154,7 @@ public class TestAccessController {
     TEST_UTIL.shutdownMiniCluster();
   }
 
-  public void verifyAllowed(User user, PrivilegedExceptionAction... actions)
-    throws Exception {
+  public void verifyAllowed(User user, PrivilegedExceptionAction... actions) throws Exception {
     for (PrivilegedExceptionAction action : actions) {
       try {
         user.runAs(action);
@@ -166,15 +164,13 @@ public class TestAccessController {
     }
   }
 
-  public void verifyAllowed(PrivilegedExceptionAction action, User... users)
-    throws Exception {
+  public void verifyAllowed(PrivilegedExceptionAction action, User... users) throws Exception {
     for (User user : users) {
       verifyAllowed(user, action);
     }
   }
 
-  public void verifyDenied(User user, PrivilegedExceptionAction... actions)
-    throws Exception {
+  public void verifyDenied(User user, PrivilegedExceptionAction... actions) throws Exception {
     for (PrivilegedExceptionAction action : actions) {
       try {
         user.runAs(action);
@@ -199,12 +195,11 @@ public class TestAccessController {
     }
   }
 
-  public void verifyDenied(PrivilegedExceptionAction action, User... users)
-      throws Exception {
-      for (User user : users) {
-        verifyDenied(user, action);
-      }
+  public void verifyDenied(PrivilegedExceptionAction action, User... users) throws Exception {
+    for (User user : users) {
+      verifyDenied(user, action);
     }
+  }
 
   @Test
   public void testTableCreate() throws Exception {
@@ -212,21 +207,16 @@ public class TestAccessController {
       public Object run() throws Exception {
         HTableDescriptor htd = new HTableDescriptor("testnewtable");
         htd.addFamily(new HColumnDescriptor(TEST_FAMILY));
-        ACCESS_CONTROLLER.preCreateTable(
-            ObserverContext.createAndPrepare(CP_ENV, null), htd, null);
+        ACCESS_CONTROLLER.preCreateTable(ObserverContext.createAndPrepare(CP_ENV, null), htd, null);
         return null;
       }
     };
 
     // verify that superuser can create tables
-    verifyAllowed(SUPERUSER, createTable);
-    verifyAllowed(USER_ADMIN, createTable);
+    verifyAllowed(createTable, SUPERUSER, USER_ADMIN);
 
     // all others should be denied
-    verifyDenied(USER_OWNER, createTable);
-    verifyDenied(USER_RW, createTable);
-    verifyDenied(USER_RO, createTable);
-    verifyDenied(USER_NONE, createTable);
+    verifyDenied(createTable, USER_CREATE, USER_RW, USER_RO, USER_NONE);
   }
 
   @Test
@@ -235,43 +225,29 @@ public class TestAccessController {
       public Object run() throws Exception {
         HTableDescriptor htd = new HTableDescriptor(TEST_TABLE);
         htd.addFamily(new HColumnDescriptor(TEST_FAMILY));
-        htd.addFamily(new HColumnDescriptor("fam_"+User.getCurrent().getShortName()));
-        ACCESS_CONTROLLER.preModifyTable(ObserverContext.createAndPrepare(CP_ENV, null), TEST_TABLE, htd);
+        htd.addFamily(new HColumnDescriptor("fam_" + User.getCurrent().getShortName()));
+        ACCESS_CONTROLLER.preModifyTable(ObserverContext.createAndPrepare(CP_ENV, null),
+          TEST_TABLE, htd);
         return null;
       }
     };
 
-    // all others should be denied
-    verifyDenied(USER_OWNER, modifyTable);
-    verifyDenied(USER_RW, modifyTable);
-    verifyDenied(USER_RO, modifyTable);
-    verifyDenied(USER_NONE, modifyTable);
-
-    // verify that superuser can create tables
-    verifyAllowed(SUPERUSER, modifyTable);
-    verifyAllowed(USER_ADMIN, modifyTable);
-    verifyAllowed(USER_TBLADM, modifyTable);
+    verifyAllowed(modifyTable, SUPERUSER, USER_ADMIN, USER_CREATE, USER_OWNER);
+    verifyDenied(modifyTable, USER_RW, USER_RO, USER_NONE);
   }
 
   @Test
   public void testTableDelete() throws Exception {
     PrivilegedExceptionAction deleteTable = new PrivilegedExceptionAction() {
       public Object run() throws Exception {
-        ACCESS_CONTROLLER.preDeleteTable(ObserverContext.createAndPrepare(CP_ENV, null), TEST_TABLE);
+        ACCESS_CONTROLLER
+            .preDeleteTable(ObserverContext.createAndPrepare(CP_ENV, null), TEST_TABLE);
         return null;
       }
     };
 
-    // all others should be denied
-    verifyDenied(USER_OWNER, deleteTable);
-    verifyDenied(USER_RW, deleteTable);
-    verifyDenied(USER_RO, deleteTable);
-    verifyDenied(USER_NONE, deleteTable);
-
-    // verify that superuser can create tables
-    verifyAllowed(SUPERUSER, deleteTable);
-    verifyAllowed(USER_ADMIN, deleteTable);
-    verifyAllowed(USER_TBLADM, deleteTable);
+    verifyAllowed(deleteTable, SUPERUSER, USER_ADMIN, USER_CREATE, USER_OWNER);
+    verifyDenied(deleteTable, USER_RW, USER_RO, USER_NONE);
   }
 
   @Test
@@ -279,21 +255,14 @@ public class TestAccessController {
     final HColumnDescriptor hcd = new HColumnDescriptor("fam_new");
     PrivilegedExceptionAction action = new PrivilegedExceptionAction() {
       public Object run() throws Exception {
-        ACCESS_CONTROLLER.preAddColumn(ObserverContext.createAndPrepare(CP_ENV, null), TEST_TABLE, hcd);
+        ACCESS_CONTROLLER.preAddColumn(ObserverContext.createAndPrepare(CP_ENV, null), TEST_TABLE,
+          hcd);
         return null;
       }
     };
 
-    // all others should be denied
-    verifyDenied(USER_OWNER, action);
-    verifyDenied(USER_RW, action);
-    verifyDenied(USER_RO, action);
-    verifyDenied(USER_NONE, action);
-
-    // verify that superuser can create tables
-    verifyAllowed(SUPERUSER, action);
-    verifyAllowed(USER_ADMIN, action);
-    verifyAllowed(USER_TBLADM, action);
+    verifyAllowed(action, SUPERUSER, USER_ADMIN, USER_CREATE, USER_OWNER);
+    verifyDenied(action, USER_RW, USER_RO, USER_NONE);
   }
 
   @Test
@@ -302,162 +271,110 @@ public class TestAccessController {
     hcd.setMaxVersions(10);
     PrivilegedExceptionAction action = new PrivilegedExceptionAction() {
       public Object run() throws Exception {
-        ACCESS_CONTROLLER.preModifyColumn(ObserverContext.createAndPrepare(CP_ENV, null), TEST_TABLE, hcd);
+        ACCESS_CONTROLLER.preModifyColumn(ObserverContext.createAndPrepare(CP_ENV, null),
+          TEST_TABLE, hcd);
         return null;
       }
     };
 
-    // all others should be denied
-    verifyDenied(USER_OWNER, action);
-    verifyDenied(USER_RW, action);
-    verifyDenied(USER_RO, action);
-    verifyDenied(USER_NONE, action);
-
-    // verify that superuser can create tables
-    verifyAllowed(SUPERUSER, action);
-    verifyAllowed(USER_ADMIN, action);
-    verifyAllowed(USER_TBLADM, action);
+    verifyAllowed(action, SUPERUSER, USER_ADMIN, USER_CREATE, USER_OWNER);
+    verifyDenied(action, USER_RW, USER_RO, USER_NONE);
   }
 
   @Test
   public void testDeleteColumn() throws Exception {
     PrivilegedExceptionAction action = new PrivilegedExceptionAction() {
       public Object run() throws Exception {
-        ACCESS_CONTROLLER.preDeleteColumn(ObserverContext.createAndPrepare(CP_ENV, null), TEST_TABLE, TEST_FAMILY);
+        ACCESS_CONTROLLER.preDeleteColumn(ObserverContext.createAndPrepare(CP_ENV, null),
+          TEST_TABLE, TEST_FAMILY);
         return null;
       }
     };
 
-    // all others should be denied
-    verifyDenied(USER_OWNER, action);
-    verifyDenied(USER_RW, action);
-    verifyDenied(USER_RO, action);
-    verifyDenied(USER_NONE, action);
-
-    // verify that superuser can create tables
-    verifyAllowed(SUPERUSER, action);
-    verifyAllowed(USER_ADMIN, action);
-    verifyAllowed(USER_TBLADM, action);
+    verifyAllowed(action, SUPERUSER, USER_ADMIN, USER_CREATE, USER_OWNER);
+    verifyDenied(action, USER_RW, USER_RO, USER_NONE);
   }
 
   @Test
   public void testTableDisable() throws Exception {
     PrivilegedExceptionAction disableTable = new PrivilegedExceptionAction() {
       public Object run() throws Exception {
-        ACCESS_CONTROLLER.preDisableTable(ObserverContext.createAndPrepare(CP_ENV, null), TEST_TABLE);
+        ACCESS_CONTROLLER.preDisableTable(ObserverContext.createAndPrepare(CP_ENV, null),
+          TEST_TABLE);
         return null;
       }
     };
 
-    // all others should be denied
-    verifyDenied(USER_OWNER, disableTable);
-    verifyDenied(USER_RW, disableTable);
-    verifyDenied(USER_RO, disableTable);
-    verifyDenied(USER_NONE, disableTable);
-
-    // verify that superuser can create tables
-    verifyAllowed(SUPERUSER, disableTable);
-    verifyAllowed(USER_ADMIN, disableTable);
-    verifyAllowed(USER_TBLADM, disableTable);
+    verifyAllowed(disableTable, SUPERUSER, USER_ADMIN, USER_CREATE, USER_OWNER);
+    verifyDenied(disableTable, USER_RW, USER_RO, USER_NONE);
   }
 
   @Test
   public void testTableEnable() throws Exception {
     PrivilegedExceptionAction enableTable = new PrivilegedExceptionAction() {
       public Object run() throws Exception {
-        ACCESS_CONTROLLER.preEnableTable(ObserverContext.createAndPrepare(CP_ENV, null), TEST_TABLE);
+        ACCESS_CONTROLLER
+            .preEnableTable(ObserverContext.createAndPrepare(CP_ENV, null), TEST_TABLE);
         return null;
       }
     };
 
-    // all others should be denied
-    verifyDenied(USER_OWNER, enableTable);
-    verifyDenied(USER_RW, enableTable);
-    verifyDenied(USER_RO, enableTable);
-    verifyDenied(USER_NONE, enableTable);
-
-    // verify that superuser can create tables
-    verifyAllowed(SUPERUSER, enableTable);
-    verifyAllowed(USER_ADMIN, enableTable);
-    verifyAllowed(USER_TBLADM, enableTable);
+    verifyAllowed(enableTable, SUPERUSER, USER_ADMIN, USER_CREATE, USER_OWNER);
+    verifyDenied(enableTable, USER_RW, USER_RO, USER_NONE);
   }
 
   @Test
   public void testMove() throws Exception {
     HTable table = new HTable(TEST_UTIL.getConfiguration(), TEST_TABLE);
-    Map<HRegionInfo,HServerAddress> regions = table.getRegionsInfo();
-    final Map.Entry<HRegionInfo,HServerAddress> firstRegion =
-        regions.entrySet().iterator().next();
+    Map<HRegionInfo, HServerAddress> regions = table.getRegionsInfo();
+    final Map.Entry<HRegionInfo, HServerAddress> firstRegion = regions.entrySet().iterator().next();
     final ServerName server = TEST_UTIL.getHBaseCluster().getRegionServer(0).getServerName();
     PrivilegedExceptionAction action = new PrivilegedExceptionAction() {
       public Object run() throws Exception {
         ACCESS_CONTROLLER.preMove(ObserverContext.createAndPrepare(CP_ENV, null),
-            firstRegion.getKey(), server, server);
+          firstRegion.getKey(), server, server);
         return null;
       }
     };
 
-    // all others should be denied
-    verifyDenied(USER_OWNER, action);
-    verifyDenied(USER_RW, action);
-    verifyDenied(USER_RO, action);
-    verifyDenied(USER_NONE, action);
-
-    // verify that superuser can create tables
-    verifyAllowed(SUPERUSER, action);
-    verifyAllowed(USER_ADMIN, action);
+    verifyAllowed(action, SUPERUSER, USER_ADMIN);
+    verifyDenied(action, USER_CREATE, USER_OWNER, USER_RW, USER_RO, USER_NONE);
   }
 
   @Test
   public void testAssign() throws Exception {
     HTable table = new HTable(TEST_UTIL.getConfiguration(), TEST_TABLE);
-    Map<HRegionInfo,HServerAddress> regions = table.getRegionsInfo();
-    final Map.Entry<HRegionInfo,HServerAddress> firstRegion =
-        regions.entrySet().iterator().next();
+    Map<HRegionInfo, HServerAddress> regions = table.getRegionsInfo();
+    final Map.Entry<HRegionInfo, HServerAddress> firstRegion = regions.entrySet().iterator().next();
 
     PrivilegedExceptionAction action = new PrivilegedExceptionAction() {
       public Object run() throws Exception {
         ACCESS_CONTROLLER.preAssign(ObserverContext.createAndPrepare(CP_ENV, null),
-            firstRegion.getKey());
+          firstRegion.getKey());
         return null;
       }
     };
 
-    // all others should be denied
-    verifyDenied(USER_OWNER, action);
-    verifyDenied(USER_RW, action);
-    verifyDenied(USER_RO, action);
-    verifyDenied(USER_NONE, action);
-
-    // verify that superuser can create tables
-    verifyAllowed(SUPERUSER, action);
-    verifyAllowed(USER_ADMIN, action);
+    verifyAllowed(action, SUPERUSER, USER_ADMIN);
+    verifyDenied(action, USER_CREATE, USER_OWNER, USER_RW, USER_RO, USER_NONE);
   }
 
   @Test
   public void testUnassign() throws Exception {
     HTable table = new HTable(TEST_UTIL.getConfiguration(), TEST_TABLE);
-    Map<HRegionInfo,HServerAddress> regions = table.getRegionsInfo();
-    final Map.Entry<HRegionInfo,HServerAddress> firstRegion =
-        regions.entrySet().iterator().next();
+    Map<HRegionInfo, HServerAddress> regions = table.getRegionsInfo();
+    final Map.Entry<HRegionInfo, HServerAddress> firstRegion = regions.entrySet().iterator().next();
 
     PrivilegedExceptionAction action = new PrivilegedExceptionAction() {
       public Object run() throws Exception {
         ACCESS_CONTROLLER.preUnassign(ObserverContext.createAndPrepare(CP_ENV, null),
-            firstRegion.getKey(), false);
+          firstRegion.getKey(), false);
         return null;
       }
     };
 
-    // all others should be denied
-    verifyDenied(USER_OWNER, action);
-    verifyDenied(USER_RW, action);
-    verifyDenied(USER_RO, action);
-    verifyDenied(USER_NONE, action);
-
-    // verify that superuser can create tables
-    verifyAllowed(SUPERUSER, action);
-    verifyAllowed(USER_ADMIN, action);
+    verifyAllowed(action, SUPERUSER, USER_ADMIN);
+    verifyDenied(action, USER_CREATE, USER_OWNER, USER_RW, USER_RO, USER_NONE);
   }
 
   @Test
@@ -469,15 +386,8 @@ public class TestAccessController {
       }
     };
 
-    // all others should be denied
-    verifyDenied(USER_OWNER, action);
-    verifyDenied(USER_RW, action);
-    verifyDenied(USER_RO, action);
-    verifyDenied(USER_NONE, action);
-
-    // verify that superuser can create tables
-    verifyAllowed(SUPERUSER, action);
-    verifyAllowed(USER_ADMIN, action);
+    verifyAllowed(action, SUPERUSER, USER_ADMIN);
+    verifyDenied(action, USER_CREATE, USER_OWNER, USER_RW, USER_RO, USER_NONE);
   }
 
   @Test
@@ -489,15 +399,8 @@ public class TestAccessController {
       }
     };
 
-    // all others should be denied
-    verifyDenied(USER_OWNER, action);
-    verifyDenied(USER_RW, action);
-    verifyDenied(USER_RO, action);
-    verifyDenied(USER_NONE, action);
-
-    // verify that superuser can create tables
-    verifyAllowed(SUPERUSER, action);
-    verifyAllowed(USER_ADMIN, action);
+    verifyAllowed(action, SUPERUSER, USER_ADMIN);
+    verifyDenied(action, USER_CREATE, USER_OWNER, USER_RW, USER_RO, USER_NONE);
   }
 
   @Test
@@ -509,15 +412,8 @@ public class TestAccessController {
       }
     };
 
-    // all others should be denied
-    verifyDenied(USER_OWNER, action);
-    verifyDenied(USER_RW, action);
-    verifyDenied(USER_RO, action);
-    verifyDenied(USER_NONE, action);
-
-    // verify that superuser can create tables
-    verifyAllowed(SUPERUSER, action);
-    verifyAllowed(USER_ADMIN, action);
+    verifyAllowed(action, SUPERUSER, USER_ADMIN);
+    verifyDenied(action, USER_CREATE, USER_OWNER, USER_RW, USER_RO, USER_NONE);
   }
 
   @Test
@@ -529,27 +425,13 @@ public class TestAccessController {
       }
     };
 
-    // all others should be denied
-    verifyDenied(USER_OWNER, action);
-    verifyDenied(USER_RW, action);
-    verifyDenied(USER_RO, action);
-    verifyDenied(USER_NONE, action);
-
-    // verify that superuser can create tables
-    verifyAllowed(SUPERUSER, action);
-    verifyAllowed(USER_ADMIN, action);
+    verifyAllowed(action, SUPERUSER, USER_ADMIN);
+    verifyDenied(action, USER_CREATE, USER_OWNER, USER_RW, USER_RO, USER_NONE);
   }
 
   private void verifyWrite(PrivilegedExceptionAction action) throws Exception {
-    // should be denied
-    verifyDenied(USER_NONE, action);
-    verifyDenied(USER_RO, action);
-
-    // should be allowed
-    verifyAllowed(SUPERUSER, action);
-    verifyAllowed(USER_ADMIN, action);
-    verifyAllowed(USER_OWNER, action);
-    verifyAllowed(USER_RW, action);
+    verifyAllowed(action, SUPERUSER, USER_ADMIN, USER_OWNER, USER_RW);
+    verifyDenied(action, USER_NONE, USER_CREATE, USER_RO);
   }
 
   @Test
@@ -561,11 +443,8 @@ public class TestAccessController {
       }
     };
 
-    // verify that superuser and admin only can split
-    verifyAllowed(action, SUPERUSER, USER_ADMIN, USER_TBLADM);
-
-    // all others should be denied
-    verifyDenied(action, USER_OWNER, USER_RW, USER_RO, USER_NONE);
+    verifyAllowed(action, SUPERUSER, USER_ADMIN, USER_OWNER);
+    verifyDenied(action, USER_CREATE, USER_RW, USER_RO, USER_NONE);
   }
 
   @Test
@@ -577,11 +456,8 @@ public class TestAccessController {
       }
     };
 
-    // verify that superuser and admin only can flush
-    verifyAllowed(action, SUPERUSER, USER_ADMIN, USER_TBLADM);
-
-    // all others should be denied
-    verifyDenied(action, USER_OWNER, USER_RW, USER_RO, USER_NONE);
+    verifyAllowed(action, SUPERUSER, USER_ADMIN, USER_OWNER);
+    verifyDenied(action, USER_CREATE, USER_RW, USER_RO, USER_NONE);
   }
 
   @Test
@@ -593,35 +469,18 @@ public class TestAccessController {
       }
     };
 
-    // verify that superuser and admin only can compact
-    verifyAllowed(action, SUPERUSER, USER_ADMIN, USER_TBLADM);
-
-    // all others should be denied
-    verifyDenied(action, USER_OWNER, USER_RW, USER_RO, USER_NONE);
+    verifyAllowed(action, SUPERUSER, USER_ADMIN, USER_OWNER);
+    verifyDenied(action, USER_CREATE, USER_RW, USER_RO, USER_NONE);
   }
 
   private void verifyRead(PrivilegedExceptionAction action) throws Exception {
-    // should be denied
-    verifyDenied(USER_NONE, action);
-
-    // should be allowed
-    verifyAllowed(SUPERUSER, action);
-    verifyAllowed(USER_ADMIN, action);
-    verifyAllowed(USER_OWNER, action);
-    verifyAllowed(USER_RW, action);
-    verifyAllowed(USER_RO, action);
+    verifyAllowed(action, SUPERUSER, USER_ADMIN, USER_OWNER, USER_RW, USER_RO);
+    verifyDenied(action, USER_NONE, USER_CREATE);
   }
 
   private void verifyReadWrite(PrivilegedExceptionAction action) throws Exception {
-    // should be denied
-    verifyDenied(USER_NONE, action);
-    verifyDenied(USER_RO, action);
-
-    // should be allowed
-    verifyAllowed(SUPERUSER, action);
-    verifyAllowed(USER_ADMIN, action);
-    verifyAllowed(USER_OWNER, action);
-    verifyAllowed(USER_RW, action);
+    verifyAllowed(action, SUPERUSER, USER_ADMIN, USER_OWNER, USER_RW);
+    verifyDenied(action, USER_NONE, USER_CREATE, USER_RO);
   }
 
   @Test
@@ -709,9 +568,8 @@ public class TestAccessController {
         d.deleteFamily(TEST_FAMILY);
 
         HTable t = new HTable(conf, TEST_TABLE);
-        t.checkAndDelete(Bytes.toBytes("random_row"), 
-                         TEST_FAMILY, Bytes.toBytes("q"),
-                         Bytes.toBytes("test_value"), d);
+        t.checkAndDelete(Bytes.toBytes("random_row"), TEST_FAMILY, Bytes.toBytes("q"),
+          Bytes.toBytes("test_value"), d);
         return null;
       }
     };
@@ -724,9 +582,8 @@ public class TestAccessController {
         p.add(TEST_FAMILY, Bytes.toBytes("Qualifier"), Bytes.toBytes(1));
 
         HTable t = new HTable(conf, TEST_TABLE);
-        t.checkAndPut(Bytes.toBytes("random_row"), 
-                      TEST_FAMILY, Bytes.toBytes("q"),
-                      Bytes.toBytes("test_value"), p);
+        t.checkAndPut(Bytes.toBytes("random_row"), TEST_FAMILY, Bytes.toBytes("q"),
+          Bytes.toBytes("test_value"), p);
         return null;
       }
     };
@@ -749,18 +606,18 @@ public class TestAccessController {
     HTableDescriptor htd = new HTableDescriptor(tableName);
     htd.addFamily(new HColumnDescriptor(family1));
     htd.addFamily(new HColumnDescriptor(family2));
-    htd.setOwnerString(USER_OWNER.getShortName());
     admin.createTable(htd);
 
     // create temp users
-    User tblUser = User.createUserForTesting(TEST_UTIL.getConfiguration(), "tbluser", new String[0]);
-    User gblUser = User.createUserForTesting(TEST_UTIL.getConfiguration(), "gbluser", new String[0]);
+    User tblUser = User
+        .createUserForTesting(TEST_UTIL.getConfiguration(), "tbluser", new String[0]);
+    User gblUser = User
+        .createUserForTesting(TEST_UTIL.getConfiguration(), "gbluser", new String[0]);
 
     // perms only stored against the first region
     HTable acl = new HTable(conf, AccessControlLists.ACL_TABLE_NAME);
-    AccessControllerProtocol protocol =
-        acl.coprocessorProxy(AccessControllerProtocol.class,
-            tableName);
+    AccessControllerProtocol protocol = acl.coprocessorProxy(AccessControllerProtocol.class,
+      tableName);
 
     // prepare actions:
     PrivilegedExceptionAction putActionAll = new PrivilegedExceptionAction() {
@@ -860,7 +717,8 @@ public class TestAccessController {
     // grant table read permission
     protocol.grant(new UserPermission(Bytes.toBytes(tblUser.getShortName()), tableName, null,
         Permission.Action.READ));
-    protocol.grant(new UserPermission(Bytes.toBytes(gblUser.getShortName()), Permission.Action.READ));
+    protocol
+        .grant(new UserPermission(Bytes.toBytes(gblUser.getShortName()), Permission.Action.READ));
 
     Thread.sleep(100);
     // check
@@ -905,7 +763,8 @@ public class TestAccessController {
     // grant column family read permission
     protocol.grant(new UserPermission(Bytes.toBytes(tblUser.getShortName()), tableName, family1,
         Permission.Action.READ));
-    protocol.grant(new UserPermission(Bytes.toBytes(gblUser.getShortName()), Permission.Action.READ));
+    protocol
+        .grant(new UserPermission(Bytes.toBytes(gblUser.getShortName()), Permission.Action.READ));
 
     Thread.sleep(100);
 
@@ -959,8 +818,7 @@ public class TestAccessController {
     admin.deleteTable(tableName);
   }
 
-  private boolean hasFoundUserPermission(UserPermission userPermission,
-                                         List<UserPermission> perms) {
+  private boolean hasFoundUserPermission(UserPermission userPermission, List<UserPermission> perms) {
     return perms.contains(userPermission);
   }
 
@@ -981,16 +839,14 @@ public class TestAccessController {
     HTableDescriptor htd = new HTableDescriptor(tableName);
     htd.addFamily(new HColumnDescriptor(family1));
     htd.addFamily(new HColumnDescriptor(family2));
-    htd.setOwnerString(USER_OWNER.getShortName());
     admin.createTable(htd);
 
     // create temp users
-    User user = User.createUserForTesting(TEST_UTIL.getConfiguration(),
-        "user", new String[0]);
+    User user = User.createUserForTesting(TEST_UTIL.getConfiguration(), "user", new String[0]);
 
     HTable acl = new HTable(conf, AccessControlLists.ACL_TABLE_NAME);
-    AccessControllerProtocol protocol =
-        acl.coprocessorProxy(AccessControllerProtocol.class, tableName);
+    AccessControllerProtocol protocol = acl.coprocessorProxy(AccessControllerProtocol.class,
+      tableName);
 
     PrivilegedExceptionAction getQualifierAction = new PrivilegedExceptionAction() {
       public Object run() throws Exception {
@@ -1014,21 +870,20 @@ public class TestAccessController {
       public Object run() throws Exception {
         Delete d = new Delete(Bytes.toBytes("random_row"));
         d.deleteColumn(family1, qualifier);
-        //d.deleteFamily(family1);
+        // d.deleteFamily(family1);
         HTable t = new HTable(conf, tableName);
         t.delete(d);
         return null;
       }
     };
 
-    protocol.revoke(new UserPermission(Bytes.toBytes(user.getShortName()),
-                    tableName, family1));
+    protocol.revoke(new UserPermission(Bytes.toBytes(user.getShortName()), tableName, family1));
     verifyDenied(user, getQualifierAction);
     verifyDenied(user, putQualifierAction);
     verifyDenied(user, deleteQualifierAction);
 
-    protocol.grant(new UserPermission(Bytes.toBytes(user.getShortName()),
-                   tableName, family1, qualifier, Permission.Action.READ));
+    protocol.grant(new UserPermission(Bytes.toBytes(user.getShortName()), tableName, family1,
+        qualifier, Permission.Action.READ));
     Thread.sleep(100);
 
     verifyAllowed(user, getQualifierAction);
@@ -1037,8 +892,8 @@ public class TestAccessController {
 
     // only grant write permission
     // TODO: comment this portion after HBASE-3583
-    protocol.grant(new UserPermission(Bytes.toBytes(user.getShortName()),
-                   tableName, family1, qualifier, Permission.Action.WRITE));
+    protocol.grant(new UserPermission(Bytes.toBytes(user.getShortName()), tableName, family1,
+        qualifier, Permission.Action.WRITE));
     Thread.sleep(100);
 
     verifyDenied(user, getQualifierAction);
@@ -1046,9 +901,8 @@ public class TestAccessController {
     verifyAllowed(user, deleteQualifierAction);
 
     // grant both read and write permission.
-    protocol.grant(new UserPermission(Bytes.toBytes(user.getShortName()),
-                   tableName, family1, qualifier,
-                   Permission.Action.READ, Permission.Action.WRITE));
+    protocol.grant(new UserPermission(Bytes.toBytes(user.getShortName()), tableName, family1,
+        qualifier, Permission.Action.READ, Permission.Action.WRITE));
     Thread.sleep(100);
 
     verifyAllowed(user, getQualifierAction);
@@ -1056,8 +910,8 @@ public class TestAccessController {
     verifyAllowed(user, deleteQualifierAction);
 
     // revoke family level permission won't impact column level.
-    protocol.revoke(new UserPermission(Bytes.toBytes(user.getShortName()),
-                    tableName, family1, qualifier));
+    protocol.revoke(new UserPermission(Bytes.toBytes(user.getShortName()), tableName, family1,
+        qualifier));
     Thread.sleep(100);
 
     verifyDenied(user, getQualifierAction);
@@ -1086,76 +940,86 @@ public class TestAccessController {
     HTableDescriptor htd = new HTableDescriptor(tableName);
     htd.addFamily(new HColumnDescriptor(family1));
     htd.addFamily(new HColumnDescriptor(family2));
-    htd.setOwnerString(USER_OWNER.getShortName());
+    htd.setOwner(USER_OWNER);
     admin.createTable(htd);
 
     HTable acl = new HTable(conf, AccessControlLists.ACL_TABLE_NAME);
-    AccessControllerProtocol protocol =
-        acl.coprocessorProxy(AccessControllerProtocol.class, tableName);
+    AccessControllerProtocol protocol = acl.coprocessorProxy(AccessControllerProtocol.class,
+      tableName);
 
     List<UserPermission> perms = protocol.getUserPermissions(tableName);
 
-    UserPermission up = new UserPermission(user,
-        tableName, family1, qualifier, Permission.Action.READ);
+    UserPermission ownerperm = new UserPermission(Bytes.toBytes(USER_OWNER.getName()), tableName,
+        null, Action.values());
+    assertTrue("Owner should have all permissions on table",
+      hasFoundUserPermission(ownerperm, perms));
+
+    UserPermission up = new UserPermission(user, tableName, family1, qualifier,
+        Permission.Action.READ);
     assertFalse("User should not be granted permission: " + up.toString(),
-        hasFoundUserPermission(up, perms));
+      hasFoundUserPermission(up, perms));
 
     // grant read permission
-    UserPermission upToSet = new UserPermission(user,
-        tableName, family1, qualifier, Permission.Action.READ);
+    UserPermission upToSet = new UserPermission(user, tableName, family1, qualifier,
+        Permission.Action.READ);
     protocol.grant(upToSet);
     perms = protocol.getUserPermissions(tableName);
 
-    UserPermission upToVerify = new UserPermission(user,
-        tableName, family1, qualifier, Permission.Action.READ);
+    UserPermission upToVerify = new UserPermission(user, tableName, family1, qualifier,
+        Permission.Action.READ);
     assertTrue("User should be granted permission: " + upToVerify.toString(),
-        hasFoundUserPermission(upToVerify, perms));
+      hasFoundUserPermission(upToVerify, perms));
 
-    upToVerify = new UserPermission(user, tableName, family1, qualifier,
-        Permission.Action.WRITE);
+    upToVerify = new UserPermission(user, tableName, family1, qualifier, Permission.Action.WRITE);
     assertFalse("User should not be granted permission: " + upToVerify.toString(),
-        hasFoundUserPermission(upToVerify, perms));
+      hasFoundUserPermission(upToVerify, perms));
 
     // grant read+write
-    upToSet = new UserPermission(user, tableName, family1, qualifier,
-        Permission.Action.WRITE, Permission.Action.READ);
+    upToSet = new UserPermission(user, tableName, family1, qualifier, Permission.Action.WRITE,
+        Permission.Action.READ);
     protocol.grant(upToSet);
     perms = protocol.getUserPermissions(tableName);
 
-    upToVerify = new UserPermission(user, tableName, family1, qualifier,
-        Permission.Action.WRITE, Permission.Action.READ);
+    upToVerify = new UserPermission(user, tableName, family1, qualifier, Permission.Action.WRITE,
+        Permission.Action.READ);
     assertTrue("User should be granted permission: " + upToVerify.toString(),
-            hasFoundUserPermission(upToVerify, perms));
+      hasFoundUserPermission(upToVerify, perms));
 
     protocol.revoke(upToSet);
     perms = protocol.getUserPermissions(tableName);
     assertFalse("User should not be granted permission: " + upToVerify.toString(),
       hasFoundUserPermission(upToVerify, perms));
 
-    // delete table
+    // disable table before modification
     admin.disableTable(tableName);
+
+    User newOwner = User.createUserForTesting(conf, "new_owner", new String[] {});
+    htd.setOwner(newOwner);
+    admin.modifyTable(tableName, htd);
+    perms = protocol.getUserPermissions(tableName);
+    UserPermission newOwnerperm = new UserPermission(Bytes.toBytes(newOwner.getName()), tableName,
+        null, Action.values());
+    assertTrue("New owner should have all permissions on table",
+      hasFoundUserPermission(newOwnerperm, perms));
+
+    // delete table
     admin.deleteTable(tableName);
   }
 
-  /** global operations*/
+  /** global operations */
   private void verifyGlobal(PrivilegedExceptionAction<?> action) throws Exception {
-    // should be allowed
-    verifyAllowed(SUPERUSER, action);
+    verifyAllowed(action, SUPERUSER);
 
-    // should be denied
-    verifyDenied(USER_OWNER, action);
-    verifyDenied(USER_RW, action);
-    verifyDenied(USER_NONE, action);
-    verifyDenied(USER_RO, action);
+    verifyDenied(action, USER_CREATE, USER_RW, USER_NONE, USER_RO);
   }
 
   public void checkGlobalPerms(Permission.Action... actions) throws IOException {
     HTable acl = new HTable(conf, AccessControlLists.ACL_TABLE_NAME);
-    AccessControllerProtocol protocol =
-        acl.coprocessorProxy(AccessControllerProtocol.class, new byte[0]);
+    AccessControllerProtocol protocol = acl.coprocessorProxy(AccessControllerProtocol.class,
+      new byte[0]);
 
     Permission[] perms = new Permission[actions.length];
-    for (int i=0; i < actions.length; i++) {
+    for (int i = 0; i < actions.length; i++) {
       perms[i] = new Permission(actions[i]);
     }
 
@@ -1165,34 +1029,34 @@ public class TestAccessController {
   public void checkTablePerms(byte[] table, byte[] family, byte[] column,
       Permission.Action... actions) throws IOException {
     Permission[] perms = new Permission[actions.length];
-    for (int i=0; i < actions.length; i++) {
+    for (int i = 0; i < actions.length; i++) {
       perms[i] = new TablePermission(table, family, column, actions[i]);
     }
 
     checkTablePerms(table, perms);
   }
 
-  public void checkTablePerms(byte[] table, Permission...perms) throws IOException {
+  public void checkTablePerms(byte[] table, Permission... perms) throws IOException {
     HTable acl = new HTable(conf, table);
-    AccessControllerProtocol protocol =
-        acl.coprocessorProxy(AccessControllerProtocol.class, new byte[0]);
+    AccessControllerProtocol protocol = acl.coprocessorProxy(AccessControllerProtocol.class,
+      new byte[0]);
 
     protocol.checkPermissions(perms);
   }
 
-  public void grant(AccessControllerProtocol protocol, User user, byte[] t, byte[] f,
-      byte[] q, Permission.Action... actions) throws IOException {
+  public void grant(AccessControllerProtocol protocol, User user, byte[] t, byte[] f, byte[] q,
+      Permission.Action... actions) throws IOException {
     protocol.grant(new UserPermission(Bytes.toBytes(user.getShortName()), t, f, q, actions));
   }
 
   @Test
   public void testCheckPermissions() throws Exception {
     final HTable acl = new HTable(conf, AccessControlLists.ACL_TABLE_NAME);
-    final AccessControllerProtocol protocol =
-        acl.coprocessorProxy(AccessControllerProtocol.class, TEST_TABLE);
+    final AccessControllerProtocol protocol = acl.coprocessorProxy(AccessControllerProtocol.class,
+      TEST_TABLE);
 
-    //--------------------------------------
-    //test global permissions
+    // --------------------------------------
+    // test global permissions
     PrivilegedExceptionAction<Void> globalAdmin = new PrivilegedExceptionAction<Void>() {
       @Override
       public Void run() throws Exception {
@@ -1200,11 +1064,11 @@ public class TestAccessController {
         return null;
       }
     };
-    //verify that only superuser can admin
+    // verify that only superuser can admin
     verifyGlobal(globalAdmin);
 
-    //--------------------------------------
-    //test multiple permissions
+    // --------------------------------------
+    // test multiple permissions
     PrivilegedExceptionAction<Void> globalReadWrite = new PrivilegedExceptionAction<Void>() {
       @Override
       public Void run() throws Exception {
@@ -1215,8 +1079,8 @@ public class TestAccessController {
 
     verifyGlobal(globalReadWrite);
 
-    //--------------------------------------
-    //table/column/qualifier level permissions
+    // --------------------------------------
+    // table/column/qualifier level permissions
     final byte[] TEST_Q1 = Bytes.toBytes("q1");
     final byte[] TEST_Q2 = Bytes.toBytes("q2");
 
@@ -1256,9 +1120,8 @@ public class TestAccessController {
       @Override
       public Void run() throws Exception {
         checkTablePerms(TEST_TABLE, new Permission[] {
-          new TablePermission(TEST_TABLE, TEST_FAMILY, TEST_Q1, Permission.Action.READ),
-          new TablePermission(TEST_TABLE, TEST_FAMILY, TEST_Q2, Permission.Action.READ),
-        });
+            new TablePermission(TEST_TABLE, TEST_FAMILY, TEST_Q1, Permission.Action.READ),
+            new TablePermission(TEST_TABLE, TEST_FAMILY, TEST_Q2, Permission.Action.READ), });
         return null;
       }
     };
@@ -1266,10 +1129,8 @@ public class TestAccessController {
     PrivilegedExceptionAction<Void> globalAndTableRead = new PrivilegedExceptionAction<Void>() {
       @Override
       public Void run() throws Exception {
-        checkTablePerms(TEST_TABLE, new Permission[] {
-          new Permission(Permission.Action.READ),
-          new TablePermission(TEST_TABLE, null, (byte[])null, Permission.Action.READ),
-        });
+        checkTablePerms(TEST_TABLE, new Permission[] { new Permission(Permission.Action.READ),
+            new TablePermission(TEST_TABLE, null, (byte[]) null, Permission.Action.READ), });
         return null;
       }
     };
@@ -1298,30 +1159,29 @@ public class TestAccessController {
 
     verifyAllowed(noCheck, SUPERUSER, userTable, userColumn, userQualifier);
 
-    //--------------------------------------
-    //test family level multiple permissions
+    // --------------------------------------
+    // test family level multiple permissions
     PrivilegedExceptionAction<Void> familyReadWrite = new PrivilegedExceptionAction<Void>() {
       @Override
       public Void run() throws Exception {
         checkTablePerms(TEST_TABLE, TEST_FAMILY, null, Permission.Action.READ,
-            Permission.Action.WRITE);
+          Permission.Action.WRITE);
         return null;
       }
     };
-    // should be allowed
+
     verifyAllowed(familyReadWrite, SUPERUSER, USER_OWNER, USER_RW);
-    // should be denied
-    verifyDenied(familyReadWrite, USER_NONE, USER_RO);
+    verifyDenied(familyReadWrite, USER_NONE, USER_CREATE, USER_RO);
 
-    //--------------------------------------
-    //check for wrong table region
+    // --------------------------------------
+    // check for wrong table region
     try {
-      //but ask for TablePermissions for TEST_TABLE
-      protocol.checkPermissions(new Permission[] {(Permission) new TablePermission(
-          TEST_TABLE, null, (byte[])null, Permission.Action.CREATE)});
+      // but ask for TablePermissions for TEST_TABLE
+      protocol.checkPermissions(new Permission[] { (Permission) new TablePermission(TEST_TABLE,
+          null, (byte[]) null, Permission.Action.CREATE) });
       fail("this should have thrown CoprocessorException");
-    } catch(CoprocessorException ex) {
-      //expected
+    } catch (CoprocessorException ex) {
+      // expected
     }
 
   }