You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2015/04/10 00:56:09 UTC

[6/8] hbase git commit: HBASE-13275 Setting hbase.security.authorization to false does not disable authorization

HBASE-13275 Setting hbase.security.authorization to false does not disable authorization

Conflicts:
	hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/356422e1
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/356422e1
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/356422e1

Branch: refs/heads/branch-1
Commit: 356422e1aea8c5fb9b3ba063658c27c682759835
Parents: c73f1a5
Author: Andrew Purtell <ap...@apache.org>
Authored: Thu Apr 9 13:51:12 2015 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Thu Apr 9 14:43:29 2015 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/security/User.java  |    2 +
 .../hbase/security/access/AccessController.java |  307 +++---
 .../visibility/VisibilityController.java        |  100 +-
 .../hbase/security/HBaseKerberosUtils.java      |    8 +-
 .../hbase/security/access/SecureTestUtil.java   |   78 +-
 .../security/access/TestAccessController.java   |   88 +-
 .../access/TestWithDisabledAuthorization.java   | 1031 ++++++++++++++++++
 .../TestWithDisabledAuthorization.java          |  236 ++++
 8 files changed, 1629 insertions(+), 221 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/356422e1/hbase-common/src/main/java/org/apache/hadoop/hbase/security/User.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/security/User.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/security/User.java
index a5ac51a..58a3c66 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/security/User.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/security/User.java
@@ -52,6 +52,8 @@ import org.apache.hadoop.security.token.TokenIdentifier;
 public abstract class User {
   public static final String HBASE_SECURITY_CONF_KEY =
       "hbase.security.authentication";
+  public static final String HBASE_SECURITY_AUTHORIZATION_CONF_KEY =
+      "hbase.security.authorization";
 
   protected UserGroupInformation ugi;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/356422e1/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
index aedc5a8..1427f2b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
@@ -165,11 +165,11 @@ public class AccessController extends BaseMasterAndRegionObserver
 
   TableAuthManager authManager = null;
 
-  // flags if we are running on a region of the _acl_ table
+  /** flags if we are running on a region of the _acl_ table */
   boolean aclRegion = false;
 
-  // defined only for Endpoint implementation, so it can have way to
-  // access region services.
+  /** defined only for Endpoint implementation, so it can have a way to
+   access region services */
   private RegionCoprocessorEnvironment regionEnv;
 
   /** Mapping of scanner instances to the user who created them */
@@ -178,25 +178,30 @@ public class AccessController extends BaseMasterAndRegionObserver
 
   private Map<TableName, List<UserPermission>> tableAcls;
 
-  // Provider for mapping principal names to Users
+  /** Provider for mapping principal names to Users */
   private UserProvider userProvider;
 
-  // The list of users with superuser authority
+  /** The list of users with superuser authority */
   private List<String> superusers;
 
-  // if we are able to support cell ACLs
+  /** if we are active; usually true, false only if "hbase.security.authorization"
+   has been set to false in site configuration */
+  boolean authorizationEnabled;
+
+  /** if we are able to support cell ACLs */
   boolean cellFeaturesEnabled;
 
-  // if we should check EXEC permissions
+  /** if we should check EXEC permissions */
   boolean shouldCheckExecPermission;
 
-  // if we should terminate access checks early as soon as table or CF grants
-  // allow access; pre-0.98 compatible behavior
+  /** if we should terminate access checks early as soon as table or CF grants
+    allow access; pre-0.98 compatible behavior */
   boolean compatibleEarlyTermination;
 
+  /** if we have been successfully initialized */
   private volatile boolean initialized = false;
 
-  // This boolean having relevance only in the Master.
+  /** if the ACL table is available, only relevant in the master */
   private volatile boolean aclTabAvailable = false;
 
   public Region getRegion() {
@@ -405,8 +410,8 @@ public class AccessController extends BaseMasterAndRegionObserver
    * @throws IOException if obtaining the current user fails
    * @throws AccessDeniedException if user has no authorization
    */
-  private void requirePermission(String request, TableName tableName, byte[] family, byte[] qualifier,
-      Action... permissions) throws IOException {
+  private void requirePermission(String request, TableName tableName, byte[] family,
+      byte[] qualifier, Action... permissions) throws IOException {
     User user = getActiveUser();
     AuthResult result = null;
 
@@ -422,7 +427,7 @@ public class AccessController extends BaseMasterAndRegionObserver
       }
     }
     logResult(result);
-    if (!result.isAllowed()) {
+    if (authorizationEnabled && !result.isAllowed()) {
       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
     }
   }
@@ -455,7 +460,7 @@ public class AccessController extends BaseMasterAndRegionObserver
       }
     }
     logResult(result);
-    if (!result.isAllowed()) {
+    if (authorizationEnabled && !result.isAllowed()) {
       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
     }
   }
@@ -485,7 +490,7 @@ public class AccessController extends BaseMasterAndRegionObserver
       }
     }
     logResult(result);
-    if (!result.isAllowed()) {
+    if (authorizationEnabled && !result.isAllowed()) {
       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
     }
   }
