You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2015/09/11 22:45:23 UTC

hbase git commit: HBASE-14370 Use separate thread for calling ZKPermissionWatcher#refreshNodes()

Repository: hbase
Updated Branches:
  refs/heads/master c94d10952 -> dff5243c8


HBASE-14370 Use separate thread for calling ZKPermissionWatcher#refreshNodes()


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/dff5243c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/dff5243c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/dff5243c

Branch: refs/heads/master
Commit: dff5243c89544de8ed3127a7df5ec79cdab3373b
Parents: c94d109
Author: tedyu <yu...@gmail.com>
Authored: Fri Sep 11 13:44:48 2015 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Fri Sep 11 13:44:48 2015 -0700

----------------------------------------------------------------------
 .../hbase/security/access/AccessController.java |   6 +-
 .../hbase/security/access/TableAuthManager.java |  55 +++-
 .../security/access/ZKPermissionWatcher.java    | 148 +++++++---
 .../hbase/security/access/SecureTestUtil.java   |  12 +-
 .../security/access/TestAccessController.java   |   2 +
 .../security/access/TestAccessController3.java  | 294 +++++++++++++++++++
 .../security/access/TestTablePermissions.java   |   2 +-
 .../access/TestZKPermissionsWatcher.java        |   4 +-
 8 files changed, 470 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/dff5243c/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
index 08ff6da..622b3a6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
@@ -971,7 +971,7 @@ public class AccessController extends BaseMasterAndRegionObserver
     // throw RuntimeException so that the coprocessor is unloaded.
     if (zk != null) {
       try {
-        this.authManager = TableAuthManager.get(zk, env.getConfiguration());
+        this.authManager = TableAuthManager.getOrCreate(zk, env.getConfiguration());
       } catch (IOException ioe) {
         throw new RuntimeException("Error obtaining TableAuthManager", ioe);
       }
@@ -984,7 +984,9 @@ public class AccessController extends BaseMasterAndRegionObserver
 
   @Override
   public void stop(CoprocessorEnvironment env) {
-
+    if (this.authManager != null) {
+      TableAuthManager.release(authManager);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/dff5243c/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java
index 7370ee5..e98f7a5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hbase.security.access;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
@@ -40,6 +41,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
@@ -48,7 +50,7 @@ import com.google.common.collect.Lists;
  * Performs authorization checks for a given user's assigned permissions
  */
 @InterfaceAudience.Private
-public class TableAuthManager {
+public class TableAuthManager implements Closeable {
   private static class PermissionCache<T extends Permission> {
     /** Cache of user permissions */
     private ListMultimap<String,T> userCache = ArrayListMultimap.create();
@@ -95,8 +97,6 @@ public class TableAuthManager {
 
   private static final Log LOG = LogFactory.getLog(TableAuthManager.class);
 
-  private static TableAuthManager instance;
-
   /** Cache of global permissions */
   private volatile PermissionCache<Permission> globalCache;
 
@@ -125,6 +125,11 @@ public class TableAuthManager {
     }
   }
 
+  @Override
+  public void close() {
+    this.zkperms.close();
+  }
+
   /**
    * Returns a new {@code PermissionCache} initialized with permission assignments
    * from the {@code hbase.superuser} configuration key.
@@ -739,16 +744,54 @@ public class TableAuthManager {
     return mtime.get();
   }
 
-  static Map<ZooKeeperWatcher,TableAuthManager> managerMap =
+  private static Map<ZooKeeperWatcher,TableAuthManager> managerMap =
     new HashMap<ZooKeeperWatcher,TableAuthManager>();
 
-  public synchronized static TableAuthManager get(
+  private static Map<TableAuthManager, Integer> refCount = new HashMap<>();
+
+  /** Returns a TableAuthManager from the cache. If not cached, constructs a new one. Returned
+   * instance should be released back by calling {@link #release(TableAuthManager)}. */
+  public synchronized static TableAuthManager getOrCreate(
       ZooKeeperWatcher watcher, Configuration conf) throws IOException {
-    instance = managerMap.get(watcher);
+    TableAuthManager instance = managerMap.get(watcher);
     if (instance == null) {
       instance = new TableAuthManager(watcher, conf);
       managerMap.put(watcher, instance);
     }
+    int ref = refCount.get(instance) == null ? 0 : refCount.get(instance).intValue();
+    refCount.put(instance, ref + 1);
     return instance;
   }
+
+  @VisibleForTesting
+  static int getTotalRefCount() {
+    int total = 0;
+    for (int count : refCount.values()) {
+      total += count;
+    }
+    return total;
+  }
+
+  /**
+   * Releases the resources for the given TableAuthManager if the reference count is down to 0.
+   * @param instance TableAuthManager to be released
+   */
+  public synchronized static void release(TableAuthManager instance) {
+    if (refCount.get(instance) == null || refCount.get(instance) < 1) {
+      String msg = "Something wrong with the TableAuthManager reference counting: " + instance
+          + " whose count is " + refCount.get(instance);
+      LOG.fatal(msg);
+      instance.close();
+      managerMap.remove(instance.getZKPermissionWatcher().getWatcher());
+      instance.getZKPermissionWatcher().getWatcher().abort(msg, null);
+    } else {
+      int ref = refCount.get(instance);
+      refCount.put(instance, ref-1);
+      if (ref-1 == 0) {
+        instance.close();
+        managerMap.remove(instance.getZKPermissionWatcher().getWatcher());
+        refCount.remove(instance);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/dff5243c/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java
index d5fdd41..b3aa782 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java
@@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DaemonThreadFactory;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -29,9 +30,15 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Handles synchronization of access control list entries and updates
@@ -43,13 +50,16 @@ import java.util.concurrent.CountDownLatch;
  * trigger updates in the {@link TableAuthManager} permission cache.
  */
 @InterfaceAudience.Private
-public class ZKPermissionWatcher extends ZooKeeperListener {
+public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable {
   private static final Log LOG = LogFactory.getLog(ZKPermissionWatcher.class);
   // parent node for permissions lists
   static final String ACL_NODE = "acl";
   TableAuthManager authManager;
   String aclZNode;
   CountDownLatch initialized = new CountDownLatch(1);
+  AtomicReference<List<ZKUtil.NodeAndData>> nodes =
+      new AtomicReference<List<ZKUtil.NodeAndData>>(null);
+  ExecutorService executor;
 
   public ZKPermissionWatcher(ZooKeeperWatcher watcher,
       TableAuthManager authManager, Configuration conf) {
@@ -57,16 +67,34 @@ public class ZKPermissionWatcher extends ZooKeeperListener {
     this.authManager = authManager;
     String aclZnodeParent = conf.get("zookeeper.znode.acl.parent", ACL_NODE);
     this.aclZNode = ZKUtil.joinZNode(watcher.baseZNode, aclZnodeParent);
+    executor = Executors.newSingleThreadExecutor(
+      new DaemonThreadFactory("zk-permission-watcher"));
   }
 
   public void start() throws KeeperException {
     try {
       watcher.registerListener(this);
       if (ZKUtil.watchAndCheckExists(watcher, aclZNode)) {
-        List<ZKUtil.NodeAndData> existing =
-            ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
-        if (existing != null) {
-          refreshNodes(existing);
+        try {
+          executor.submit(new Callable<Void>() {
+            @Override
+            public Void call() throws KeeperException {
+              List<ZKUtil.NodeAndData> existing =
+                  ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
+              if (existing != null) {
+                refreshNodes(existing, null);
+              }
+              return null;
+            }
+          }).get();
+        } catch (ExecutionException ex) {
+          if (ex.getCause() instanceof KeeperException) {
+            throw (KeeperException)ex.getCause();
+          } else {
+            throw new RuntimeException(ex.getCause());
+          }
+        } catch (InterruptedException ex) {
+          Thread.currentThread().interrupt();
         }
       }
     } finally {
@@ -74,11 +102,16 @@ public class ZKPermissionWatcher extends ZooKeeperListener {
     }
   }
 
+  @Override
+  public void close() {
+    executor.shutdown();
+  }
+
   private void waitUntilStarted() {
     try {
       initialized.await();
     } catch (InterruptedException e) {
-      LOG.warn("Interrupted while waiting", e);
+      LOG.warn("Interrupted while waiting for start", e);
       Thread.currentThread().interrupt();
     }
   }
@@ -87,68 +120,103 @@ public class ZKPermissionWatcher extends ZooKeeperListener {
   public void nodeCreated(String path) {
     waitUntilStarted();
     if (path.equals(aclZNode)) {
-      try {
-        List<ZKUtil.NodeAndData> nodes =
-            ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
-        refreshNodes(nodes);
-      } catch (KeeperException ke) {
-        LOG.error("Error reading data from zookeeper", ke);
-        // only option is to abort
-        watcher.abort("Zookeeper error obtaining acl node children", ke);
-      }
+      executor.submit(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            List<ZKUtil.NodeAndData> nodes =
+                ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
+            refreshNodes(nodes, null);
+          } catch (KeeperException ke) {
+            LOG.error("Error reading data from zookeeper", ke);
+            // only option is to abort
+            watcher.abort("Zookeeper error obtaining acl node children", ke);
+          }
+        }
+      });
     }
   }
 
   @Override
-  public void nodeDeleted(String path) {
+  public void nodeDeleted(final String path) {
     waitUntilStarted();
     if (aclZNode.equals(ZKUtil.getParent(path))) {
-      String table = ZKUtil.getNodeName(path);
-      if(AccessControlLists.isNamespaceEntry(table)) {
-        authManager.removeNamespace(Bytes.toBytes(table));
-      } else {
-        authManager.removeTable(TableName.valueOf(table));
-      }
+      executor.submit(new Runnable() {
+        @Override
+        public void run() {
+          String table = ZKUtil.getNodeName(path);
+          if(AccessControlLists.isNamespaceEntry(table)) {
+            authManager.removeNamespace(Bytes.toBytes(table));
+          } else {
+            authManager.removeTable(TableName.valueOf(table));
+          }
+        }
+      });
     }
   }
 
   @Override
-  public void nodeDataChanged(String path) {
+  public void nodeDataChanged(final String path) {
     waitUntilStarted();
     if (aclZNode.equals(ZKUtil.getParent(path))) {
-      // update cache on an existing table node
-      String entry = ZKUtil.getNodeName(path);
-      try {
-        byte[] data = ZKUtil.getDataAndWatch(watcher, path);
-        refreshAuthManager(entry, data);
-      } catch (KeeperException ke) {
-        LOG.error("Error reading data from zookeeper for node " + entry, ke);
-        // only option is to abort
-        watcher.abort("Zookeeper error getting data for node " + entry, ke);
-      } catch (IOException ioe) {
-        LOG.error("Error reading permissions writables", ioe);
-      }
+      executor.submit(new Runnable() {
+        @Override
+        public void run() {
+          // update cache on an existing table node
+          String entry = ZKUtil.getNodeName(path);
+          try {
+            byte[] data = ZKUtil.getDataAndWatch(watcher, path);
+            refreshAuthManager(entry, data);
+          } catch (KeeperException ke) {
+            LOG.error("Error reading data from zookeeper for node " + entry, ke);
+            // only option is to abort
+            watcher.abort("Zookeeper error getting data for node " + entry, ke);
+          } catch (IOException ioe) {
+            LOG.error("Error reading permissions writables", ioe);
+          }
+        }
+      });
     }
   }
 
   @Override
-  public void nodeChildrenChanged(String path) {
+  public void nodeChildrenChanged(final String path) {
     waitUntilStarted();
     if (path.equals(aclZNode)) {
-      // table permissions changed
       try {
-        List<ZKUtil.NodeAndData> nodes =
+        List<ZKUtil.NodeAndData> nodeList =
             ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
-        refreshNodes(nodes);
+        while (!nodes.compareAndSet(null, nodeList)) {
+          try {
+            Thread.sleep(20);
+          } catch (InterruptedException e) {
+            LOG.warn("Interrupted while setting node list", e);
+            Thread.currentThread().interrupt();
+          }
+        }
       } catch (KeeperException ke) {
         LOG.error("Error reading data from zookeeper for path "+path, ke);
         watcher.abort("Zookeeper error get node children for path "+path, ke);
       }
+      executor.submit(new Runnable() {
+        // allows subsequent nodeChildrenChanged event to preempt current processing of
+        // nodeChildrenChanged event
+        @Override
+        public void run() {
+          List<ZKUtil.NodeAndData> nodeList = nodes.get();
+          nodes.set(null);
+          refreshNodes(nodeList, nodes);
+        }
+      });
     }
   }
 
-  private void refreshNodes(List<ZKUtil.NodeAndData> nodes) {
+  private void refreshNodes(List<ZKUtil.NodeAndData> nodes, AtomicReference ref) {
     for (ZKUtil.NodeAndData n : nodes) {
+      if (ref != null && ref.get() != null) {
+        // there is a newer list
+        break;
+      }
       if (n.isEmpty()) continue;
       String path = n.getNode();
       String entry = (ZKUtil.getNodeName(path));

http://git-wip-us.apache.org/repos/asf/hbase/blob/dff5243c/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java
index 003e4ab..9fe6e0c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java
@@ -108,10 +108,18 @@ public class SecureTestUtil {
   }
 
   public static void verifyConfiguration(Configuration conf) {
+    String coprocs = conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY);
+    boolean accessControllerLoaded = false;
+    for (String coproc : coprocs.split(",")) {
+      try {
+        accessControllerLoaded = AccessController.class.isAssignableFrom(Class.forName(coproc));
+        if (accessControllerLoaded) break;
+      } catch (ClassNotFoundException cnfe) {
+      }
+    }
     if (!(conf.get(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY).contains(
         AccessController.class.getName())
-        && conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY).contains(
-            AccessController.class.getName()) && conf.get(
+        && accessControllerLoaded && conf.get(
         CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY).contains(
         AccessController.class.getName()))) {
       throw new RuntimeException("AccessController is missing from a system coprocessor list");

http://git-wip-us.apache.org/repos/asf/hbase/blob/dff5243c/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
index 51a7804..70ddbbb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
@@ -245,6 +245,8 @@ public class TestAccessController extends SecureTestUtil {
   public static void tearDownAfterClass() throws Exception {
     cleanUp();
     TEST_UTIL.shutdownMiniCluster();
+    int total = TableAuthManager.getTotalRefCount();
+    assertTrue("Unexpected reference count: " + total, total == 0);
   }
 
   private static void setUpTableAndUserPermissions() throws Exception {

http://git-wip-us.apache.org/repos/asf/hbase/blob/dff5243c/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController3.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController3.java
new file mode 100644
index 0000000..d757736
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController3.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.access;
+
+import static org.apache.hadoop.hbase.AuthUtil.toGroupEntry;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
+import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
+import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.SecurityTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Performs checks for reference counting w.r.t. TableAuthManager which is used by AccessController.
+ */
+@Category({SecurityTests.class, MediumTests.class})
+public class TestAccessController3 extends SecureTestUtil {
+  private static final Log LOG = LogFactory.getLog(TestAccessController.class);
+
+  static {
+    Logger.getLogger(AccessController.class).setLevel(Level.TRACE);
+    Logger.getLogger(AccessControlFilter.class).setLevel(Level.TRACE);
+    Logger.getLogger(TableAuthManager.class).setLevel(Level.TRACE);
+  }
+
+  private static TableName TEST_TABLE = TableName.valueOf("testtable1");
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static Configuration conf;
+
+  /** The systemUserConnection created here is tied to the system user. If you are planning
+   * to create an AccessTestAction, DON'T use this systemUserConnection, because the 'doAs' user
+   * gets eclipsed by the system user. */
+  private static Connection systemUserConnection;
+
+
+  // user with all permissions
+  private static User SUPERUSER;
+  // user granted with all global permission
+  private static User USER_ADMIN;
+  // user with rw permissions on column family.
+  private static User USER_RW;
+  // user with read-only permissions
+  private static User USER_RO;
+  // user is table owner. will have all permissions on table
+  private static User USER_OWNER;
+  // user with create table permissions alone
+  private static User USER_CREATE;
+  // user with no permissions
+  private static User USER_NONE;
+  // user with admin rights on the column family
+  private static User USER_ADMIN_CF;
+
+  private static final String GROUP_ADMIN = "group_admin";
+  private static final String GROUP_CREATE = "group_create";
+  private static final String GROUP_READ = "group_read";
+  private static final String GROUP_WRITE = "group_write";
+
+  private static User USER_GROUP_ADMIN;
+  private static User USER_GROUP_CREATE;
+  private static User USER_GROUP_READ;
+  private static User USER_GROUP_WRITE;
+
+  // TODO: convert this test to cover the full matrix in
+  // https://hbase.apache.org/book/appendix_acl_matrix.html
+  // creating all Scope x Permission combinations
+
+  private static byte[] TEST_FAMILY = Bytes.toBytes("f1");
+
+  private static MasterCoprocessorEnvironment CP_ENV;
+  private static AccessController ACCESS_CONTROLLER;
+  private static RegionServerCoprocessorEnvironment RSCP_ENV;
+  private static RegionCoprocessorEnvironment RCP_ENV;
+  
+  private static boolean callSuperTwice = true;
+
+  // class with faulty stop() method, controlled by flag
+  public static class FaultyAccessController extends AccessController {
+    public FaultyAccessController() {
+    }
+
+    @Override
+    public void stop(CoprocessorEnvironment env) {
+      super.stop(env);
+      if (callSuperTwice) {
+        super.stop(env);
+      }
+    }
+  }
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    // setup configuration
+    conf = TEST_UTIL.getConfiguration();
+    // Enable security
+    enableSecurity(conf);
+    String accessControllerClassName = FaultyAccessController.class.getName();
+    // In this particular test case, we can't use SecureBulkLoadEndpoint because its doAs will fail
+    // to move a file for a random user
+    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, accessControllerClassName);
+    // Verify enableSecurity sets up what we require
+    verifyConfiguration(conf);
+
+    // Enable EXEC permission checking
+    conf.setBoolean(AccessControlConstants.EXEC_PERMISSION_CHECKS_KEY, true);
+
+    TEST_UTIL.startMiniCluster();
+    MasterCoprocessorHost cpHost =
+      TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterCoprocessorHost();
+    cpHost.load(FaultyAccessController.class, Coprocessor.PRIORITY_HIGHEST, conf);
+    ACCESS_CONTROLLER = (AccessController) cpHost.findCoprocessor(accessControllerClassName);
+    CP_ENV = cpHost.createEnvironment(AccessController.class, ACCESS_CONTROLLER,
+      Coprocessor.PRIORITY_HIGHEST, 1, conf);
+    RegionServerCoprocessorHost rsHost;
+    do {
+      rsHost = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0)
+          .getRegionServerCoprocessorHost();
+    } while (rsHost == null);
+    RSCP_ENV = rsHost.createEnvironment(AccessController.class, ACCESS_CONTROLLER,
+      Coprocessor.PRIORITY_HIGHEST, 1, conf);
+
+    // Wait for the ACL table to become available
+    TEST_UTIL.waitUntilAllRegionsAssigned(AccessControlLists.ACL_TABLE_NAME);
+
+    // create a set of test users
+    SUPERUSER = User.createUserForTesting(conf, "admin", new String[] { "supergroup" });
+    USER_ADMIN = User.createUserForTesting(conf, "admin2", new String[0]);
+    USER_RW = User.createUserForTesting(conf, "rwuser", new String[0]);
+    USER_RO = User.createUserForTesting(conf, "rouser", new String[0]);
+    USER_OWNER = User.createUserForTesting(conf, "owner", new String[0]);
+    USER_CREATE = User.createUserForTesting(conf, "tbl_create", new String[0]);
+    USER_NONE = User.createUserForTesting(conf, "nouser", new String[0]);
+    USER_ADMIN_CF = User.createUserForTesting(conf, "col_family_admin", new String[0]);
+
+    USER_GROUP_ADMIN =
+        User.createUserForTesting(conf, "user_group_admin", new String[] { GROUP_ADMIN });
+    USER_GROUP_CREATE =
+        User.createUserForTesting(conf, "user_group_create", new String[] { GROUP_CREATE });
+    USER_GROUP_READ =
+        User.createUserForTesting(conf, "user_group_read", new String[] { GROUP_READ });
+    USER_GROUP_WRITE =
+        User.createUserForTesting(conf, "user_group_write", new String[] { GROUP_WRITE });
+
+    systemUserConnection = TEST_UTIL.getConnection();
+    setUpTableAndUserPermissions();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    HRegionServer rs = null;
+    for (JVMClusterUtil.RegionServerThread thread:
+      TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
+      rs = thread.getRegionServer();
+    }
+    cleanUp();
+    TEST_UTIL.shutdownMiniCluster();
+    assertTrue("region server should have aborted due to FaultyAccessController", rs.isAborted());
+  }
+
+  private static void setUpTableAndUserPermissions() throws Exception {
+    HTableDescriptor htd = new HTableDescriptor(TEST_TABLE);
+    HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAMILY);
+    hcd.setMaxVersions(100);
+    htd.addFamily(hcd);
+    htd.setOwner(USER_OWNER);
+    createTable(TEST_UTIL, htd, new byte[][] { Bytes.toBytes("s") });
+
+    Region region = TEST_UTIL.getHBaseCluster().getRegions(TEST_TABLE).get(0);
+    RegionCoprocessorHost rcpHost = region.getCoprocessorHost();
+    RCP_ENV = rcpHost.createEnvironment(AccessController.class, ACCESS_CONTROLLER,
+      Coprocessor.PRIORITY_HIGHEST, 1, conf);
+
+    // Set up initial grants
+
+    grantGlobal(TEST_UTIL, USER_ADMIN.getShortName(),
+      Permission.Action.ADMIN,
+      Permission.Action.CREATE,
+      Permission.Action.READ,
+      Permission.Action.WRITE);
+
+    grantOnTable(TEST_UTIL, USER_RW.getShortName(),
+      TEST_TABLE, TEST_FAMILY, null,
+      Permission.Action.READ,
+      Permission.Action.WRITE);
+
+    // USER_CREATE is USER_RW plus CREATE permissions
+    grantOnTable(TEST_UTIL, USER_CREATE.getShortName(),
+      TEST_TABLE, null, null,
+      Permission.Action.CREATE,
+      Permission.Action.READ,
+      Permission.Action.WRITE);
+
+    grantOnTable(TEST_UTIL, USER_RO.getShortName(),
+      TEST_TABLE, TEST_FAMILY, null,
+      Permission.Action.READ);
+
+    grantOnTable(TEST_UTIL, USER_ADMIN_CF.getShortName(),
+      TEST_TABLE, TEST_FAMILY,
+      null, Permission.Action.ADMIN, Permission.Action.CREATE);
+
+    grantGlobal(TEST_UTIL, toGroupEntry(GROUP_ADMIN), Permission.Action.ADMIN);
+    grantGlobal(TEST_UTIL, toGroupEntry(GROUP_CREATE), Permission.Action.CREATE);
+    grantGlobal(TEST_UTIL, toGroupEntry(GROUP_READ), Permission.Action.READ);
+    grantGlobal(TEST_UTIL, toGroupEntry(GROUP_WRITE), Permission.Action.WRITE);
+
+    assertEquals(5, AccessControlLists.getTablePermissions(conf, TEST_TABLE).size());
+    try {
+      assertEquals(5, AccessControlClient.getUserPermissions(systemUserConnection,
+          TEST_TABLE.toString()).size());
+    } catch (Throwable e) {
+      LOG.error("error during call of AccessControlClient.getUserPermissions. ", e);
+    }
+  }
+
+  private static void cleanUp() throws Exception {
+    // Delete the test table
+    try {
+      deleteTable(TEST_UTIL, TEST_TABLE);
+    } catch (TableNotFoundException ex) {
+      // Test deleted the table, no problem
+      LOG.info("Test deleted table " + TEST_TABLE);
+    }
+    // Verify all table/namespace permissions are erased
+    assertEquals(0, AccessControlLists.getTablePermissions(conf, TEST_TABLE).size());
+    assertEquals(
+      0,
+      AccessControlLists.getNamespacePermissions(conf,
+        TEST_TABLE.getNamespaceAsString()).size());
+  }
+
+  @Test
+  public void testTableCreate() throws Exception {
+    AccessTestAction createTable = new AccessTestAction() {
+      @Override
+      public Object run() throws Exception {
+        HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testnewtable"));
+        htd.addFamily(new HColumnDescriptor(TEST_FAMILY));
+        ACCESS_CONTROLLER.preCreateTable(ObserverContext.createAndPrepare(CP_ENV, null), htd, null);
+        return null;
+      }
+    };
+
+    // verify that the superuser, global admin and CREATE-group user can create tables
+    verifyAllowed(createTable, SUPERUSER, USER_ADMIN, USER_GROUP_CREATE);
+
+    // all others should be denied
+    verifyDenied(createTable, USER_CREATE, USER_RW, USER_RO, USER_NONE, USER_GROUP_ADMIN,
+      USER_GROUP_READ, USER_GROUP_WRITE);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/dff5243c/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestTablePermissions.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestTablePermissions.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestTablePermissions.java
index 3456158..1f23576 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestTablePermissions.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestTablePermissions.java
@@ -434,7 +434,7 @@ public class TestTablePermissions {
     /* test a race condition causing TableAuthManager to sometimes fail global permissions checks
      * when the global cache is being updated
      */
-    TableAuthManager authManager = TableAuthManager.get(ZKW, conf);
+    TableAuthManager authManager = TableAuthManager.getOrCreate(ZKW, conf);
     // currently running user is the system user and should have global admin perms
     User currentUser = User.getCurrent();
     assertTrue(authManager.authorize(currentUser, Permission.Action.ADMIN));

http://git-wip-us.apache.org/repos/asf/hbase/blob/dff5243c/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestZKPermissionsWatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestZKPermissionsWatcher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestZKPermissionsWatcher.java
index 9c2bc3c..b8e7b53 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestZKPermissionsWatcher.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestZKPermissionsWatcher.java
@@ -75,9 +75,9 @@ public class TestZKPermissionsWatcher {
 
     // start minicluster
     UTIL.startMiniCluster();
-    AUTH_A = TableAuthManager.get(new ZooKeeperWatcher(conf,
+    AUTH_A = TableAuthManager.getOrCreate(new ZooKeeperWatcher(conf,
       "TestZKPermissionsWatcher_1", ABORTABLE), conf);
-    AUTH_B = TableAuthManager.get(new ZooKeeperWatcher(conf,
+    AUTH_B = TableAuthManager.getOrCreate(new ZooKeeperWatcher(conf,
       "TestZKPermissionsWatcher_2", ABORTABLE), conf);
   }