@@ -501,31 +506,6 @@ public class AccessController extends BaseMasterAndRegionObserver
   }
 
   /**
-   * Authorizes that the current user has permission to perform the given
-   * action on the set of table column families.
-   * @param perm Action that is required
-   * @param env The current coprocessor environment
-   * @param families The map of column families-qualifiers.
-   * @throws AccessDeniedException if the authorization check failed
-   */
-  private void requirePermission(String request, Action perm,
-        RegionCoprocessorEnvironment env,
-        Map<byte[], ? extends Collection<?>> families)
-      throws IOException {
-    User user = getActiveUser();
-    AuthResult result = permissionGranted(request, user, perm, env, families);
-    logResult(result);
-
-    if (!result.isAllowed()) {
-      throw new AccessDeniedException("Insufficient permissions (table=" +
-        env.getRegion().getTableDesc().getTableName()+
-        ((families != null && families.size() > 0) ? ", family: " +
-        result.toFamilyString() : "") + ", action=" +
-        perm.toString() + ")");
-    }
-  }
-
-  /**
    * Checks that the user has the given global permission. The generated
    * audit log message will contain context information for the operation
    * being authorized, based on the given parameters.
@@ -545,9 +525,11 @@ public class AccessController extends BaseMasterAndRegionObserver
       result = AuthResult.deny(request, "Global check failed", user, perm, tableName, familyMap);
       result.getParams().setTableName(tableName).setFamilies(familyMap);
       logResult(result);
-      throw new AccessDeniedException("Insufficient permissions for user '" +
+      if (authorizationEnabled) {
+        throw new AccessDeniedException("Insufficient permissions for user '" +
           (user != null ? user.getShortName() : "null") +"' (global, action=" +
           perm.toString() + ")");
+      }
     }
   }
 
@@ -570,9 +552,11 @@ public class AccessController extends BaseMasterAndRegionObserver
       authResult = AuthResult.deny(request, "Global check failed", user, perm, null);
       authResult.getParams().setNamespace(namespace);
       logResult(authResult);
-      throw new AccessDeniedException("Insufficient permissions for user '" +
+      if (authorizationEnabled) {
+        throw new AccessDeniedException("Insufficient permissions for user '" +
           (user != null ? user.getShortName() : "null") +"' (global, action=" +
           perm.toString() + ")");
+      }
     }
   }
 
@@ -598,7 +582,7 @@ public class AccessController extends BaseMasterAndRegionObserver
       }
     }
     logResult(result);
-    if (!result.isAllowed()) {
+    if (authorizationEnabled && !result.isAllowed()) {
       throw new AccessDeniedException("Insufficient permissions "
           + result.toContextString());
     }
@@ -629,7 +613,7 @@ public class AccessController extends BaseMasterAndRegionObserver
       }
     }
     logResult(result);
-    if (!result.isAllowed()) {
+    if (authorizationEnabled && !result.isAllowed()) {
       throw new AccessDeniedException("Insufficient permissions "
           + result.toContextString());
     }
@@ -764,6 +748,8 @@ public class AccessController extends BaseMasterAndRegionObserver
             }
           }
         }
+      } else if (entry.getValue() == null) {
+        get.addFamily(col);        
       } else {
         throw new RuntimeException("Unhandled collection type " +
           entry.getValue().getClass().getName());
@@ -899,8 +885,14 @@ public class AccessController extends BaseMasterAndRegionObserver
   // Checks whether incoming cells contain any tag with type as ACL_TAG_TYPE. This tag
   // type is reserved and should not be explicitly set by user.
   private void checkForReservedTagPresence(User user, Mutation m) throws IOException {
+    // No need to check if we're not going to throw
+    if (!authorizationEnabled) {
+      m.setAttribute(TAG_CHECK_PASSED, TRUE);
+      return;
+    }
     // Superusers are allowed to store cells unconditionally.
     if (superusers.contains(user.getShortName())) {
+      m.setAttribute(TAG_CHECK_PASSED, TRUE);
       return;
     }
     // We already checked (prePut vs preBatchMutation)
@@ -928,6 +920,11 @@ public class AccessController extends BaseMasterAndRegionObserver
     CompoundConfiguration conf = new CompoundConfiguration();
     conf.add(env.getConfiguration());
 
+    authorizationEnabled = conf.getBoolean(User.HBASE_SECURITY_AUTHORIZATION_CONF_KEY, true);
+    if (!authorizationEnabled) {
+      LOG.warn("The AccessController has been loaded with authorization checks disabled.");
+    }
+
     shouldCheckExecPermission = conf.getBoolean(AccessControlConstants.EXEC_PERMISSION_CHECKS_KEY,
       AccessControlConstants.DEFAULT_EXEC_PERMISSION_CHECKS);
 
@@ -1064,6 +1061,7 @@ public class AccessController extends BaseMasterAndRegionObserver
   public void preTruncateTable(ObserverContext<MasterCoprocessorEnvironment> c,
       final TableName tableName) throws IOException {
     requirePermission("truncateTable", tableName, null, null, Action.ADMIN, Action.CREATE);
+
     final Configuration conf = c.getEnvironment().getConfiguration();
     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
       @Override
@@ -1163,8 +1161,12 @@ public class AccessController extends BaseMasterAndRegionObserver
   public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
       throws IOException {
     if (Bytes.equals(tableName.getName(), AccessControlLists.ACL_GLOBAL_NAME)) {
+      // We have to unconditionally disallow disable of the ACL table when we are installed,
+      // even if not enforcing authorizations. We are still allowing grants and revocations,
+      // checking permissions and logging audit messages, etc. If the ACL table is not
+      // available we will fail random actions all over the place.
       throw new AccessDeniedException("Not allowed to disable "
-          + AccessControlLists.ACL_TABLE_NAME + " table.");
+          + AccessControlLists.ACL_TABLE_NAME + " table with AccessController installed");
     }
     requirePermission("disableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
   }
@@ -1262,6 +1264,7 @@ public class AccessController extends BaseMasterAndRegionObserver
       final SnapshotDescription snapshot) throws IOException {
     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, getActiveUser())) {
       // Snapshot owner is allowed to delete the snapshot
+      // TODO: We are not logging this for audit
     } else {
       requirePermission("deleteSnapshot", Action.ADMIN);
     }
@@ -1429,8 +1432,9 @@ public class AccessController extends BaseMasterAndRegionObserver
       authResult.setReason("Covering cell set");
     }
     logResult(authResult);
-    if (!authResult.isAllowed()) {
-      throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
+    if (authorizationEnabled && !authResult.isAllowed()) {
+      throw new AccessDeniedException("Insufficient permissions " +
+        authResult.toContextString());
     }
   }
 
@@ -1473,26 +1477,29 @@ public class AccessController extends BaseMasterAndRegionObserver
         // grants three times (permissionGranted above, here, and in the
         // filter) but that's the price of backwards compatibility.
         if (hasFamilyQualifierPermission(user, Action.READ, env, families)) {
-          Filter ourFilter = new AccessControlFilter(authManager, user, table,
-            AccessControlFilter.Strategy.CHECK_TABLE_AND_CF_ONLY,
-            cfVsMaxVersions);
-          // wrap any existing filter
-          if (filter != null) {
-            ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
-              Lists.newArrayList(ourFilter, filter));
-          }
           authResult.setAllowed(true);
           authResult.setReason("Access allowed with filter");
-          switch (opType) {
-          case GET:
-          case EXISTS:
-            ((Get)query).setFilter(ourFilter);
-            break;
-          case SCAN:
-            ((Scan)query).setFilter(ourFilter);
-            break;
-          default:
-            throw new RuntimeException("Unhandled operation " + opType);
+          // Only wrap the filter if we are enforcing authorizations
+          if (authorizationEnabled) {
+            Filter ourFilter = new AccessControlFilter(authManager, user, table,
+              AccessControlFilter.Strategy.CHECK_TABLE_AND_CF_ONLY,
+              cfVsMaxVersions);
+            // wrap any existing filter
+            if (filter != null) {
+              ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
+                Lists.newArrayList(ourFilter, filter));
+            }
+            switch (opType) {
+              case GET:
+              case EXISTS:
+                ((Get)query).setFilter(ourFilter);
+                break;
+              case SCAN:
+                ((Scan)query).setFilter(ourFilter);
+                break;
+              default:
+                throw new RuntimeException("Unhandled operation " + opType);
+            }
           }
         }
       } else {
@@ -1500,31 +1507,34 @@ public class AccessController extends BaseMasterAndRegionObserver
         // than whole table or CF. Simply inject a filter and return what is
         // allowed. We will not throw an AccessDeniedException. This is a
         // behavioral change since 0.96.
-        Filter ourFilter = new AccessControlFilter(authManager, user, table,
-          AccessControlFilter.Strategy.CHECK_CELL_DEFAULT, cfVsMaxVersions);
-        // wrap any existing filter
-        if (filter != null) {
-          ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
-            Lists.newArrayList(ourFilter, filter));
-        }
         authResult.setAllowed(true);
         authResult.setReason("Access allowed with filter");
-        switch (opType) {
-        case GET:
-        case EXISTS:
-          ((Get)query).setFilter(ourFilter);
-          break;
-        case SCAN:
-          ((Scan)query).setFilter(ourFilter);
-          break;
-        default:
-          throw new RuntimeException("Unhandled operation " + opType);
+        // Only wrap the filter if we are enforcing authorizations
+        if (authorizationEnabled) {
+          Filter ourFilter = new AccessControlFilter(authManager, user, table,
+            AccessControlFilter.Strategy.CHECK_CELL_DEFAULT, cfVsMaxVersions);
+          // wrap any existing filter
+          if (filter != null) {
+            ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
+              Lists.newArrayList(ourFilter, filter));
+          }
+          switch (opType) {
+            case GET:
+            case EXISTS:
+              ((Get)query).setFilter(ourFilter);
+              break;
+            case SCAN:
+              ((Scan)query).setFilter(ourFilter);
+              break;
+            default:
+              throw new RuntimeException("Unhandled operation " + opType);
+          }
         }
       }
     }
 
     logResult(authResult);
-    if (!authResult.isAllowed()) {
+    if (authorizationEnabled && !authResult.isAllowed()) {
       throw new AccessDeniedException("Insufficient permissions (table=" + table +
         ", action=READ)");
     }
@@ -1547,14 +1557,15 @@ public class AccessController extends BaseMasterAndRegionObserver
   public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
       final Put put, final WALEdit edit, final Durability durability)
       throws IOException {
+    User user = getActiveUser();
+    checkForReservedTagPresence(user, put);
+
     // Require WRITE permission to the table, CF, or top visible value, if any.
     // NOTE: We don't need to check the permissions for any earlier Puts
     // because we treat the ACLs in each Put as timestamped like any other
     // HBase value. A new ACL in a new Put applies to that Put. It doesn't
     // change the ACL of any previous Put. This allows simple evolution of
     // security policy over time without requiring expensive updates.
-    User user = getActiveUser();
-    checkForReservedTagPresence(user, put);
     RegionCoprocessorEnvironment env = c.getEnvironment();
     Map<byte[],? extends Collection<Cell>> families = put.getFamilyCellMap();
     AuthResult authResult = permissionGranted(OpType.PUT, user, env, families, Action.WRITE);
@@ -1562,10 +1573,11 @@ public class AccessController extends BaseMasterAndRegionObserver
     if (!authResult.isAllowed()) {
       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
         put.setAttribute(CHECK_COVERING_PERM, TRUE);
-      } else {
+      } else if (authorizationEnabled) {
         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
       }
     }
+
     // Add cell ACLs from the operation to the cells themselves
     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
     if (bytes != null) {
@@ -1606,8 +1618,9 @@ public class AccessController extends BaseMasterAndRegionObserver
     if (!authResult.isAllowed()) {
       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
-      } else {
-        throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
+      } else if (authorizationEnabled) {
+        throw new AccessDeniedException("Insufficient permissions " +
+          authResult.toContextString());
       }
     }
   }
@@ -1630,18 +1643,18 @@ public class AccessController extends BaseMasterAndRegionObserver
             opType = OpType.DELETE;
           }
           AuthResult authResult = null;
-          if (checkCoveringPermission(opType, c.getEnvironment(), m.getRow(), m.getFamilyCellMap(),
-              m.getTimeStamp(), Action.WRITE)) {
-            authResult = AuthResult.allow(opType.toString(), "Covering cell set", getActiveUser(),
-                Action.WRITE, table, m.getFamilyCellMap());
+          if (checkCoveringPermission(opType, c.getEnvironment(), m.getRow(),
+            m.getFamilyCellMap(), m.getTimeStamp(), Action.WRITE)) {
+            authResult = AuthResult.allow(opType.toString(), "Covering cell set",
+              getActiveUser(), Action.WRITE, table, m.getFamilyCellMap());
           } else {
-            authResult = AuthResult.deny(opType.toString(), "Covering cell set", getActiveUser(),
-                Action.WRITE, table, m.getFamilyCellMap());
+            authResult = AuthResult.deny(opType.toString(), "Covering cell set",
+              getActiveUser(), Action.WRITE, table, m.getFamilyCellMap());
           }
           logResult(authResult);
-          if (!authResult.isAllowed()) {
+          if (authorizationEnabled && !authResult.isAllowed()) {
             throw new AccessDeniedException("Insufficient permissions "
-                + authResult.toContextString());
+              + authResult.toContextString());
           }
         }
       }
@@ -1663,9 +1676,10 @@ public class AccessController extends BaseMasterAndRegionObserver
       final CompareFilter.CompareOp compareOp,
       final ByteArrayComparable comparator, final Put put,
       final boolean result) throws IOException {
-    // Require READ and WRITE permissions on the table, CF, and KV to update
     User user = getActiveUser();
     checkForReservedTagPresence(user, put);
+
+    // Require READ and WRITE permissions on the table, CF, and KV to update
     RegionCoprocessorEnvironment env = c.getEnvironment();
     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
     AuthResult authResult = permissionGranted(OpType.CHECK_AND_PUT, user, env, families,
@@ -1674,10 +1688,12 @@ public class AccessController extends BaseMasterAndRegionObserver
     if (!authResult.isAllowed()) {
       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
         put.setAttribute(CHECK_COVERING_PERM, TRUE);
-      } else {
-        throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
+      } else if (authorizationEnabled) {
+        throw new AccessDeniedException("Insufficient permissions " +
+          authResult.toContextString());
       }
     }
+
     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
     if (bytes != null) {
       if (cellFeaturesEnabled) {
@@ -1709,7 +1725,7 @@ public class AccessController extends BaseMasterAndRegionObserver
             getActiveUser(), Action.READ, table, families);
       }
       logResult(authResult);
-      if (!authResult.isAllowed()) {
+      if (authorizationEnabled && !authResult.isAllowed()) {
         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
       }
     }
@@ -1738,8 +1754,9 @@ public class AccessController extends BaseMasterAndRegionObserver
     if (!authResult.isAllowed()) {
       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
-      } else {
-        throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
+      } else if (authorizationEnabled) {
+        throw new AccessDeniedException("Insufficient permissions " +
+          authResult.toContextString());
       }
     }
     return result;
@@ -1766,7 +1783,7 @@ public class AccessController extends BaseMasterAndRegionObserver
             getActiveUser(), Action.READ, table, families);
       }
       logResult(authResult);
-      if (!authResult.isAllowed()) {
+      if (authorizationEnabled && !authResult.isAllowed()) {
         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
       }
     }
@@ -1791,7 +1808,7 @@ public class AccessController extends BaseMasterAndRegionObserver
       authResult.setReason("Covering cell set");
     }
     logResult(authResult);
-    if (!authResult.isAllowed()) {
+    if (authorizationEnabled && !authResult.isAllowed()) {
       throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
     }
     return -1;
@@ -1800,9 +1817,10 @@ public class AccessController extends BaseMasterAndRegionObserver
   @Override
   public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
       throws IOException {
-    // Require WRITE permission to the table, CF, and the KV to be appended
     User user = getActiveUser();
     checkForReservedTagPresence(user, append);
+
+    // Require WRITE permission to the table, CF, and the KV to be appended
     RegionCoprocessorEnvironment env = c.getEnvironment();
     Map<byte[],? extends Collection<Cell>> families = append.getFamilyCellMap();
     AuthResult authResult = permissionGranted(OpType.APPEND, user, env, families, Action.WRITE);
@@ -1810,10 +1828,12 @@ public class AccessController extends BaseMasterAndRegionObserver
     if (!authResult.isAllowed()) {
       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
         append.setAttribute(CHECK_COVERING_PERM, TRUE);
-      } else {
-        throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
+      } else if (authorizationEnabled)  {
+        throw new AccessDeniedException("Insufficient permissions " +
+          authResult.toContextString());
       }
     }
+
     byte[] bytes = append.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
     if (bytes != null) {
       if (cellFeaturesEnabled) {
@@ -1822,6 +1842,7 @@ public class AccessController extends BaseMasterAndRegionObserver
         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
       }
     }
+
     return null;
   }
 
@@ -1842,8 +1863,9 @@ public class AccessController extends BaseMasterAndRegionObserver
             getActiveUser(), Action.WRITE, table, append.getFamilyCellMap());
       }
       logResult(authResult);
-      if (!authResult.isAllowed()) {
-        throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
+      if (authorizationEnabled && !authResult.isAllowed()) {
+        throw new AccessDeniedException("Insufficient permissions " +
+          authResult.toContextString());
       }
     }
     return null;
@@ -1853,10 +1875,11 @@ public class AccessController extends BaseMasterAndRegionObserver
   public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> c,
       final Increment increment)
       throws IOException {
-    // Require WRITE permission to the table, CF, and the KV to be replaced by
-    // the incremented value
     User user = getActiveUser();
     checkForReservedTagPresence(user, increment);
+
+    // Require WRITE permission to the table, CF, and the KV to be replaced by
+    // the incremented value
     RegionCoprocessorEnvironment env = c.getEnvironment();
     Map<byte[],? extends Collection<Cell>> families = increment.getFamilyCellMap();
     AuthResult authResult = permissionGranted(OpType.INCREMENT, user, env, families,
@@ -1865,10 +1888,12 @@ public class AccessController extends BaseMasterAndRegionObserver
     if (!authResult.isAllowed()) {
       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
         increment.setAttribute(CHECK_COVERING_PERM, TRUE);
-      } else {
-        throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
+      } else if (authorizationEnabled) {
+        throw new AccessDeniedException("Insufficient permissions " +
+          authResult.toContextString());
       }
     }
+
     byte[] bytes = increment.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
     if (bytes != null) {
       if (cellFeaturesEnabled) {
@@ -1877,6 +1902,7 @@ public class AccessController extends BaseMasterAndRegionObserver
         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
       }
     }
+
     return null;
   }
 
@@ -1897,8 +1923,9 @@ public class AccessController extends BaseMasterAndRegionObserver
             getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap());
       }
       logResult(authResult);
-      if (!authResult.isAllowed()) {
-        throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
+      if (authorizationEnabled && !authResult.isAllowed()) {
+        throw new AccessDeniedException("Insufficient permissions " +
+          authResult.toContextString());
       }
     }
     return null;
@@ -1980,7 +2007,8 @@ public class AccessController extends BaseMasterAndRegionObserver
   public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
       final Scan scan, final RegionScanner s) throws IOException {
     User user = getActiveUser();
-    if (user != null && user.getShortName() != null) {      // store reference to scanner owner for later checks
+    if (user != null && user.getShortName() != null) {
+      // store reference to scanner owner for later checks
       scannerOwners.put(s, user.getShortName());
     }
     return s;
@@ -2015,7 +2043,7 @@ public class AccessController extends BaseMasterAndRegionObserver
   private void requireScannerOwner(InternalScanner s) throws AccessDeniedException {
     String requestUserName = RpcServer.getRequestUserName();
     String owner = scannerOwners.get(s);
-    if (owner != null && !owner.equals(requestUserName)) {
+    if (authorizationEnabled && owner != null && !owner.equals(requestUserName)) {
       throw new AccessDeniedException("User '"+ requestUserName +"' is not the scanner owner!");
     }
   }
@@ -2109,11 +2137,11 @@ public class AccessController extends BaseMasterAndRegionObserver
           case Global :
           case Table :
             requirePermission("grant", perm.getTableName(), perm.getFamily(),
-                perm.getQualifier(), Action.ADMIN);
+              perm.getQualifier(), Action.ADMIN);
             break;
           case Namespace :
             requireGlobalPermission("grant", Action.ADMIN, perm.getNamespace());
-            break;
+           break;
         }
 
         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
@@ -2160,7 +2188,7 @@ public class AccessController extends BaseMasterAndRegionObserver
           case Global :
           case Table :
             requirePermission("revoke", perm.getTableName(), perm.getFamily(),
-                              perm.getQualifier(), Action.ADMIN);
+              perm.getQualifier(), Action.ADMIN);
             break;
           case Namespace :
             requireGlobalPermission("revoke", Action.ADMIN, perm.getNamespace());
@@ -2254,9 +2282,12 @@ public class AccessController extends BaseMasterAndRegionObserver
     }
     AccessControlProtos.CheckPermissionsResponse response = null;
     try {
+      User user = getActiveUser();
       TableName tableName = regionEnv.getRegion().getTableDesc().getTableName();
       for (Permission permission : permissions) {
         if (permission instanceof TablePermission) {
+          // Check table permissions
+
           TablePermission tperm = (TablePermission) permission;
           for (Action action : permission.getActions()) {
             if (!tperm.getTableName().equals(tableName)) {
@@ -2266,7 +2297,8 @@ public class AccessController extends BaseMasterAndRegionObserver
                   tperm.getTableName()));
             }
 
-            Map<byte[], Set<byte[]>> familyMap = new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
+            Map<byte[], Set<byte[]>> familyMap =
+                new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
             if (tperm.getFamily() != null) {
               if (tperm.getQualifier() != null) {
                 Set<byte[]> qualifiers = Sets.newTreeSet(Bytes.BYTES_COMPARATOR);
@@ -2277,12 +2309,37 @@ public class AccessController extends BaseMasterAndRegionObserver
               }
             }
 
-            requirePermission("checkPermissions", action, regionEnv, familyMap);
+            AuthResult result = permissionGranted("checkPermissions", user, action, regionEnv,
+              familyMap);
+            logResult(result);
+            if (!result.isAllowed()) {
+              // Even if passive we need to throw an exception here, we support checking
+              // effective permissions, so throw unconditionally
+              throw new AccessDeniedException("Insufficient permissions (table=" + tableName +
+                (familyMap.size() > 0 ? ", family: " + result.toFamilyString() : "") +
+                ", action=" + action.toString() + ")");
+            }
           }
 
         } else {
+          // Check global permissions
+
           for (Action action : permission.getActions()) {
-            requirePermission("checkPermissions", action);
+            AuthResult result;
+            if (authManager.authorize(user, action)) {
+              result = AuthResult.allow("checkPermissions", "Global action allowed", user,
+                action, null, null);
+            } else {
+              result = AuthResult.deny("checkPermissions", "Global action denied", user, action,
+                null, null);
+            }
+            logResult(result);
+            if (!result.isAllowed()) {
+              // Even if passive we need to throw an exception here, we support checking
+              // effective permissions, so throw unconditionally
+              throw new AccessDeniedException("Insufficient permissions (action=" +
+                action.toString() + ")");
+            }
           }
         }
       }
@@ -2325,6 +2382,10 @@ public class AccessController extends BaseMasterAndRegionObserver
   }
 
   private void isSystemOrSuperUser(Configuration conf) throws IOException {
+    // No need to check if we're not going to throw
+    if (!authorizationEnabled) {
+      return;
+    }
     User user = userProvider.getCurrent();
     if (user == null) {
       throw new IOException("Unable to obtain the current user, " +

http://git-wip-us.apache.org/repos/asf/hbase/blob/356422e1/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
index e94692f..b981501 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
@@ -125,7 +125,7 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
   // flags if we are running on a region of the 'labels' table
   private boolean labelsRegion = false;
   // Flag denoting whether AcessController is available or not.
-  private boolean acOn = false;
+  private boolean accessControllerAvailable = false;
   private Configuration conf;
   private volatile boolean initialized = false;
   private boolean checkAuths = false;
@@ -137,6 +137,10 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
   private List<String> superGroups;
   private VisibilityLabelService visibilityLabelService;
 
+  /** True when authorization checks are active; false only if "hbase.security.authorization"
+    has been set to false in the site configuration. */
+  boolean authorizationEnabled;
+
   // Add to this list if there are any reserved tag types
   private static ArrayList<Byte> RESERVED_VIS_TAG_TYPES = new ArrayList<Byte>();
   static {
@@ -148,6 +152,12 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
   @Override
   public void start(CoprocessorEnvironment env) throws IOException {
     this.conf = env.getConfiguration();
+
+    authorizationEnabled = conf.getBoolean(User.HBASE_SECURITY_AUTHORIZATION_CONF_KEY, true);
+    if (!authorizationEnabled) {
+      LOG.warn("The VisibilityController has been loaded with authorization checks disabled.");
+    }
+
     if (HFile.getFormatVersion(conf) < HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
       throw new RuntimeException("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS
         + " is required to persist visibility labels. Consider setting " + HFile.FORMAT_VERSION_KEY
@@ -200,6 +210,9 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
   @Override
   public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
       TableName tableName, HTableDescriptor htd) throws IOException {
+    if (!authorizationEnabled) {
+      return;
+    }
     if (LABELS_TABLE_NAME.equals(tableName)) {
       throw new ConstraintException("Cannot alter " + LABELS_TABLE_NAME);
     }
@@ -208,6 +221,9 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
   @Override
   public void preAddColumn(ObserverContext<MasterCoprocessorEnvironment> ctx, TableName tableName,
       HColumnDescriptor column) throws IOException {
+    if (!authorizationEnabled) {
+      return;
+    }
     if (LABELS_TABLE_NAME.equals(tableName)) {
       throw new ConstraintException("Cannot alter " + LABELS_TABLE_NAME);
     }
@@ -216,6 +232,9 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
   @Override
   public void preModifyColumn(ObserverContext<MasterCoprocessorEnvironment> ctx,
       TableName tableName, HColumnDescriptor descriptor) throws IOException {
+    if (!authorizationEnabled) {
+      return;
+    }
     if (LABELS_TABLE_NAME.equals(tableName)) {
       throw new ConstraintException("Cannot alter " + LABELS_TABLE_NAME);
     }
@@ -224,6 +243,9 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
   @Override
   public void preDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> ctx,
       TableName tableName, byte[] c) throws IOException {
+    if (!authorizationEnabled) {
+      return;
+    }
     if (LABELS_TABLE_NAME.equals(tableName)) {
       throw new ConstraintException("Cannot alter " + LABELS_TABLE_NAME);
     }
@@ -232,6 +254,9 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
   @Override
   public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> ctx, TableName tableName)
       throws IOException {
+    if (!authorizationEnabled) {
+      return;
+    }
     if (LABELS_TABLE_NAME.equals(tableName)) {
       throw new ConstraintException("Cannot disable " + LABELS_TABLE_NAME);
     }
@@ -244,7 +269,8 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
     // Read the entire labels table and populate the zk
     if (e.getEnvironment().getRegion().getRegionInfo().getTable().equals(LABELS_TABLE_NAME)) {
       this.labelsRegion = true;
-      this.acOn = CoprocessorHost.getLoadedCoprocessors().contains(AccessController.class.getName());
+      this.accessControllerAvailable = CoprocessorHost.getLoadedCoprocessors()
+          .contains(AccessController.class.getName());
       // Defer the init of VisibilityLabelService on labels region until it is in recovering state.
       if (!e.getEnvironment().getRegion().isRecovering()) {
         initVisibilityLabelService(e.getEnvironment());
@@ -298,9 +324,12 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
       for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
         pair = checkForReservedVisibilityTagPresence(cellScanner.current(), pair);
         if (!pair.getFirst()) {
-          miniBatchOp.setOperationStatus(i, new OperationStatus(SANITY_CHECK_FAILURE,
+          // Don't disallow reserved tags if authorization is disabled
+          if (authorizationEnabled) {
+            miniBatchOp.setOperationStatus(i, new OperationStatus(SANITY_CHECK_FAILURE,
               "Mutation contains cell with reserved type tag"));
-          sanityFailure = true;
+            sanityFailure = true;
+          }
           break;
         } else {
           // Indicates that the cell has a the tag which was modified in the src replication cluster
@@ -319,7 +348,7 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
           List<Tag> visibilityTags = labelCache.get(labelsExp);
           if (visibilityTags == null) {
             // Don't check user auths for labels with Mutations when the user is super user
-            boolean authCheck = this.checkAuths && !(isSystemOrSuperUser());
+            boolean authCheck = authorizationEnabled && checkAuths && !(isSystemOrSuperUser());
             try {
               visibilityTags = this.visibilityLabelService.createVisibilityExpTags(labelsExp, true,
                   authCheck);
@@ -366,6 +395,11 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
   public void prePrepareTimeStampForDeleteVersion(
       ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation delete, Cell cell,
       byte[] byteNow, Get get) throws IOException {
+    // Nothing to do if we are not filtering by visibility
+    if (!authorizationEnabled) {
+      return;
+    }
+
     CellVisibility cellVisibility = null;
     try {
       cellVisibility = delete.getCellVisibility();
@@ -514,6 +548,10 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
     if (!initialized) {
       throw new VisibilityControllerNotReadyException("VisibilityController not yet initialized!");
     }
+    // Nothing to do if authorization is not enabled
+    if (!authorizationEnabled) {
+      return s;
+    }
     Region region = e.getEnvironment().getRegion();
     Authorizations authorizations = null;
     try {
@@ -548,6 +586,10 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
   public DeleteTracker postInstantiateDeleteTracker(
       ObserverContext<RegionCoprocessorEnvironment> ctx, DeleteTracker delTracker)
       throws IOException {
+    // Nothing to do if we are not filtering by visibility
+    if (!authorizationEnabled) {
+      return delTracker;
+    }
     Region region = ctx.getEnvironment().getRegion();
     TableName table = region.getRegionInfo().getTable();
     if (table.isSystemTable()) {
@@ -600,16 +642,20 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
     // This is duplicated code!
     String requestUName = RpcServer.getRequestUserName();
     String owner = scannerOwners.get(s);
-    if (owner != null && !owner.equals(requestUName)) {
+    if (authorizationEnabled && owner != null && !owner.equals(requestUName)) {
       throw new AccessDeniedException("User '" + requestUName + "' is not the scanner owner!");
     }
   }
 
   @Override
-  public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results)
-      throws IOException {
+  public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get,
+      List<Cell> results) throws IOException {
     if (!initialized) {
-      throw new VisibilityControllerNotReadyException("VisibilityController not yet initialized!");
+      throw new VisibilityControllerNotReadyException("VisibilityController not yet initialized");
+    }
+    // Nothing useful to do if authorization is not enabled
+    if (!authorizationEnabled) {
+      return;
     }
     Region region = e.getEnvironment().getRegion();
     Authorizations authorizations = null;
@@ -658,6 +704,10 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
   @Override
   public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> e, Append append)
       throws IOException {
+    // If authorization is not enabled, we don't care about reserved tags
+    if (!authorizationEnabled) {
+      return null;
+    }
     for (CellScanner cellScanner = append.cellScanner(); cellScanner.advance();) {
       if (!checkForReservedVisibilityTagPresence(cellScanner.current())) {
         throw new FailedSanityCheckException("Append contains cell with reserved type tag");
@@ -669,6 +719,10 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
   @Override
   public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> e, Increment increment)
       throws IOException {
+    // If authorization is not enabled, we don't care about reserved tags
+    if (!authorizationEnabled) {
+      return null;
+    }
     for (CellScanner cellScanner = increment.cellScanner(); cellScanner.advance();) {
       if (!checkForReservedVisibilityTagPresence(cellScanner.current())) {
         throw new FailedSanityCheckException("Increment contains cell with reserved type tag");
@@ -692,7 +746,7 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
     }
     // Prepend new visibility tags to a new list of tags for the cell
     // Don't check user auths for labels with Mutations when the user is super user
-    boolean authCheck = this.checkAuths && !(isSystemOrSuperUser());
+    boolean authCheck = authorizationEnabled && checkAuths && !(isSystemOrSuperUser());
     tags.addAll(this.visibilityLabelService.createVisibilityExpTags(cellVisibility.getExpression(),
         true, authCheck));
     // Save an object allocation where we can
@@ -731,7 +785,9 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
     } else {
       List<byte[]> labels = new ArrayList<byte[]>(visLabels.size());
       try {
-        checkCallingUserAuth();
+        if (authorizationEnabled) {
+          checkCallingUserAuth();
+        }
         RegionActionResult successResult = RegionActionResult.newBuilder().build();
         for (VisibilityLabel visLabel : visLabels) {
           byte[] label = visLabel.getLabel().toByteArray();
@@ -791,8 +847,9 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
       byte[] user = request.getUser().toByteArray();
       List<byte[]> labelAuths = new ArrayList<byte[]>(auths.size());
       try {
-        checkCallingUserAuth();
-
+        if (authorizationEnabled) {
+          checkCallingUserAuth();
+        }
         for (ByteString authBS : auths) {
           labelAuths.add(authBS.toByteArray());
         }
@@ -863,7 +920,7 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
       try {
         // We do ACL check here as we create scanner directly on region. It will not make calls to
         // AccessController CP methods.
-        if (this.acOn && !isSystemOrSuperUser()) {
+        if (authorizationEnabled && accessControllerAvailable && !isSystemOrSuperUser()) {
           User requestingUser = VisibilityUtils.getActiveUser();
           throw new AccessDeniedException("User '"
               + (requestingUser != null ? requestingUser.getShortName() : "null")
@@ -916,13 +973,15 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
       List<byte[]> labelAuths = new ArrayList<byte[]>(auths.size());
       try {
         // When AC is ON, do AC based user auth check
-        if (this.acOn && !isSystemOrSuperUser()) {
+        if (authorizationEnabled && accessControllerAvailable && !isSystemOrSuperUser()) {
           User user = VisibilityUtils.getActiveUser();
           throw new AccessDeniedException("User '" + (user != null ? user.getShortName() : "null")
               + " is not authorized to perform this action.");
         }
-        checkCallingUserAuth(); // When AC is not in place the calling user should have SYSTEM_LABEL
-                                // auth to do this action.
+        if (authorizationEnabled) {
+          checkCallingUserAuth(); // When AC is not in place the calling user should have
+                                  // SYSTEM_LABEL auth to do this action.
+        }
         for (ByteString authBS : auths) {
           labelAuths.add(authBS.toByteArray());
         }
@@ -966,7 +1025,7 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
       try {
         // We do ACL check here as we create scanner directly on region. It will not make calls to
         // AccessController CP methods.
-        if (this.acOn && !isSystemOrSuperUser()) {
+        if (authorizationEnabled && accessControllerAvailable && !isSystemOrSuperUser()) {
           User requestingUser = VisibilityUtils.getActiveUser();
           throw new AccessDeniedException("User '"
               + (requestingUser != null ? requestingUser.getShortName() : "null")
@@ -990,7 +1049,10 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
   }
 
   private void checkCallingUserAuth() throws IOException {
-    if (!this.acOn) {
+    if (!authorizationEnabled) { // Redundant, but just in case
+      return;
+    }
+    if (!accessControllerAvailable) {
       User user = VisibilityUtils.getActiveUser();
       if (user == null) {
         throw new IOException("Unable to retrieve calling user");

http://git-wip-us.apache.org/repos/asf/hbase/blob/356422e1/hbase-server/src/test/java/org/apache/hadoop/hbase/security/HBaseKerberosUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/HBaseKerberosUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/HBaseKerberosUtils.java
index 1f9f4f5..237efe9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/HBaseKerberosUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/HBaseKerberosUtils.java
@@ -62,8 +62,8 @@ public class HBaseKerberosUtils {
   public static Configuration getConfigurationWoPrincipal() {
     Configuration conf = HBaseConfiguration.create();
     conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
-    conf.set("hbase.security.authentication", "kerberos");
-    conf.setBoolean("hbase.security.authorization", true);
+    conf.set(User.HBASE_SECURITY_CONF_KEY, "kerberos");
+    conf.setBoolean(User.HBASE_SECURITY_AUTHORIZATION_CONF_KEY, true);
     return conf;
   }
 
@@ -75,8 +75,8 @@ public class HBaseKerberosUtils {
 
   public static void setSecuredConfiguration(Configuration conf) {
     conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
-    conf.set("hbase.security.authentication", "kerberos");
-    conf.setBoolean("hbase.security.authorization", true);
+    conf.set(User.HBASE_SECURITY_CONF_KEY, "kerberos");
+    conf.setBoolean(User.HBASE_SECURITY_AUTHORIZATION_CONF_KEY, true);
     conf.set(KRB_KEYTAB_FILE, System.getProperty(KRB_KEYTAB_FILE));
     conf.set(KRB_PRINCIPAL, System.getProperty(KRB_PRINCIPAL));
     conf.set(MASTER_KRB_PRINCIPAL, System.getProperty(KRB_PRINCIPAL));

http://git-wip-us.apache.org/repos/asf/hbase/blob/356422e1/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java
index b30c770..fb06c05 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java
@@ -55,11 +55,13 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
 import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.CheckPermissionsRequest;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.security.AccessDeniedException;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.access.Permission.Action;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
 
 import com.google.common.collect.Lists;
@@ -75,14 +77,7 @@ public class SecureTestUtil {
   private static final Log LOG = LogFactory.getLog(SecureTestUtil.class);
   private static final int WAIT_TIME = 10000;
 
-  public static void enableSecurity(Configuration conf) throws IOException {
-    conf.set("hadoop.security.authorization", "false");
-    conf.set("hadoop.security.authentication", "simple");
-    conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, AccessController.class.getName() +
-      "," + MasterSyncObserver.class.getName());
-    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, AccessController.class.getName() +
-      "," + SecureBulkLoadEndpoint.class.getName());
-    conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, AccessController.class.getName());
+  public static void configureSuperuser(Configuration conf) throws IOException {
     // The secure minicluster creates separate service principals based on the
     // current user's name, one for each slave. We need to add all of these to
     // the superuser list or security won't function properly. We expect the
@@ -97,8 +92,19 @@ public class SecureTestUtil {
       sb.append(currentUser); sb.append(".hfs."); sb.append(i);
     }
     conf.set("hbase.superuser", sb.toString());
+  }
+
+  public static void enableSecurity(Configuration conf) throws IOException {
+    conf.set("hadoop.security.authorization", "false");
+    conf.set("hadoop.security.authentication", "simple");
+    conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, AccessController.class.getName() +
+      "," + MasterSyncObserver.class.getName());
+    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, AccessController.class.getName() +
+      "," + SecureBulkLoadEndpoint.class.getName());
+    conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, AccessController.class.getName());
     // Need HFile V3 for tags for security features
     conf.setInt(HFile.FORMAT_VERSION_KEY, 3);
+    configureSuperuser(conf);
   }
 
   public static void verifyConfiguration(Configuration conf) {
@@ -716,4 +722,60 @@ public class SecureTestUtil {
   public static String convertToGroup(String group) {
     return AccessControlLists.GROUP_PREFIX + group;
   }
+
+  public static void checkGlobalPerms(HBaseTestingUtility testUtil, Permission.Action... actions)
+      throws IOException {
+    Permission[] perms = new Permission[actions.length];
+    for (int i = 0; i < actions.length; i++) {
+      perms[i] = new Permission(actions[i]);
+    }
+    CheckPermissionsRequest.Builder request = CheckPermissionsRequest.newBuilder();
+    for (Action a : actions) {
+      request.addPermission(AccessControlProtos.Permission.newBuilder()
+          .setType(AccessControlProtos.Permission.Type.Global)
+          .setGlobalPermission(
+              AccessControlProtos.GlobalPermission.newBuilder()
+                  .addAction(ProtobufUtil.toPermissionAction(a)).build()));
+    }
+    try(Connection conn = ConnectionFactory.createConnection(testUtil.getConfiguration());
+        Table acl = conn.getTable(AccessControlLists.ACL_TABLE_NAME)) {
+      BlockingRpcChannel channel = acl.coprocessorService(new byte[0]);
+      AccessControlService.BlockingInterface protocol =
+        AccessControlService.newBlockingStub(channel);
+      try {
+        protocol.checkPermissions(null, request.build());
+      } catch (ServiceException se) {
+        ProtobufUtil.toIOException(se);
+      }
+    }
+  }
+
+  public static void checkTablePerms(HBaseTestingUtility testUtil, TableName table, byte[] family,
+      byte[] column, Permission.Action... actions) throws IOException {
+    Permission[] perms = new Permission[actions.length];
+    for (int i = 0; i < actions.length; i++) {
+      perms[i] = new TablePermission(table, family, column, actions[i]);
+    }
+    checkTablePerms(testUtil, table, perms);
+  }
+
+  public static void checkTablePerms(HBaseTestingUtility testUtil, TableName table,
+      Permission... perms) throws IOException {
+    CheckPermissionsRequest.Builder request = CheckPermissionsRequest.newBuilder();
+    for (Permission p : perms) {
+      request.addPermission(ProtobufUtil.toPermission(p));
+    }
+
+    try(Connection conn = ConnectionFactory.createConnection(testUtil.getConfiguration());
+        Table acl = conn.getTable(table)) {
+      AccessControlService.BlockingInterface protocol =
+        AccessControlService.newBlockingStub(acl.coprocessorService(new byte[0]));
+      try {
+        protocol.checkPermissions(null, request.build());
+      } catch (ServiceException se) {
+        ProtobufUtil.toIOException(se);
+      }
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/356422e1/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
index fb3dbe2..a9dd52d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
@@ -137,7 +137,7 @@ public class TestAccessController extends SecureTestUtil {
   }
 
   @Rule public TestTableName TEST_TABLE = new TestTableName();
-  private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   private static Configuration conf;
 
   /** The systemUserConnection created here is tied to the system user. In case, you are planning
@@ -1535,59 +1535,6 @@ public class TestAccessController extends SecureTestUtil {
     verifyDenied(action, USER_CREATE, USER_RW, USER_NONE, USER_RO);
   }
 
-  public void checkGlobalPerms(Permission.Action... actions) throws IOException {
-    Permission[] perms = new Permission[actions.length];
-    for (int i = 0; i < actions.length; i++) {
-      perms[i] = new Permission(actions[i]);
-    }
-    CheckPermissionsRequest.Builder request = CheckPermissionsRequest.newBuilder();
-    for (Action a : actions) {
-      request.addPermission(AccessControlProtos.Permission.newBuilder()
-          .setType(AccessControlProtos.Permission.Type.Global)
-          .setGlobalPermission(
-              AccessControlProtos.GlobalPermission.newBuilder()
-                  .addAction(ProtobufUtil.toPermissionAction(a)).build()));
-    }
-    try(Connection conn = ConnectionFactory.createConnection(conf);
-        Table acl = conn.getTable(AccessControlLists.ACL_TABLE_NAME)) {
-      BlockingRpcChannel channel = acl.coprocessorService(new byte[0]);
-      AccessControlService.BlockingInterface protocol =
-        AccessControlService.newBlockingStub(channel);
-      try {
-        protocol.checkPermissions(null, request.build());
-      } catch (ServiceException se) {
-        ProtobufUtil.toIOException(se);
-      }
-    }
-  }
-
-  public void checkTablePerms(TableName table, byte[] family, byte[] column,
-      Permission.Action... actions) throws IOException {
-    Permission[] perms = new Permission[actions.length];
-    for (int i = 0; i < actions.length; i++) {
-      perms[i] = new TablePermission(table, family, column, actions[i]);
-    }
-
-    checkTablePerms(table, perms);
-  }
-
-  public void checkTablePerms(TableName table, Permission... perms) throws IOException {
-    CheckPermissionsRequest.Builder request = CheckPermissionsRequest.newBuilder();
-    for (Permission p : perms) {
-      request.addPermission(ProtobufUtil.toPermission(p));
-    }
-    try(Connection conn = ConnectionFactory.createConnection(conf);
-        Table acl = conn.getTable(table)) {
-      AccessControlService.BlockingInterface protocol =
-        AccessControlService.newBlockingStub(acl.coprocessorService(new byte[0]));
-      try {
-        protocol.checkPermissions(null, request.build());
-      } catch (ServiceException se) {
-        ProtobufUtil.toIOException(se);
-      }
-    }
-  }
-
   @Test
   public void testCheckPermissions() throws Exception {
     // --------------------------------------
@@ -1595,7 +1542,7 @@ public class TestAccessController extends SecureTestUtil {
     AccessTestAction globalAdmin = new AccessTestAction() {
       @Override
       public Void run() throws Exception {
-        checkGlobalPerms(Permission.Action.ADMIN);
+        checkGlobalPerms(TEST_UTIL, Permission.Action.ADMIN);
         return null;
       }
     };
@@ -1607,7 +1554,7 @@ public class TestAccessController extends SecureTestUtil {
     AccessTestAction globalReadWrite = new AccessTestAction() {
       @Override
       public Void run() throws Exception {
-        checkGlobalPerms(Permission.Action.READ, Permission.Action.WRITE);
+        checkGlobalPerms(TEST_UTIL, Permission.Action.READ, Permission.Action.WRITE);
         return null;
       }
     };
@@ -1636,7 +1583,8 @@ public class TestAccessController extends SecureTestUtil {
     AccessTestAction tableRead = new AccessTestAction() {
       @Override
       public Void run() throws Exception {
-        checkTablePerms(TEST_TABLE.getTableName(), null, null, Permission.Action.READ);
+        checkTablePerms(TEST_UTIL, TEST_TABLE.getTableName(), null, null,
+          Permission.Action.READ);
         return null;
       }
     };
@@ -1644,7 +1592,8 @@ public class TestAccessController extends SecureTestUtil {
     AccessTestAction columnRead = new AccessTestAction() {
       @Override
       public Void run() throws Exception {
-        checkTablePerms(TEST_TABLE.getTableName(), TEST_FAMILY, null, Permission.Action.READ);
+        checkTablePerms(TEST_UTIL, TEST_TABLE.getTableName(), TEST_FAMILY, null,
+          Permission.Action.READ);
         return null;
       }
     };
@@ -1652,7 +1601,8 @@ public class TestAccessController extends SecureTestUtil {
     AccessTestAction qualifierRead = new AccessTestAction() {
       @Override
       public Void run() throws Exception {
-        checkTablePerms(TEST_TABLE.getTableName(), TEST_FAMILY, TEST_Q1, Permission.Action.READ);
+        checkTablePerms(TEST_UTIL, TEST_TABLE.getTableName(), TEST_FAMILY, TEST_Q1,
+          Permission.Action.READ);
         return null;
       }
     };
@@ -1660,9 +1610,11 @@ public class TestAccessController extends SecureTestUtil {
     AccessTestAction multiQualifierRead = new AccessTestAction() {
       @Override
       public Void run() throws Exception {
-        checkTablePerms(TEST_TABLE.getTableName(), new Permission[] {
-            new TablePermission(TEST_TABLE.getTableName(), TEST_FAMILY, TEST_Q1, Permission.Action.READ),
-            new TablePermission(TEST_TABLE.getTableName(), TEST_FAMILY, TEST_Q2, Permission.Action.READ), });
+        checkTablePerms(TEST_UTIL, TEST_TABLE.getTableName(), new Permission[] {
+            new TablePermission(TEST_TABLE.getTableName(), TEST_FAMILY, TEST_Q1,
+              Permission.Action.READ),
+            new TablePermission(TEST_TABLE.getTableName(), TEST_FAMILY, TEST_Q2,
+              Permission.Action.READ), });
         return null;
       }
     };
@@ -1670,8 +1622,10 @@ public class TestAccessController extends SecureTestUtil {
     AccessTestAction globalAndTableRead = new AccessTestAction() {
       @Override
       public Void run() throws Exception {
-        checkTablePerms(TEST_TABLE.getTableName(), new Permission[] { new Permission(Permission.Action.READ),
-            new TablePermission(TEST_TABLE.getTableName(), null, (byte[]) null, Permission.Action.READ), });
+        checkTablePerms(TEST_UTIL, TEST_TABLE.getTableName(),
+          new Permission[] { new Permission(Permission.Action.READ),
+            new TablePermission(TEST_TABLE.getTableName(), null, (byte[]) null,
+            Permission.Action.READ), });
         return null;
       }
     };
@@ -1679,7 +1633,7 @@ public class TestAccessController extends SecureTestUtil {
     AccessTestAction noCheck = new AccessTestAction() {
       @Override
       public Void run() throws Exception {
-        checkTablePerms(TEST_TABLE.getTableName(), new Permission[0]);
+        checkTablePerms(TEST_UTIL, TEST_TABLE.getTableName(), new Permission[0]);
         return null;
       }
     };
@@ -1705,8 +1659,8 @@ public class TestAccessController extends SecureTestUtil {
     AccessTestAction familyReadWrite = new AccessTestAction() {
       @Override
       public Void run() throws Exception {
-        checkTablePerms(TEST_TABLE.getTableName(), TEST_FAMILY, null, Permission.Action.READ,
-          Permission.Action.WRITE);
+        checkTablePerms(TEST_UTIL, TEST_TABLE.getTableName(), TEST_FAMILY, null,
+          Permission.Action.READ, Permission.Action.WRITE);
         return null;
       }
     };