Posted to commits@hbase.apache.org by sy...@apache.org on 2016/01/15 16:26:48 UTC

[1/9] hbase git commit: HBASE-15099 Move RegionStateListener class out of quotas package

Repository: hbase
Updated Branches:
  refs/heads/hbase-12439 5e89ebcc2 -> cb17c7a97


HBASE-15099 Move RegionStateListener class out of quotas package


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c8b9754a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c8b9754a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c8b9754a

Branch: refs/heads/hbase-12439
Commit: c8b9754a5e0372f93d0e9cf1d9ce788c3d0dcda1
Parents: 5e89ebc
Author: tedyu <yu...@gmail.com>
Authored: Wed Jan 13 15:15:37 2016 -0800
Committer: tedyu <yu...@gmail.com>
Committed: Wed Jan 13 15:15:37 2016 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/RegionStateListener.java       | 53 +++++++++++++++++++
 .../hadoop/hbase/master/AssignmentManager.java  |  2 +-
 .../org/apache/hadoop/hbase/master/HMaster.java |  2 +-
 .../hadoop/hbase/quotas/MasterQuotaManager.java |  1 +
 .../hbase/quotas/RegionStateListener.java       | 54 --------------------
 5 files changed, 56 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/c8b9754a/hbase-server/src/main/java/org/apache/hadoop/hbase/RegionStateListener.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/RegionStateListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/RegionStateListener.java
new file mode 100644
index 0000000..22725ec
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/RegionStateListener.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * The listener interface for receiving region state events.
+ */
+@InterfaceAudience.Private
+public interface RegionStateListener {
+
+  /**
+   * Process region split event.
+   *
+   * @param hri An instance of HRegionInfo
+   * @throws IOException Signals that an I/O exception has occurred.
+   */
+  void onRegionSplit(HRegionInfo hri) throws IOException;
+
+  /**
+   * Process region split reverted event.
+   *
+   * @param hri An instance of HRegionInfo
+   * @throws IOException Signals that an I/O exception has occurred.
+   */
+  void onRegionSplitReverted(HRegionInfo hri) throws IOException;
+
+  /**
+   * Process region merge event.
+   *
+   * @param hri An instance of HRegionInfo
+   * @throws IOException Signals that an I/O exception has occurred.
+   */
+  void onRegionMerged(HRegionInfo hri) throws IOException;
+}
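
For reference, an implementation of the relocated interface only needs to supply the three callbacks. A minimal sketch (the class name and the region-count bookkeeping are hypothetical, for illustration; only the interface itself comes from this commit):

    import java.io.IOException;

    import org.apache.hadoop.hbase.HRegionInfo;
    import org.apache.hadoop.hbase.RegionStateListener;

    /** Hypothetical listener that tracks a running region count. */
    public class RegionCountListener implements RegionStateListener {
      private int regionCount;

      @Override
      public void onRegionSplit(HRegionInfo hri) throws IOException {
        regionCount++;  // a completed split turns one region into two
      }

      @Override
      public void onRegionSplitReverted(HRegionInfo hri) throws IOException {
        regionCount--;  // roll back the accounting done in onRegionSplit
      }

      @Override
      public void onRegionMerged(HRegionInfo hri) throws IOException {
        regionCount--;  // a completed merge turns two regions into one
      }
    }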

http://git-wip-us.apache.org/repos/asf/hbase/blob/c8b9754a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index c319bb1..f8132e0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.RegionStateListener;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
@@ -76,7 +77,6 @@ import org.apache.hadoop.hbase.normalizer.NormalizationPlan.PlanType;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
 import org.apache.hadoop.hbase.quotas.QuotaExceededException;
-import org.apache.hadoop.hbase.quotas.RegionStateListener;
 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
 import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;

http://git-wip-us.apache.org/repos/asf/hbase/blob/c8b9754a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 2431681..8c34b91 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.PleaseHoldException;
 import org.apache.hadoop.hbase.ProcedureInfo;
+import org.apache.hadoop.hbase.RegionStateListener;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerLoad;
 import org.apache.hadoop.hbase.ServerName;
@@ -123,7 +124,6 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRespo
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
-import org.apache.hadoop.hbase.quotas.RegionStateListener;
 import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.HStore;

http://git-wip-us.apache.org/repos/asf/hbase/blob/c8b9754a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java
index 8ff633f..caaea67 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.RegionStateListener;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hbase/blob/c8b9754a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionStateListener.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionStateListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionStateListener.java
deleted file mode 100644
index 368e21e..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionStateListener.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.quotas;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-/**
- * The listener interface for receiving region state events.
- */
-@InterfaceAudience.Private
-public interface RegionStateListener {
-
-  /**
-   * Process region split event.
-   *
-   * @param hri An instance of HRegionInfo
-   * @throws IOException
-   */
-  void onRegionSplit(HRegionInfo hri) throws IOException;
-
-  /**
-   * Process region split reverted event.
-   *
-   * @param hri An instance of HRegionInfo
-   * @throws IOException Signals that an I/O exception has occurred.
-   */
-  void onRegionSplitReverted(HRegionInfo hri) throws IOException;
-
-  /**
-   * Process region merge event.
-   *
-   * @param hri An instance of HRegionInfo
-   * @throws IOException
-   */
-  void onRegionMerged(HRegionInfo hri) throws IOException;
-}


[6/9] hbase git commit: HBASE-15105 Procedure V2 - Procedure Queue with Namespaces

Posted by sy...@apache.org.
HBASE-15105 Procedure V2 - Procedure Queue with Namespaces


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ae7cc0c8
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ae7cc0c8
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ae7cc0c8

Branch: refs/heads/hbase-12439
Commit: ae7cc0c8487dd10f72c64c898ddfec6f2ceca643
Parents: 18a48af
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Thu Jan 14 13:45:17 2016 -0800
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Thu Jan 14 13:45:17 2016 -0800

----------------------------------------------------------------------
 .../hbase/master/TableNamespaceManager.java     |  58 +----
 .../procedure/CreateNamespaceProcedure.java     |  18 +-
 .../procedure/DeleteNamespaceProcedure.java     |  11 +-
 .../procedure/MasterProcedureScheduler.java     | 213 ++++++++++++++++---
 .../procedure/ModifyNamespaceProcedure.java     |   9 +-
 .../procedure/TestMasterProcedureScheduler.java |  86 ++++++--
 6 files changed, 281 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ae7cc0c8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableNamespaceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableNamespaceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableNamespaceManager.java
index 69d1280..c7ead2d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableNamespaceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableNamespaceManager.java
@@ -21,8 +21,6 @@ package org.apache.hadoop.hbase.master;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.NavigableSet;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
@@ -62,7 +60,7 @@ import com.google.common.collect.Sets;
  * This is a helper class used internally to manage the namespace metadata that is stored in
  * TableName.NAMESPACE_TABLE_NAME. It also mirrors updates to the ZK store by forwarding updates to
  * {@link org.apache.hadoop.hbase.ZKNamespaceManager}.
- * 
+ *
  * WARNING: Do not use. Go via the higher-level {@link ClusterSchema} API instead. This manager
 * is likely to go away anyway.
  */
@@ -76,35 +74,14 @@ public class TableNamespaceManager {
   private ZKNamespaceManager zkNamespaceManager;
   private boolean initialized;
 
-  private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
-
   public static final String KEY_MAX_REGIONS = "hbase.namespace.quota.maxregions";
   public static final String KEY_MAX_TABLES = "hbase.namespace.quota.maxtables";
   static final String NS_INIT_TIMEOUT = "hbase.master.namespace.init.timeout";
   static final int DEFAULT_NS_INIT_TIMEOUT = 300000;
 
-  /** Configuration key for time out for trying to acquire table locks */
-  private static final String TABLE_WRITE_LOCK_TIMEOUT_MS =
-    "hbase.table.write.lock.timeout.ms";
-  /** Configuration key for time out for trying to acquire table locks */
-  private static final String TABLE_READ_LOCK_TIMEOUT_MS =
-    "hbase.table.read.lock.timeout.ms";
-  private static final long DEFAULT_TABLE_WRITE_LOCK_TIMEOUT_MS = 600 * 1000; //10 min default
-  private static final long DEFAULT_TABLE_READ_LOCK_TIMEOUT_MS = 600 * 1000; //10 min default
-
-  private long exclusiveLockTimeoutMs;
-  private long sharedLockTimeoutMs;
-
   TableNamespaceManager(MasterServices masterServices) {
     this.masterServices = masterServices;
     this.conf = masterServices.getConfiguration();
-
-    this.exclusiveLockTimeoutMs = conf.getLong(
-      TABLE_WRITE_LOCK_TIMEOUT_MS,
-      DEFAULT_TABLE_WRITE_LOCK_TIMEOUT_MS);
-    this.sharedLockTimeoutMs = conf.getLong(
-      TABLE_READ_LOCK_TIMEOUT_MS,
-      DEFAULT_TABLE_READ_LOCK_TIMEOUT_MS);
   }
 
   public void start() throws IOException {
@@ -138,30 +115,6 @@ public class TableNamespaceManager {
     return nsTable;
   }
 
-  private boolean acquireSharedLock() throws IOException {
-    try {
-      return rwLock.readLock().tryLock(sharedLockTimeoutMs, TimeUnit.MILLISECONDS);
-    } catch (InterruptedException e) {
-      throw (InterruptedIOException) new InterruptedIOException().initCause(e);
-    }
-  }
-
-  public void releaseSharedLock() {
-    rwLock.readLock().unlock();
-  }
-
-  public boolean acquireExclusiveLock() {
-    try {
-      return rwLock.writeLock().tryLock(exclusiveLockTimeoutMs, TimeUnit.MILLISECONDS);
-    } catch (InterruptedException e) {
-      return false;
-    }
-  }
-
-  public void releaseExclusiveLock() {
-    rwLock.writeLock().unlock();
-  }
-
   /*
    * Check whether a namespace already exists.
    */
@@ -229,13 +182,7 @@ public class TableNamespaceManager {
         Sets.newTreeSet(NamespaceDescriptor.NAMESPACE_DESCRIPTOR_COMPARATOR);
     ResultScanner scanner =
         getNamespaceTable().getScanner(HTableDescriptor.NAMESPACE_FAMILY_INFO_BYTES);
-    boolean locked = false;
     try {
-      locked = acquireSharedLock();
-      if (!locked) {
-        throw new IOException(
-          "Fail to acquire lock to scan namespace list.  Some namespace DDL is in progress.");
-      }
       for(Result r : scanner) {
         byte[] val = CellUtil.cloneValue(r.getColumnLatestCell(
           HTableDescriptor.NAMESPACE_FAMILY_INFO_BYTES,
@@ -245,9 +192,6 @@ public class TableNamespaceManager {
       }
     } finally {
       scanner.close();
-      if (locked) {
-        releaseSharedLock();
-      }
     }
     return ret;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ae7cc0c8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateNamespaceProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateNamespaceProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateNamespaceProcedure.java
index 87b411e..29a040e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateNamespaceProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateNamespaceProcedure.java
@@ -200,23 +200,19 @@ public class CreateNamespaceProcedure
       // Namespace manager might not be ready if master is not fully initialized,
       // return false to reject user namespace creation; return true for default
       // and system namespace creation (this is part of master initialization).
-      if (nsDescriptor.equals(NamespaceDescriptor.DEFAULT_NAMESPACE) ||
-        nsDescriptor.equals(NamespaceDescriptor.SYSTEM_NAMESPACE)) {
-        return true;
-      }
+      boolean isBootstrapNs = nsDescriptor.equals(NamespaceDescriptor.DEFAULT_NAMESPACE) ||
+        nsDescriptor.equals(NamespaceDescriptor.SYSTEM_NAMESPACE);
 
-      if (env.waitInitialized(this)) {
+      if (!isBootstrapNs && env.waitInitialized(this)) {
         return false;
       }
     }
-    return getTableNamespaceManager(env).acquireExclusiveLock();
+    return env.getProcedureQueue().tryAcquireNamespaceExclusiveLock(getNamespaceName());
   }
 
   @Override
   protected void releaseLock(final MasterProcedureEnv env) {
-    if (env.getMasterServices().isInitialized()) {
-      getTableNamespaceManager(env).releaseExclusiveLock();
-    }
+    env.getProcedureQueue().releaseNamespaceExclusiveLock(getNamespaceName());
   }
 
   @Override
@@ -229,6 +225,10 @@ public class CreateNamespaceProcedure
     return TableOperationType.EDIT;
   }
 
+  private String getNamespaceName() {
+    return nsDescriptor.getName();
+  }
+
   /**
    * Action before any real action of creating namespace.
    * @param env MasterProcedureEnv

http://git-wip-us.apache.org/repos/asf/hbase/blob/ae7cc0c8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteNamespaceProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteNamespaceProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteNamespaceProcedure.java
index 2f99167..23ff96e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteNamespaceProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteNamespaceProcedure.java
@@ -104,7 +104,7 @@ public class DeleteNamespaceProcedure
         throw new UnsupportedOperationException(this + " unhandled state=" + state);
       }
     } catch (IOException e) {
-      LOG.warn("Error trying to delete the namespace" + namespaceName
+      LOG.warn("Error trying to delete the namespace " + namespaceName
         + " (in state=" + state + ")", e);
 
       setFailure("master-delete-namespace", e);
@@ -212,12 +212,13 @@ public class DeleteNamespaceProcedure
 
   @Override
   protected boolean acquireLock(final MasterProcedureEnv env) {
-    return getTableNamespaceManager(env).acquireExclusiveLock();
+    if (env.waitInitialized(this)) return false;
+    return env.getProcedureQueue().tryAcquireNamespaceExclusiveLock(getNamespaceName());
   }
 
   @Override
   protected void releaseLock(final MasterProcedureEnv env) {
-    getTableNamespaceManager(env).releaseExclusiveLock();
+    env.getProcedureQueue().releaseNamespaceExclusiveLock(getNamespaceName());
   }
 
   @Override
@@ -230,6 +231,10 @@ public class DeleteNamespaceProcedure
     return TableOperationType.EDIT;
   }
 
+  private String getNamespaceName() {
+    return namespaceName;
+  }
+
   /**
    * Action before any real action of deleting namespace.
    * @param env MasterProcedureEnv

http://git-wip-us.apache.org/repos/asf/hbase/blob/ae7cc0c8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
index 9a3714f..86a7f44 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
@@ -309,9 +309,9 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
       if (!suspendQueue) suspendQueue = true;
 
       if (isTableProcedure(procedure)) {
-        suspendTableQueue(event, getTableName(procedure));
+        waitTableEvent(event, procedure, suspendQueue);
       } else if (isServerProcedure(procedure)) {
-        suspendServerQueue(event, getServerName(procedure));
+        waitServerEvent(event, procedure, suspendQueue);
       } else {
         // TODO: at the moment we only have Table and Server procedures
         // if you are implementing a non-table/non-server procedure, you have two options: create
@@ -324,15 +324,21 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
     return true;
   }
 
-  private void suspendTableQueue(ProcedureEvent event, TableName tableName) {
+  private void waitTableEvent(ProcedureEvent event, Procedure procedure, boolean suspendQueue) {
+    final TableName tableName = getTableName(procedure);
+    final boolean isDebugEnabled = LOG.isDebugEnabled();
+
     schedLock.lock();
     try {
       TableQueue queue = getTableQueue(tableName);
-      if (!queue.setSuspended(true)) return;
+      if (queue.isSuspended()) return;
+
+      // TODO: if !suspendQueue
 
-      if (LOG.isDebugEnabled()) {
+      if (isDebugEnabled) {
         LOG.debug("Suspend table queue " + tableName);
       }
+      queue.setSuspended(true);
       removeFromRunQueue(tableRunQueue, queue);
       event.suspendTableQueue(queue);
     } finally {
@@ -340,16 +346,22 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
     }
   }
 
-  private void suspendServerQueue(ProcedureEvent event, ServerName serverName) {
+  private void waitServerEvent(ProcedureEvent event, Procedure procedure, boolean suspendQueue) {
+    final ServerName serverName = getServerName(procedure);
+    final boolean isDebugEnabled = LOG.isDebugEnabled();
+
     schedLock.lock();
     try {
       // TODO: This will change once we have the new AM
       ServerQueue queue = getServerQueue(serverName);
-      if (!queue.setSuspended(true)) return;
+      if (queue.isSuspended()) return;
+
+      // TODO: if !suspendQueue
 
-      if (LOG.isDebugEnabled()) {
+      if (isDebugEnabled) {
         LOG.debug("Suspend server queue " + serverName);
       }
+      queue.setSuspended(true);
       removeFromRunQueue(serverRunQueue, queue);
       event.suspendServerQueue(queue);
     } finally {
@@ -358,18 +370,20 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
   }
 
   public void suspend(ProcedureEvent event) {
+    final boolean isDebugEnabled = LOG.isDebugEnabled();
     synchronized (event) {
       event.setReady(false);
-      if (LOG.isDebugEnabled()) {
+      if (isDebugEnabled) {
         LOG.debug("Suspend event " + event);
       }
     }
   }
 
   public void wake(ProcedureEvent event) {
+    final boolean isDebugEnabled = LOG.isDebugEnabled();
     synchronized (event) {
       event.setReady(true);
-      if (LOG.isDebugEnabled()) {
+      if (isDebugEnabled) {
         LOG.debug("Wake event " + event);
       }
 
@@ -467,7 +481,8 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
     Queue<TableName> node = AvlTree.get(tableMap, tableName);
     if (node != null) return (TableQueue)node;
 
-    node = new TableQueue(tableName, getTablePriority(tableName));
+    NamespaceQueue nsQueue = getNamespaceQueue(tableName.getNamespaceAsString());
+    node = new TableQueue(tableName, nsQueue, getTablePriority(tableName));
     tableMap = AvlTree.insert(tableMap, node);
     return (TableQueue)node;
   }
@@ -494,6 +509,18 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
   }
 
   // ============================================================================
+  //  Namespace Queue Lookup Helpers
+  // ============================================================================
+  private NamespaceQueue getNamespaceQueue(String namespace) {
+    Queue<String> node = AvlTree.get(namespaceMap, namespace);
+    if (node != null) return (NamespaceQueue)node;
+
+    node = new NamespaceQueue(namespace);
+    namespaceMap = AvlTree.insert(namespaceMap, node);
+    return (NamespaceQueue)node;
+  }
+
+  // ============================================================================
   //  Server Queue Lookup Helpers
   // ============================================================================
   private ServerQueue getServerQueueWithLock(ServerName serverName) {
@@ -559,10 +586,22 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
   }
 
   public static class TableQueue extends QueueImpl<TableName> {
+    private final NamespaceQueue namespaceQueue;
+
     private TableLock tableLock = null;
 
-    public TableQueue(TableName tableName, int priority) {
+    public TableQueue(TableName tableName, NamespaceQueue namespaceQueue, int priority) {
       super(tableName, priority);
+      this.namespaceQueue = namespaceQueue;
+    }
+
+    public NamespaceQueue getNamespaceQueue() {
+      return namespaceQueue;
+    }
+
+    @Override
+    public synchronized boolean isAvailable() {
+      return super.isAvailable() && !namespaceQueue.hasExclusiveLock();
     }
 
     // TODO: We can abort pending/in-progress operation if the new call is
@@ -584,9 +623,11 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
         case CREATE:
         case DELETE:
         case DISABLE:
-        case EDIT:
         case ENABLE:
           return true;
+        case EDIT:
+          // we allow concurrent edit on the NS table
+          return !tpi.getTableName().equals(TableName.NAMESPACE_TABLE_NAME);
         case READ:
           return false;
         default:
@@ -595,10 +636,8 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
       throw new UnsupportedOperationException("unexpected type " + tpi.getTableOperationType());
     }
 
-    private synchronized boolean trySharedLock(final TableLockManager lockManager,
+    private synchronized boolean tryZkSharedLock(final TableLockManager lockManager,
         final String purpose) {
-      if (hasExclusiveLock()) return false;
-
       // Take zk-read-lock
       TableName tableName = getKey();
       tableLock = lockManager.readLock(tableName, purpose);
@@ -609,14 +648,11 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
         tableLock = null;
         return false;
       }
-
-      trySharedLock();
       return true;
     }
 
-    private synchronized void releaseSharedLock(final TableLockManager lockManager) {
+    private synchronized void releaseZkSharedLock(final TableLockManager lockManager) {
       releaseTableLock(lockManager, isSingleSharedLock());
-      releaseSharedLock();
     }
 
     private synchronized boolean tryZkExclusiveLock(final TableLockManager lockManager,
@@ -653,8 +689,44 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
     }
   }
 
+  /**
+   * The namespace is currently used just as a rwlock, not as a queue,
+   * because namespace operations are not frequent enough; we want to avoid
+   * having to move table queues around for suspend/resume.
+   */
+  private static class NamespaceQueue extends Queue<String> {
+    public NamespaceQueue(String namespace) {
+      super(namespace);
+    }
+
+    @Override
+    public boolean requireExclusiveLock(Procedure proc) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void add(final Procedure proc, final boolean addToFront) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Procedure poll() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean isEmpty() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int size() {
+      throw new UnsupportedOperationException();
+    }
+  }
+
   // ============================================================================
-  //  Locking Helpers
+  //  Table Locking Helpers
   // ============================================================================
   /**
    * Try to acquire the exclusive lock on the specified table.
@@ -666,8 +738,12 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
   public boolean tryAcquireTableExclusiveLock(final TableName table, final String purpose) {
     schedLock.lock();
     TableQueue queue = getTableQueue(table);
-    boolean hasXLock = queue.tryExclusiveLock();
-    if (!hasXLock) {
+    if (!queue.getNamespaceQueue().trySharedLock()) {
+      return false;
+    }
+
+    if (!queue.tryExclusiveLock()) {
+      queue.getNamespaceQueue().releaseSharedLock();
       schedLock.unlock();
       return false;
     }
@@ -676,10 +752,11 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
     schedLock.unlock();
 
     // Zk lock is expensive...
-    hasXLock = queue.tryZkExclusiveLock(lockManager, purpose);
+    boolean hasXLock = queue.tryZkExclusiveLock(lockManager, purpose);
     if (!hasXLock) {
       schedLock.lock();
       queue.releaseExclusiveLock();
+      queue.getNamespaceQueue().releaseSharedLock();
       addToRunQueue(tableRunQueue, queue);
       schedLock.unlock();
     }
@@ -700,6 +777,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
 
     schedLock.lock();
     queue.releaseExclusiveLock();
+    queue.getNamespaceQueue().releaseSharedLock();
     addToRunQueue(tableRunQueue, queue);
     schedLock.unlock();
   }
@@ -712,7 +790,29 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
    * @return true if we were able to acquire the lock on the table, otherwise false.
    */
   public boolean tryAcquireTableSharedLock(final TableName table, final String purpose) {
-    return getTableQueueWithLock(table).trySharedLock(lockManager, purpose);
+    schedLock.lock();
+    TableQueue queue = getTableQueue(table);
+    if (!queue.getNamespaceQueue().trySharedLock()) {
+      return false;
+    }
+
+    if (!queue.trySharedLock()) {
+      queue.getNamespaceQueue().releaseSharedLock();
+      schedLock.unlock();
+      return false;
+    }
+
+    schedLock.unlock();
+
+    // Zk lock is expensive...
+    boolean hasXLock = queue.tryZkSharedLock(lockManager, purpose);
+    if (!hasXLock) {
+      schedLock.lock();
+      queue.releaseSharedLock();
+      queue.getNamespaceQueue().releaseSharedLock();
+      schedLock.unlock();
+    }
+    return hasXLock;
   }
 
   /**
@@ -720,7 +820,17 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
    * @param table the name of the table that has the shared lock
    */
   public void releaseTableSharedLock(final TableName table) {
-    getTableQueueWithLock(table).releaseSharedLock(lockManager);
+    schedLock.lock();
+    TableQueue queue = getTableQueue(table);
+    schedLock.unlock();
+
+    // Zk lock is expensive...
+    queue.releaseZkSharedLock(lockManager);
+
+    schedLock.lock();
+    queue.releaseSharedLock();
+    queue.getNamespaceQueue().releaseSharedLock();
+    schedLock.unlock();
   }
 
   /**
@@ -763,12 +873,57 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
   }
 
   // ============================================================================
-  //  Server Locking Helpers
+  //  Namespace Locking Helpers
   // ============================================================================
   /**
+   * Try to acquire the exclusive lock on the specified namespace.
+   * @see #releaseNamespaceExclusiveLock(String)
+   * @param nsName Namespace to lock
+   * @return true if we were able to acquire the lock on the namespace, otherwise false.
+   */
+  public boolean tryAcquireNamespaceExclusiveLock(final String nsName) {
+    schedLock.lock();
+    try {
+      TableQueue tableQueue = getTableQueue(TableName.NAMESPACE_TABLE_NAME);
+      if (!tableQueue.trySharedLock()) return false;
+
+      NamespaceQueue nsQueue = getNamespaceQueue(nsName);
+      boolean hasLock = nsQueue.tryExclusiveLock();
+      if (!hasLock) {
+        tableQueue.releaseSharedLock();
+      }
+      return hasLock;
+    } finally {
+      schedLock.unlock();
+    }
+  }
+
+  /**
    * Release the exclusive lock
-   * @see #tryAcquireServerExclusiveLock(ServerName)
-   * @param serverName the server that has the exclusive lock
+   * @see #tryAcquireNamespaceExclusiveLock(String)
+   * @param nsName the namespace that has the exclusive lock
+   */
+  public void releaseNamespaceExclusiveLock(final String nsName) {
+    schedLock.lock();
+    try {
+      TableQueue tableQueue = getTableQueue(TableName.NAMESPACE_TABLE_NAME);
+      tableQueue.releaseSharedLock();
+
+      NamespaceQueue queue = getNamespaceQueue(nsName);
+      queue.releaseExclusiveLock();
+    } finally {
+      schedLock.unlock();
+    }
+  }
+
+  // ============================================================================
+  //  Server Locking Helpers
+  // ============================================================================
+  /**
+   * Try to acquire the exclusive lock on the specified server.
+   * @see #releaseServerExclusiveLock(ServerName)
+   * @param serverName Server to lock
+   * @return true if we were able to acquire the lock on the server, otherwise false.
    */
   public boolean tryAcquireServerExclusiveLock(final ServerName serverName) {
     schedLock.lock();
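
Taken together, the new helpers form a two-level locking protocol: a table exclusive or shared lock first takes a shared lock on the table's NamespaceQueue, while a namespace exclusive lock first takes a shared lock on the hbase:namespace table queue. Namespace DDL therefore blocks table procedures in that namespace, and vice versa. A hedged usage sketch follows; the method names come from this patch, but the try/finally framing is illustrative, since the procedures above pair acquireLock()/releaseLock() instead:

    // Hedged sketch, not part of the commit.
    // 'env' stands for a MasterProcedureEnv, as in the procedures above.
    MasterProcedureScheduler queue = env.getProcedureQueue();
    if (queue.tryAcquireNamespaceExclusiveLock("ns1")) {
      try {
        // namespace DDL runs here; table procedures in ns1 stay queued,
        // since TableQueue.isAvailable() sees the namespace exclusive lock
      } finally {
        queue.releaseNamespaceExclusiveLock("ns1");
      }
    }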

http://git-wip-us.apache.org/repos/asf/hbase/blob/ae7cc0c8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyNamespaceProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyNamespaceProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyNamespaceProcedure.java
index 0f8c172..0db2c66 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyNamespaceProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyNamespaceProcedure.java
@@ -192,12 +192,13 @@ public class ModifyNamespaceProcedure
 
   @Override
   protected boolean acquireLock(final MasterProcedureEnv env) {
-    return getTableNamespaceManager(env).acquireExclusiveLock();
+    if (env.waitInitialized(this)) return false;
+    return env.getProcedureQueue().tryAcquireNamespaceExclusiveLock(getNamespaceName());
   }
 
   @Override
   protected void releaseLock(final MasterProcedureEnv env) {
-    getTableNamespaceManager(env).releaseExclusiveLock();
+    env.getProcedureQueue().releaseNamespaceExclusiveLock(getNamespaceName());
   }
 
   @Override
@@ -210,6 +211,10 @@ public class ModifyNamespaceProcedure
     return TableOperationType.EDIT;
   }
 
+  private String getNamespaceName() {
+    return newNsDescriptor.getName();
+  }
+
   /**
    * Action before any real action of adding namespace.
    * @param env MasterProcedureEnv

http://git-wip-us.apache.org/repos/asf/hbase/blob/ae7cc0c8/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java
index 106b9fa..6795f2f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.master.TableLockManager;
 import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 
@@ -291,6 +292,58 @@ public class TestMasterProcedureScheduler {
     assertTrue("queue should be deleted", queue.markTableAsDeleted(tableName));
   }
 
+  @Test
+  public void testVerifyNamespaceRwLocks() throws Exception {
+    String nsName1 = "ns1";
+    String nsName2 = "ns2";
+    TableName tableName1 = TableName.valueOf(nsName1, "testtb");
+    TableName tableName2 = TableName.valueOf(nsName2, "testtb");
+    queue.addBack(new TestNamespaceProcedure(1, nsName1,
+          TableProcedureInterface.TableOperationType.EDIT));
+    queue.addBack(new TestTableProcedure(2, tableName1,
+          TableProcedureInterface.TableOperationType.EDIT));
+    queue.addBack(new TestTableProcedure(3, tableName2,
+          TableProcedureInterface.TableOperationType.EDIT));
+    queue.addBack(new TestNamespaceProcedure(4, nsName2,
+          TableProcedureInterface.TableOperationType.EDIT));
+
+    // Fetch the 1st item and take the write lock
+    long procId = queue.poll().getProcId();
+    assertEquals(1, procId);
+    assertEquals(true, queue.tryAcquireNamespaceExclusiveLock(nsName1));
+
+    // System tables have 2 as default priority
+    Procedure proc = queue.poll();
+    assertEquals(4, proc.getProcId());
+    assertEquals(true, queue.tryAcquireNamespaceExclusiveLock(nsName2));
+    queue.releaseNamespaceExclusiveLock(nsName2);
+    queue.yield(proc);
+
+    // table on ns1 is locked, so we get table on ns2
+    procId = queue.poll().getProcId();
+    assertEquals(3, procId);
+    assertEquals(true, queue.tryAcquireTableExclusiveLock(tableName2, "lock " + procId));
+
+    // ns2 is not available (TODO we may avoid this one)
+    proc = queue.poll();
+    assertEquals(4, proc.getProcId());
+    assertEquals(false, queue.tryAcquireNamespaceExclusiveLock(nsName2));
+    queue.yield(proc);
+
+    // release the ns1 lock
+    queue.releaseNamespaceExclusiveLock(nsName1);
+
+    // we are now able to execute the table procedure in ns1
+    procId = queue.poll().getProcId();
+    assertEquals(2, procId);
+
+    queue.releaseTableExclusiveLock(tableName2);
+
+    // we are now able to execute the ns2 procedure
+    procId = queue.poll().getProcId();
+    assertEquals(4, procId);
+  }
+
   /**
    * Verify that "write" operations for a single table are serialized,
    * but different tables can be executed in parallel.
@@ -440,7 +493,7 @@ public class TestMasterProcedureScheduler {
     }
   }
 
-  public static class TestTableProcedure extends Procedure<Void>
+  public static class TestTableProcedure extends TestProcedure
       implements TableProcedureInterface {
     private final TableOperationType opType;
     private final TableName tableName;
@@ -450,9 +503,9 @@ public class TestMasterProcedureScheduler {
     }
 
     public TestTableProcedure(long procId, TableName tableName, TableOperationType opType) {
+      super(procId);
       this.tableName = tableName;
       this.opType = opType;
-      setProcId(procId);
     }
 
     @Override
@@ -464,26 +517,31 @@ public class TestMasterProcedureScheduler {
     public TableOperationType getTableOperationType() {
       return opType;
     }
+  }
 
-    @Override
-    protected Procedure[] execute(Void env) {
-      return null;
-    }
+  public static class TestNamespaceProcedure extends TestProcedure
+      implements TableProcedureInterface {
+    private final TableOperationType opType;
+    private final String nsName;
 
-    @Override
-    protected void rollback(Void env) {
-      throw new UnsupportedOperationException();
+    public TestNamespaceProcedure() {
+      throw new UnsupportedOperationException("recovery should not be triggered here");
     }
 
-    @Override
-    protected boolean abort(Void env) {
-      throw new UnsupportedOperationException();
+    public TestNamespaceProcedure(long procId, String nsName, TableOperationType opType) {
+      super(procId);
+      this.nsName = nsName;
+      this.opType = opType;
     }
 
     @Override
-    protected void serializeStateData(final OutputStream stream) throws IOException {}
+    public TableName getTableName() {
+      return TableName.NAMESPACE_TABLE_NAME;
+    }
 
     @Override
-    protected void deserializeStateData(final InputStream stream) throws IOException {}
+    public TableOperationType getTableOperationType() {
+      return opType;
+    }
   }
 }
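
The test procedures above now extend ProcedureTestingUtility.TestProcedure, which supplies the no-op execute/rollback/abort and (de)serialization plumbing, so a scheduler test only has to declare what it locks. A minimal sketch of the same pattern (illustrative only; TestEditProcedure is not part of the commit):

    // Illustrative: a scheduler-test procedure reduced to its essentials.
    public static class TestEditProcedure extends TestProcedure
        implements TableProcedureInterface {
      private final TableName tableName;

      public TestEditProcedure(long procId, TableName tableName) {
        super(procId);            // TestProcedure keeps track of the proc id
        this.tableName = tableName;
      }

      @Override
      public TableName getTableName() {
        return tableName;
      }

      @Override
      public TableOperationType getTableOperationType() {
        return TableOperationType.EDIT;  // a "write" op, takes the exclusive lock
      }
    }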


[8/9] hbase git commit: update zhangduo affiliation

Posted by sy...@apache.org.
update zhangduo affiliation


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/46ea5494
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/46ea5494
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/46ea5494

Branch: refs/heads/hbase-12439
Commit: 46ea5494a5102e0f6b9f2b56283cf78da7cd2fee
Parents: 4ac8d4c
Author: zhangduo <zh...@apache.org>
Authored: Fri Jan 15 11:53:59 2016 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Fri Jan 15 11:53:59 2016 +0800

----------------------------------------------------------------------
 pom.xml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/46ea5494/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ccc7eb5..1169c78 100644
--- a/pom.xml
+++ b/pom.xml
@@ -500,8 +500,8 @@
       <name>Duo Zhang</name>
       <email>zhangduo@apache.org</email>
       <timezone>+8</timezone>
-      <organization>Wandoujia</organization>
-      <organizationUrl>http://www.wandoujia.com</organizationUrl>
+      <organization>unaffiliated</organization>
+      <organizationUrl>https://github.com/Apache9</organizationUrl>
     </developer>
     <developer>
       <id>zjushch</id>


[2/9] hbase git commit: HBASE-15104 Occasional failures due to NotServingRegionException in IT tests (Huaxiang Sun)

Posted by sy...@apache.org.
HBASE-15104 Occasional failures due to NotServingRegionException in IT tests (Huaxiang Sun)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/dc57996c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/dc57996c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/dc57996c

Branch: refs/heads/hbase-12439
Commit: dc57996ca6d52b48c7f274956c0deca513ea7a83
Parents: c8b9754
Author: Jonathan M Hsieh <jm...@apache.org>
Authored: Thu Jan 14 07:59:33 2016 -0800
Committer: Jonathan M Hsieh <jm...@apache.org>
Committed: Thu Jan 14 07:59:33 2016 -0800

----------------------------------------------------------------------
 .../chaos/actions/ChangeCompressionAction.java  | 21 +++++++++++++++++++-
 1 file changed, 20 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/dc57996c/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/ChangeCompressionAction.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/ChangeCompressionAction.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/ChangeCompressionAction.java
index 0d7f7ae..9c7bf45 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/ChangeCompressionAction.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/ChangeCompressionAction.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
+import org.apache.hadoop.io.compress.Compressor;
 
 /**
  * Action that changes the compression algorithm on a column family from a list of tables.
@@ -62,7 +63,25 @@ public class ChangeCompressionAction extends Action {
 
     // Since not every compression algorithm is supported,
     // let's use the same algorithm for all column families.
-    Algorithm algo = possibleAlgos[random.nextInt(possibleAlgos.length)];
+
+    // If an unsupported compression algorithm is chosen, pick a different one.
+    // This is to work around the issue that modifyTable() does not throw a
+    // remote exception.
+    Algorithm algo;
+    do {
+      algo = possibleAlgos[random.nextInt(possibleAlgos.length)];
+
+      try {
+        Compressor c = algo.getCompressor();
+
+        // call returnCompressor() to release the Compressor
+        algo.returnCompressor(c);
+        break;
+      } catch (Throwable t) {
+        LOG.info("Performing action: Changing compression algorithm to " + algo +
+                " is not supported; picking another one");
+      }
+    } while (true);
 
     LOG.debug("Performing action: Changing compression algorithms on "
       + tableName.getNameAsString() + " to " + algo);
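
The loop above probes a codec before committing to it: Algorithm.getCompressor() fails when the codec is unsupported on the platform (for example, missing native libraries), and returnCompressor() hands the instance back to the codec's pool. The same probe can stand alone; a hedged sketch (getCompressor()/returnCompressor() are the calls used in the diff, the helper itself is hypothetical):

    // Hypothetical helper: keep drawing until a codec that can actually
    // instantiate a Compressor on this platform is found. Like the loop in
    // the patch, this assumes at least one candidate (e.g. NONE) always works.
    static Algorithm pickSupportedAlgorithm(Algorithm[] candidates, Random random) {
      while (true) {
        Algorithm algo = candidates[random.nextInt(candidates.length)];
        try {
          Compressor c = algo.getCompressor();  // throws if unsupported
          algo.returnCompressor(c);             // release it back to the pool
          return algo;
        } catch (Throwable t) {
          // not usable here; draw another candidate
        }
      }
    }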


[3/9] hbase git commit: HBASE-14837 Procedure v2 - Procedure Queue Improvement

Posted by sy...@apache.org.
http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java
new file mode 100644
index 0000000..106b9fa
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java
@@ -0,0 +1,489 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.procedure;
+
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.master.TableLockManager;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category({MasterTests.class, SmallTests.class})
+public class TestMasterProcedureScheduler {
+  private static final Log LOG = LogFactory.getLog(TestMasterProcedureScheduler.class);
+
+  private MasterProcedureScheduler queue;
+  private Configuration conf;
+
+  @Before
+  public void setUp() throws IOException {
+    conf = HBaseConfiguration.create();
+    queue = new MasterProcedureScheduler(conf, new TableLockManager.NullTableLockManager());
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    assertEquals(0, queue.size());
+  }
+
+  @Test
+  public void testConcurrentCreateDelete() throws Exception {
+    final MasterProcedureScheduler procQueue = queue;
+    final TableName table = TableName.valueOf("testtb");
+    final AtomicBoolean running = new AtomicBoolean(true);
+    final AtomicBoolean failure = new AtomicBoolean(false);
+    Thread createThread = new Thread() {
+      @Override
+      public void run() {
+        try {
+          while (running.get() && !failure.get()) {
+            if (procQueue.tryAcquireTableExclusiveLock(table, "create")) {
+              procQueue.releaseTableExclusiveLock(table);
+            }
+          }
+        } catch (Throwable e) {
+          LOG.error("create failed", e);
+          failure.set(true);
+        }
+      }
+    };
+
+    Thread deleteThread = new Thread() {
+      @Override
+      public void run() {
+        try {
+          while (running.get() && !failure.get()) {
+            if (procQueue.tryAcquireTableExclusiveLock(table, "delete")) {
+              procQueue.releaseTableExclusiveLock(table);
+            }
+            procQueue.markTableAsDeleted(table);
+          }
+        } catch (Throwable e) {
+          LOG.error("delete failed", e);
+          failure.set(true);
+        }
+      }
+    };
+
+    createThread.start();
+    deleteThread.start();
+    for (int i = 0; i < 100 && running.get() && !failure.get(); ++i) {
+      Thread.sleep(100);
+    }
+    running.set(false);
+    createThread.join();
+    deleteThread.join();
+    assertEquals(false, failure.get());
+  }
+
+  /**
+   * Verify simple create/insert/fetch/delete of the table queue.
+   */
+  @Test
+  public void testSimpleTableOpsQueues() throws Exception {
+    final int NUM_TABLES = 10;
+    final int NUM_ITEMS = 10;
+
+    int count = 0;
+    for (int i = 1; i <= NUM_TABLES; ++i) {
+      TableName tableName = TableName.valueOf(String.format("test-%04d", i));
+      // insert items
+      for (int j = 1; j <= NUM_ITEMS; ++j) {
+        queue.addBack(new TestTableProcedure(i * 1000 + j, tableName,
+          TableProcedureInterface.TableOperationType.EDIT));
+        assertEquals(++count, queue.size());
+      }
+    }
+    assertEquals(NUM_TABLES * NUM_ITEMS, queue.size());
+
+    for (int j = 1; j <= NUM_ITEMS; ++j) {
+      for (int i = 1; i <= NUM_TABLES; ++i) {
+        Procedure proc = queue.poll();
+        assertTrue(proc != null);
+        TableName tableName = ((TestTableProcedure)proc).getTableName();
+        queue.tryAcquireTableExclusiveLock(tableName, "test");
+        queue.releaseTableExclusiveLock(tableName);
+        queue.completionCleanup(proc);
+        assertEquals(--count, queue.size());
+        assertEquals(i * 1000 + j, proc.getProcId());
+      }
+    }
+    assertEquals(0, queue.size());
+
+    for (int i = 1; i <= NUM_TABLES; ++i) {
+      TableName tableName = TableName.valueOf(String.format("test-%04d", i));
+      // complete the table deletion
+      assertTrue(queue.markTableAsDeleted(tableName));
+    }
+  }
+
+  /**
+   * Check that the table queue is not deletable until every procedure
+   * in-progress is completed (this is a special case for write-locks).
+   */
+  @Test
+  public void testCreateDeleteTableOperationsWithWriteLock() throws Exception {
+    TableName tableName = TableName.valueOf("testtb");
+
+    queue.addBack(new TestTableProcedure(1, tableName,
+          TableProcedureInterface.TableOperationType.EDIT));
+
+    // table can't be deleted because one item is in the queue
+    assertFalse(queue.markTableAsDeleted(tableName));
+
+    // fetch item and take a lock
+    assertEquals(1, queue.poll().getProcId());
+    // take the xlock
+    assertTrue(queue.tryAcquireTableExclusiveLock(tableName, "write"));
+    // table can't be deleted because we have the lock
+    assertEquals(0, queue.size());
+    assertFalse(queue.markTableAsDeleted(tableName));
+    // release the xlock
+    queue.releaseTableExclusiveLock(tableName);
+    // complete the table deletion
+    assertTrue(queue.markTableAsDeleted(tableName));
+  }
+
+  /**
+   * Check that the table queue is not deletable until every procedure
+   * in-progress is completed (this is a special case for read-locks).
+   */
+  @Test
+  public void testCreateDeleteTableOperationsWithReadLock() throws Exception {
+    final TableName tableName = TableName.valueOf("testtb");
+    final int nitems = 2;
+
+    for (int i = 1; i <= nitems; ++i) {
+      queue.addBack(new TestTableProcedure(i, tableName,
+            TableProcedureInterface.TableOperationType.READ));
+    }
+
+    // table can't be deleted because one item is in the queue
+    assertFalse(queue.markTableAsDeleted(tableName));
+
+    for (int i = 1; i <= nitems; ++i) {
+      // fetch item and take a lock
+      assertEquals(i, queue.poll().getProcId());
+      // take the rlock
+      assertTrue(queue.tryAcquireTableSharedLock(tableName, "read " + i));
+      // table can't be deleted because we have locks and/or items in the queue
+      assertFalse(queue.markTableAsDeleted(tableName));
+    }
+
+    for (int i = 1; i <= nitems; ++i) {
+      // table can't be deleted because we have locks
+      assertFalse(queue.markTableAsDeleted(tableName));
+      // release the rlock
+      queue.releaseTableSharedLock(tableName);
+    }
+
+    // there are no items and no locks in the queue
+    assertEquals(0, queue.size());
+    // complete the table deletion
+    assertTrue(queue.markTableAsDeleted(tableName));
+  }
+
+  /**
+   * Verify the correct logic of RWLocks on the queue
+   */
+  @Test
+  public void testVerifyRwLocks() throws Exception {
+    TableName tableName = TableName.valueOf("testtb");
+    queue.addBack(new TestTableProcedure(1, tableName,
+          TableProcedureInterface.TableOperationType.EDIT));
+    queue.addBack(new TestTableProcedure(2, tableName,
+          TableProcedureInterface.TableOperationType.READ));
+    queue.addBack(new TestTableProcedure(3, tableName,
+          TableProcedureInterface.TableOperationType.EDIT));
+    queue.addBack(new TestTableProcedure(4, tableName,
+          TableProcedureInterface.TableOperationType.READ));
+    queue.addBack(new TestTableProcedure(5, tableName,
+          TableProcedureInterface.TableOperationType.READ));
+
+    // Fetch the 1st item and take the write lock
+    long procId = queue.poll().getProcId();
+    assertEquals(1, procId);
+    assertEquals(true, queue.tryAcquireTableExclusiveLock(tableName, "write " + procId));
+
+    // Fetch the 2nd item and verify that the lock can't be acquired
+    assertEquals(null, queue.poll(0));
+
+    // Release the write lock and acquire the read lock
+    queue.releaseTableExclusiveLock(tableName);
+
+    // Fetch the 2nd item and take the read lock
+    procId = queue.poll().getProcId();
+    assertEquals(2, procId);
+    assertEquals(true, queue.tryAcquireTableSharedLock(tableName, "read " + procId));
+
+    // Fetch the 3rd item and verify that the lock can't be acquired
+    procId = queue.poll().getProcId();
+    assertEquals(3, procId);
+    assertEquals(false, queue.tryAcquireTableExclusiveLock(tableName, "write " + procId));
+
+    // release the rdlock of item 2 and take the wrlock for the 3d item
+    queue.releaseTableSharedLock(tableName);
+    assertEquals(true, queue.tryAcquireTableExclusiveLock(tableName, "write " + procId));
+
+    // Fetch 4th item and verify that the lock can't be acquired
+    assertEquals(null, queue.poll(0));
+
+    // Release the write lock and acquire the read lock
+    queue.releaseTableExclusiveLock(tableName);
+
+    // Fetch the 4th item and take the read lock
+    procId = queue.poll().getProcId();
+    assertEquals(4, procId);
+    assertEquals(true, queue.tryAcquireTableSharedLock(tableName, "read " + procId));
+
+    // Fetch the 5th item and take the read lock
+    procId = queue.poll().getProcId();
+    assertEquals(5, procId);
+    assertEquals(true, queue.tryAcquireTableSharedLock(tableName, "read " + procId));
+
+    // Release 4th and 5th read-lock
+    queue.releaseTableSharedLock(tableName);
+    queue.releaseTableSharedLock(tableName);
+
+    // remove table queue
+    assertEquals(0, queue.size());
+    assertTrue("queue should be deleted", queue.markTableAsDeleted(tableName));
+  }
+
+  /**
+   * Verify that "write" operations for a single table are serialized,
+   * but operations on different tables can run in parallel.
+   */
+  @Test(timeout=90000)
+  public void testConcurrentWriteOps() throws Exception {
+    final TestTableProcSet procSet = new TestTableProcSet(queue);
+
+    final int NUM_ITEMS = 10;
+    final int NUM_TABLES = 4;
+    final AtomicInteger opsCount = new AtomicInteger(0);
+    for (int i = 0; i < NUM_TABLES; ++i) {
+      TableName tableName = TableName.valueOf(String.format("testtb-%04d", i));
+      for (int j = 1; j <= NUM_ITEMS; ++j) {
+        procSet.addBack(new TestTableProcedure(i * 100 + j, tableName,
+          TableProcedureInterface.TableOperationType.EDIT));
+        opsCount.incrementAndGet();
+      }
+    }
+    assertEquals(opsCount.get(), queue.size());
+
+    final Thread[] threads = new Thread[NUM_TABLES * 2];
+    final HashSet<TableName> concurrentTables = new HashSet<TableName>();
+    final ArrayList<String> failures = new ArrayList<String>();
+    final AtomicInteger concurrentCount = new AtomicInteger(0);
+    for (int i = 0; i < threads.length; ++i) {
+      threads[i] = new Thread() {
+        @Override
+        public void run() {
+          while (opsCount.get() > 0) {
+            try {
+              TableProcedureInterface proc = procSet.acquire();
+              if (proc == null) {
+                queue.signalAll();
+                if (opsCount.get() > 0) {
+                  continue;
+                }
+                break;
+              }
+              synchronized (concurrentTables) {
+                assertTrue("unexpected concurrency on " + proc.getTableName(),
+                  concurrentTables.add(proc.getTableName()));
+              }
+              assertTrue(opsCount.decrementAndGet() >= 0);
+              try {
+                long procId = ((Procedure)proc).getProcId();
+                TableName tableId = proc.getTableName();
+                int concurrent = concurrentCount.incrementAndGet();
+                assertTrue("inc-concurrent="+ concurrent +" 1 <= concurrent <= "+ NUM_TABLES,
+                  concurrent >= 1 && concurrent <= NUM_TABLES);
+                LOG.debug("[S] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent);
+                Thread.sleep(2000);
+                concurrent = concurrentCount.decrementAndGet();
+                LOG.debug("[E] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent);
+                assertTrue("dec-concurrent=" + concurrent, concurrent < NUM_TABLES);
+              } finally {
+                synchronized (concurrentTables) {
+                  assertTrue(concurrentTables.remove(proc.getTableName()));
+                }
+                procSet.release(proc);
+              }
+            } catch (Throwable e) {
+              LOG.error("Failed " + e.getMessage(), e);
+              synchronized (failures) {
+                failures.add(e.getMessage());
+              }
+            } finally {
+              queue.signalAll();
+            }
+          }
+        }
+      };
+      threads[i].start();
+    }
+    for (int i = 0; i < threads.length; ++i) {
+      threads[i].join();
+    }
+    assertTrue(failures.toString(), failures.isEmpty());
+    assertEquals(0, opsCount.get());
+    assertEquals(0, queue.size());
+
+    for (int i = 0; i < NUM_TABLES; ++i) {
+      TableName table = TableName.valueOf(String.format("testtb-%04d", i));
+      assertTrue("queue should be deleted, table=" + table, queue.markTableAsDeleted(table));
+    }
+  }
+
+  public static class TestTableProcSet {
+    private final MasterProcedureScheduler queue;
+    private Map<Long, TableProcedureInterface> procsMap =
+      new ConcurrentHashMap<Long, TableProcedureInterface>();
+
+    public TestTableProcSet(final MasterProcedureScheduler queue) {
+      this.queue = queue;
+    }
+
+    public void addBack(TableProcedureInterface tableProc) {
+      Procedure proc = (Procedure)tableProc;
+      procsMap.put(proc.getProcId(), tableProc);
+      queue.addBack(proc);
+    }
+
+    public void addFront(TableProcedureInterface tableProc) {
+      Procedure proc = (Procedure)tableProc;
+      procsMap.put(proc.getProcId(), tableProc);
+      queue.addFront(proc);
+    }
+
+    public TableProcedureInterface acquire() {
+      TableProcedureInterface proc = null;
+      boolean avail = false;
+      while (!avail) {
+        Procedure xProc = queue.poll();
+        proc = xProc != null ? procsMap.remove(xProc.getProcId()) : null;
+        if (proc == null) break;
+        switch (proc.getTableOperationType()) {
+          case CREATE:
+          case DELETE:
+          case EDIT:
+            avail = queue.tryAcquireTableExclusiveLock(proc.getTableName(),
+              "op="+ proc.getTableOperationType());
+            break;
+          case READ:
+            avail = queue.tryAcquireTableSharedLock(proc.getTableName(),
+              "op="+ proc.getTableOperationType());
+            break;
+        }
+        if (!avail) {
+          addFront(proc);
+          LOG.debug("yield proc=" + proc);
+        }
+      }
+      return proc;
+    }
+
+    public void release(TableProcedureInterface proc) {
+      switch (proc.getTableOperationType()) {
+        case CREATE:
+        case DELETE:
+        case EDIT:
+          queue.releaseTableExclusiveLock(proc.getTableName());
+          break;
+        case READ:
+          queue.releaseTableSharedLock(proc.getTableName());
+          break;
+      }
+    }
+  }
+
+  public static class TestTableProcedure extends Procedure<Void>
+      implements TableProcedureInterface {
+    private final TableOperationType opType;
+    private final TableName tableName;
+
+    public TestTableProcedure() {
+      throw new UnsupportedOperationException("recovery should not be triggered here");
+    }
+
+    public TestTableProcedure(long procId, TableName tableName, TableOperationType opType) {
+      this.tableName = tableName;
+      this.opType = opType;
+      setProcId(procId);
+    }
+
+    @Override
+    public TableName getTableName() {
+      return tableName;
+    }
+
+    @Override
+    public TableOperationType getTableOperationType() {
+      return opType;
+    }
+
+    @Override
+    protected Procedure[] execute(Void env) {
+      return null;
+    }
+
+    @Override
+    protected void rollback(Void env) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected boolean abort(Void env) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected void serializeStateData(final OutputStream stream) throws IOException {}
+
+    @Override
+    protected void deserializeStateData(final InputStream stream) throws IOException {}
+  }
+}

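For context, the locking contract these tests exercise, as a minimal sketch built only
from the scheduler methods used above (the caller methods and error handling are
assumptions for illustration, not part of the patch):

  // Writers (CREATE/DELETE/EDIT) take the exclusive table lock; readers share it.
  void editTable(MasterProcedureScheduler queue, TableName table) {
    if (queue.tryAcquireTableExclusiveLock(table, "edit")) {
      try {
        // mutate table state here
      } finally {
        queue.releaseTableExclusiveLock(table);
      }
    }
  }

  void readTable(MasterProcedureScheduler queue, TableName table) {
    if (queue.tryAcquireTableSharedLock(table, "read")) {
      try {
        // read table state here
      } finally {
        queue.releaseTableSharedLock(table);
      }
    }
  }

A table queue can only be removed (markTableAsDeleted) once no procedure holds or waits
for either lock, which is exactly what the tests above assert.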

[9/9] hbase git commit: HBASE-15117 Resolve ICAST findbugs warnings in current codes (Yu Li)

Posted by sy...@apache.org.
HBASE-15117 Resolve ICAST findbugs warnings in current codes (Yu Li)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/cb17c7a9
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/cb17c7a9
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/cb17c7a9

Branch: refs/heads/hbase-12439
Commit: cb17c7a97a1e2eb0ebd532f614191e4edbb9e49b
Parents: 46ea549
Author: stack <st...@apache.org>
Authored: Fri Jan 15 06:00:11 2016 -0800
Committer: stack <st...@apache.org>
Committed: Fri Jan 15 06:00:11 2016 -0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/regionserver/compactions/Compactor.java    | 2 +-
 .../main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/cb17c7a9/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 660ea91..3728c7c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -335,7 +335,7 @@ public abstract class Compactor {
     int minFilesToCompact = Math.max(2,
         conf.getInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY,
             /* old name */ conf.getInt("hbase.hstore.compactionThreshold", 3)));
-    long shippedCallSizeLimit = minFilesToCompact * HConstants.DEFAULT_BLOCKSIZE;
+    long shippedCallSizeLimit = (long) minFilesToCompact * HConstants.DEFAULT_BLOCKSIZE;
     try {
       do {
         hasMore = scanner.next(cells, scannerContext);

http://git-wip-us.apache.org/repos/asf/hbase/blob/cb17c7a9/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
index a574410..c0b2ce8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
@@ -270,7 +270,7 @@ public class ExportSnapshot extends Configured implements Tool {
       InputStream in = openSourceFile(context, inputInfo);
       int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
       if (Integer.MAX_VALUE != bandwidthMB) {
-        in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024);
+        in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024L);
       }
 
       try {

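Both hunks fix the same ICAST pattern: the multiplication was performed in 32-bit int
arithmetic and only then widened to long. A small standalone illustration (not from the
patch):

  int bandwidthMB = 4096;
  long wrong = bandwidthMB * 1024 * 1024;   // int multiply wraps: 4096 * 2^20 overflows to 0
  long right = bandwidthMB * 1024 * 1024L;  // the long literal forces 64-bit arithmetic
  System.out.println(wrong + " vs " + right);  // prints "0 vs 4294967296"

Casting one operand (as in Compactor) or using a long literal (as in ExportSnapshot)
both promote the whole expression to long before it can overflow.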

[7/9] hbase git commit: HBASE-14865 Support passing multiple QOPs to SaslClient/Server via hbase.rpc.protection (Appy)

Posted by sy...@apache.org.
HBASE-14865 Support passing multiple QOPs to SaslClient/Server via hbase.rpc.protection (Appy)

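For context, the change lets 'hbase.rpc.protection' carry a comma-separated list of QOPs
instead of a single value; a hedged illustration of client-side configuration (the
property name and values come from the patch below, the surrounding setup is
illustrative):

  Configuration conf = HBaseConfiguration.create();
  // Prefer encrypted traffic ("privacy") but still accept authentication-only peers.
  conf.set("hbase.rpc.protection", "privacy,authentication");
  // SaslUtil.initSaslProperties resolves this to the SASL QOP list "auth-conf,auth",
  // and SASL negotiation then picks a protection level common to client and server.

If client and server share no QOP, the connection fails, as testSaslNoCommonQop in
AbstractTestSecureIPC below demonstrates.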

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4ac8d4ce
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4ac8d4ce
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4ac8d4ce

Branch: refs/heads/hbase-12439
Commit: 4ac8d4ce610a107112acb6aa070157691c022e90
Parents: ae7cc0c
Author: tedyu <yu...@gmail.com>
Authored: Thu Jan 14 16:39:52 2016 -0800
Committer: tedyu <yu...@gmail.com>
Committed: Thu Jan 14 16:39:52 2016 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/ipc/AsyncRpcChannel.java       |  33 +-
 .../hbase/security/HBaseSaslRpcClient.java      |   8 +-
 .../hbase/security/SaslClientHandler.java       |  30 +-
 .../apache/hadoop/hbase/security/SaslUtil.java  |  74 +++--
 .../hbase/security/TestHBaseSaslRpcClient.java  | 309 ++++++++++++++++++
 .../hadoop/hbase/security/TestSaslUtil.java     |  59 ++++
 .../org/apache/hadoop/hbase/ipc/RpcServer.java  |   4 +-
 .../hbase/security/HBaseSaslRpcServer.java      |   9 +-
 .../hbase/security/AbstractTestSecureIPC.java   | 245 ++++++++++++++
 .../hbase/security/TestAsyncSecureIPC.java      |  33 ++
 .../hbase/security/TestHBaseSaslRpcClient.java  | 325 -------------------
 .../hadoop/hbase/security/TestSecureIPC.java    |  33 ++
 .../hadoop/hbase/security/TestSecureRPC.java    | 216 ------------
 13 files changed, 751 insertions(+), 627 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/4ac8d4ce/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
index 44e8322..69978fc 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
@@ -32,7 +32,6 @@ import io.netty.util.concurrent.Promise;
 import java.io.IOException;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
-import java.net.SocketException;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
@@ -104,8 +103,7 @@ public class AsyncRpcChannel {
   final String serviceName;
   final InetSocketAddress address;
 
-  private int ioFailureCounter = 0;
-  private int connectFailureCounter = 0;
+  private int failureCounter = 0;
 
   boolean useSasl;
   AuthMethod authMethod;
@@ -134,7 +132,7 @@ public class AsyncRpcChannel {
    * @param bootstrap to construct channel on
    * @param client    to connect with
    * @param ticket of user which uses connection
-   *               @param serviceName name of service to connect to
+   * @param serviceName name of service to connect to
    * @param address to connect to
    */
   public AsyncRpcChannel(Bootstrap bootstrap, final AsyncRpcClient client, User ticket, String
@@ -166,11 +164,7 @@ public class AsyncRpcChannel {
           @Override
           public void operationComplete(final ChannelFuture f) throws Exception {
             if (!f.isSuccess()) {
-              if (f.cause() instanceof SocketException) {
-                retryOrClose(bootstrap, connectFailureCounter++, f.cause());
-              } else {
-                retryOrClose(bootstrap, ioFailureCounter++, f.cause());
-              }
+              retryOrClose(bootstrap, failureCounter++, client.failureSleep, f.cause());
               return;
             }
             channel = f.channel();
@@ -263,13 +257,8 @@ public class AsyncRpcChannel {
               // Handle Sasl failure. Try to potentially get new credentials
               handleSaslConnectionFailure(retryCount, cause, realTicket);
 
-              // Try to reconnect
-              client.newTimeout(new TimerTask() {
-                @Override
-                public void run(Timeout timeout) throws Exception {
-                  connect(bootstrap);
-                }
-              }, random.nextInt(reloginMaxBackoff) + 1, TimeUnit.MILLISECONDS);
+              retryOrClose(bootstrap, failureCounter++, random.nextInt(reloginMaxBackoff) + 1,
+                  cause);
             } catch (IOException | InterruptedException e) {
               close(e);
             }
@@ -286,16 +275,18 @@ public class AsyncRpcChannel {
    * Retry to connect or close
    *
    * @param bootstrap      to connect with
-   * @param connectCounter amount of tries
+   * @param failureCount   failure count
    * @param e              exception of fail
    */
-  private void retryOrClose(final Bootstrap bootstrap, int connectCounter, Throwable e) {
-    if (connectCounter < client.maxRetries) {
+  private void retryOrClose(final Bootstrap bootstrap, int failureCount,
+      long timeout, Throwable e) {
+    if (failureCount < client.maxRetries) {
       client.newTimeout(new TimerTask() {
-        @Override public void run(Timeout timeout) throws Exception {
+        @Override
+        public void run(Timeout timeout) throws Exception {
           connect(bootstrap);
         }
-      }, client.failureSleep, TimeUnit.MILLISECONDS);
+      }, timeout, TimeUnit.MILLISECONDS);
     } else {
       client.failedServers.addToFailedServers(address);
       close(e);

http://git-wip-us.apache.org/repos/asf/hbase/blob/4ac8d4ce/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java
index ab3ee0e..ce32ed9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java
@@ -25,6 +25,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Map;
 
 import javax.security.auth.callback.Callback;
 import javax.security.auth.callback.CallbackHandler;
@@ -59,6 +60,7 @@ public class HBaseSaslRpcClient {
 
   private final SaslClient saslClient;
   private final boolean fallbackAllowed;
+  protected final Map<String, String> saslProps;
   /**
    * Create a HBaseSaslRpcClient for an authentication method
    *
@@ -96,7 +98,7 @@ public class HBaseSaslRpcClient {
       Token<? extends TokenIdentifier> token, String serverPrincipal, boolean fallbackAllowed,
       String rpcProtection) throws IOException {
     this.fallbackAllowed = fallbackAllowed;
-    SaslUtil.initSaslProperties(rpcProtection);
+    saslProps = SaslUtil.initSaslProperties(rpcProtection);
     switch (method) {
     case DIGEST:
       if (LOG.isDebugEnabled())
@@ -138,13 +140,13 @@ public class HBaseSaslRpcClient {
       String saslDefaultRealm, CallbackHandler saslClientCallbackHandler)
       throws IOException {
     return Sasl.createSaslClient(mechanismNames, null, null, saslDefaultRealm,
-        SaslUtil.SASL_PROPS, saslClientCallbackHandler);
+        saslProps, saslClientCallbackHandler);
   }
 
   protected SaslClient createKerberosSaslClient(String[] mechanismNames,
       String userFirstPart, String userSecondPart) throws IOException {
     return Sasl.createSaslClient(mechanismNames, null, userFirstPart,
-        userSecondPart, SaslUtil.SASL_PROPS, null);
+        userSecondPart, saslProps, null);
   }
 
   private static void readStatus(DataInputStream inStream) throws IOException {

http://git-wip-us.apache.org/repos/asf/hbase/blob/4ac8d4ce/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java
index f52987b..bfb625b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java
@@ -41,6 +41,7 @@ import javax.security.sasl.SaslException;
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.security.PrivilegedExceptionAction;
+import java.util.Map;
 import java.util.Random;
 
 /**
@@ -58,6 +59,7 @@ public class SaslClientHandler extends ChannelDuplexHandler {
    * Used for client or server's token to send or receive from each other.
    */
   private final SaslClient saslClient;
+  private final Map<String, String> saslProps;
   private final SaslExceptionHandler exceptionHandler;
   private final SaslSuccessfulConnectHandler successfulConnectHandler;
   private byte[] saslToken;
@@ -67,8 +69,6 @@ public class SaslClientHandler extends ChannelDuplexHandler {
   private Random random;
 
   /**
-   * Constructor
-   *
    * @param ticket                   the ugi
    * @param method                   auth method
    * @param token                    for Sasl
@@ -76,8 +76,6 @@ public class SaslClientHandler extends ChannelDuplexHandler {
    * @param fallbackAllowed          True if server may also fall back to less secure connection
    * @param rpcProtection            Quality of protection. Can be 'authentication', 'integrity' or
    *                                 'privacy'.
-   * @param exceptionHandler         handler for exceptions
-   * @param successfulConnectHandler handler for succesful connects
    * @throws java.io.IOException if handler could not be created
    */
   public SaslClientHandler(UserGroupInformation ticket, AuthMethod method,
@@ -90,7 +88,7 @@ public class SaslClientHandler extends ChannelDuplexHandler {
     this.exceptionHandler = exceptionHandler;
     this.successfulConnectHandler = successfulConnectHandler;
 
-    SaslUtil.initSaslProperties(rpcProtection);
+    saslProps = SaslUtil.initSaslProperties(rpcProtection);
     switch (method) {
     case DIGEST:
       if (LOG.isDebugEnabled())
@@ -125,32 +123,23 @@ public class SaslClientHandler extends ChannelDuplexHandler {
 
   /**
    * Create a Digest Sasl client
-   *
-   * @param mechanismNames            names of mechanisms
-   * @param saslDefaultRealm          default realm for sasl
-   * @param saslClientCallbackHandler handler for the client
-   * @return new SaslClient
-   * @throws java.io.IOException if creation went wrong
    */
   protected SaslClient createDigestSaslClient(String[] mechanismNames, String saslDefaultRealm,
       CallbackHandler saslClientCallbackHandler) throws IOException {
-    return Sasl.createSaslClient(mechanismNames, null, null, saslDefaultRealm, SaslUtil.SASL_PROPS,
+    return Sasl.createSaslClient(mechanismNames, null, null, saslDefaultRealm, saslProps,
         saslClientCallbackHandler);
   }
 
   /**
    * Create Kerberos client
    *
-   * @param mechanismNames names of mechanisms
    * @param userFirstPart  first part of username
    * @param userSecondPart second part of username
-   * @return new SaslClient
-   * @throws java.io.IOException if fails
    */
   protected SaslClient createKerberosSaslClient(String[] mechanismNames, String userFirstPart,
       String userSecondPart) throws IOException {
     return Sasl
-        .createSaslClient(mechanismNames, null, userFirstPart, userSecondPart, SaslUtil.SASL_PROPS,
+        .createSaslClient(mechanismNames, null, userFirstPart, userSecondPart, saslProps,
             null);
   }
 
@@ -269,11 +258,6 @@ public class SaslClientHandler extends ChannelDuplexHandler {
     }
   }
 
-  /**
-   * Write SASL token
-   * @param ctx to write to
-   * @param saslToken to write
-   */
   private void writeSaslToken(final ChannelHandlerContext ctx, byte[] saslToken) {
     ByteBuf b = ctx.alloc().buffer(4 + saslToken.length);
     b.writeInt(saslToken.length);
@@ -290,9 +274,6 @@ public class SaslClientHandler extends ChannelDuplexHandler {
 
   /**
    * Get the read status
-   *
-   * @param inStream to read
-   * @throws org.apache.hadoop.ipc.RemoteException if status was not success
    */
   private static void readStatus(ByteBuf inStream) throws RemoteException {
     int status = inStream.readInt(); // read status
@@ -360,7 +341,6 @@ public class SaslClientHandler extends ChannelDuplexHandler {
      *
      * @param retryCount current retry count
      * @param random     to create new backoff with
-     * @param cause      of fail
      */
     public void handle(int retryCount, Random random, Throwable cause);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/4ac8d4ce/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUtil.java
index f2f3393..b505fc0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUtil.java
@@ -32,24 +32,31 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 public class SaslUtil {
   private static final Log log = LogFactory.getLog(SaslUtil.class);
   public static final String SASL_DEFAULT_REALM = "default";
-  public static final Map<String, String> SASL_PROPS =
-      new TreeMap<String, String>();
   public static final int SWITCH_TO_SIMPLE_AUTH = -88;
 
-  public static enum QualityOfProtection {
+  public enum QualityOfProtection {
     AUTHENTICATION("auth"),
     INTEGRITY("auth-int"),
     PRIVACY("auth-conf");
 
-    public final String saslQop;
+    private final String saslQop;
 
-    private QualityOfProtection(String saslQop) {
+    QualityOfProtection(String saslQop) {
       this.saslQop = saslQop;
     }
 
     public String getSaslQop() {
       return saslQop;
     }
+
+    public boolean matches(String stringQop) {
+      if (saslQop.equals(stringQop)) {
+        log.warn("Use authentication/integrity/privacy as value for rpc protection "
+            + "configurations instead of auth/auth-int/auth-conf.");
+        return true;
+      }
+      return name().equalsIgnoreCase(stringQop);
+    }
   }
 
   /** Splitting fully qualified Kerberos name into parts */
@@ -71,40 +78,39 @@ public class SaslUtil {
 
   /**
    * Returns {@link org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection}
-   * corresponding to the given {@code stringQop} value. Returns null if value is
-   * invalid.
+   * corresponding to the given {@code stringQop} value.
+   * @throws IllegalArgumentException If stringQop doesn't match any QOP.
    */
   public static QualityOfProtection getQop(String stringQop) {
-    QualityOfProtection qop = null;
-    if (QualityOfProtection.AUTHENTICATION.name().toLowerCase().equals(stringQop)
-        || QualityOfProtection.AUTHENTICATION.saslQop.equals(stringQop)) {
-      qop = QualityOfProtection.AUTHENTICATION;
-    } else if (QualityOfProtection.INTEGRITY.name().toLowerCase().equals(stringQop)
-        || QualityOfProtection.INTEGRITY.saslQop.equals(stringQop)) {
-      qop = QualityOfProtection.INTEGRITY;
-    } else if (QualityOfProtection.PRIVACY.name().toLowerCase().equals(stringQop)
-        || QualityOfProtection.PRIVACY.saslQop.equals(stringQop)) {
-      qop = QualityOfProtection.PRIVACY;
-    }
-    if (qop == null) {
-      throw new IllegalArgumentException("Invalid qop: " +  stringQop
-          + ". It must be one of 'authentication', 'integrity', 'privacy'.");
+    for (QualityOfProtection qop : QualityOfProtection.values()) {
+      if (qop.matches(stringQop)) {
+        return qop;
+      }
     }
-    if (QualityOfProtection.AUTHENTICATION.saslQop.equals(stringQop)
-        || QualityOfProtection.INTEGRITY.saslQop.equals(stringQop)
-        || QualityOfProtection.PRIVACY.saslQop.equals(stringQop)) {
-      log.warn("Use authentication/integrity/privacy as value for rpc protection "
-          + "configurations instead of auth/auth-int/auth-conf.");
-    }
-    return qop;
+    throw new IllegalArgumentException("Invalid qop: " +  stringQop
+        + ". It must be one of 'authentication', 'integrity', 'privacy'.");
   }
 
-  static void initSaslProperties(String rpcProtection) {
-    QualityOfProtection saslQOP = getQop(rpcProtection);
-    if (saslQOP == null) {
-      saslQOP = QualityOfProtection.AUTHENTICATION;
+  /**
+   * @param rpcProtection Value of 'hbase.rpc.protection' configuration.
+   * @return Map with values for SASL properties.
+   */
+  static Map<String, String> initSaslProperties(String rpcProtection) {
+    String saslQop;
+    if (rpcProtection.isEmpty()) {
+      saslQop = QualityOfProtection.AUTHENTICATION.getSaslQop();
+    } else {
+      String[] qops = rpcProtection.split(",");
+      StringBuilder saslQopBuilder = new StringBuilder();
+      for (int i = 0; i < qops.length; ++i) {
+        QualityOfProtection qop = getQop(qops[i]);
+        saslQopBuilder.append(",").append(qop.getSaslQop());
+      }
+      saslQop = saslQopBuilder.substring(1);  // remove first ','
     }
-    SaslUtil.SASL_PROPS.put(Sasl.QOP, saslQOP.getSaslQop());
-    SaslUtil.SASL_PROPS.put(Sasl.SERVER_AUTH, "true");
+    Map<String, String> saslProps = new TreeMap<>();
+    saslProps.put(Sasl.QOP, saslQop);
+    saslProps.put(Sasl.SERVER_AUTH, "true");
+    return saslProps;
   }
 }

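To make the new return value concrete, a small illustration of the mapping (behavior read
off the code above; the helper is package-private, so this mirrors how TestSaslUtil below
calls it):

  Map<String, String> props = SaslUtil.initSaslProperties("privacy,authentication");
  // props.get(Sasl.QOP)         -> "auth-conf,auth"
  // props.get(Sasl.SERVER_AUTH) -> "true"
  // An empty string falls back to "auth"; an unknown QOP throws IllegalArgumentException.
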
http://git-wip-us.apache.org/repos/asf/hbase/blob/4ac8d4ce/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java
new file mode 100644
index 0000000..0e3aeab
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java
@@ -0,0 +1,309 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.TextOutputCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.RealmChoiceCallback;
+import javax.security.sasl.SaslClient;
+
+import org.apache.hadoop.hbase.testclassification.SecurityTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcClient.SaslClientCallbackHandler;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import org.mockito.Mockito;
+
+import com.google.common.base.Strings;
+
+@Category({SecurityTests.class, SmallTests.class})
+public class TestHBaseSaslRpcClient {
+
+  static {
+    System.setProperty("java.security.krb5.realm", "DOMAIN.COM");
+    System.setProperty("java.security.krb5.kdc", "DOMAIN.COM");
+  }
+
+  static final String DEFAULT_USER_NAME = "principal";
+  static final String DEFAULT_USER_PASSWORD = "password";
+
+  private static final Logger LOG = Logger.getLogger(TestHBaseSaslRpcClient.class);
+
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  @BeforeClass
+  public static void before() {
+    Logger.getRootLogger().setLevel(Level.DEBUG);
+  }
+
+  @Test
+  public void testSaslClientUsesGivenRpcProtection() throws Exception {
+    Token<? extends TokenIdentifier> token = createTokenMockWithCredentials(DEFAULT_USER_NAME,
+        DEFAULT_USER_PASSWORD);
+    for (SaslUtil.QualityOfProtection qop : SaslUtil.QualityOfProtection.values()) {
+      String negotiatedQop = new HBaseSaslRpcClient(AuthMethod.DIGEST, token,
+          "principal/host@DOMAIN.COM", false, qop.name()) {
+        public String getQop() {
+          return saslProps.get(Sasl.QOP);
+        }
+      }.getQop();
+      assertEquals(negotiatedQop, qop.getSaslQop());
+    }
+  }
+
+  @Test
+  public void testSaslClientCallbackHandler() throws UnsupportedCallbackException {
+    final Token<? extends TokenIdentifier> token = createTokenMock();
+    when(token.getIdentifier()).thenReturn(DEFAULT_USER_NAME.getBytes());
+    when(token.getPassword()).thenReturn(DEFAULT_USER_PASSWORD.getBytes());
+
+    final NameCallback nameCallback = mock(NameCallback.class);
+    final PasswordCallback passwordCallback = mock(PasswordCallback.class);
+    final RealmCallback realmCallback = mock(RealmCallback.class);
+    final RealmChoiceCallback realmChoiceCallback = mock(RealmChoiceCallback.class);
+
+    Callback[] callbackArray = {nameCallback, passwordCallback,
+        realmCallback, realmChoiceCallback};
+    final SaslClientCallbackHandler saslClCallbackHandler = new SaslClientCallbackHandler(token);
+    saslClCallbackHandler.handle(callbackArray);
+    verify(nameCallback).setName(anyString());
+    verify(realmCallback).setText(anyString());
+    verify(passwordCallback).setPassword(any(char[].class));
+  }
+
+  @Test
+  public void testSaslClientCallbackHandlerWithException() {
+    final Token<? extends TokenIdentifier> token = createTokenMock();
+    when(token.getIdentifier()).thenReturn(DEFAULT_USER_NAME.getBytes());
+    when(token.getPassword()).thenReturn(DEFAULT_USER_PASSWORD.getBytes());
+    final SaslClientCallbackHandler saslClCallbackHandler = new SaslClientCallbackHandler(token);
+    try {
+      saslClCallbackHandler.handle(new Callback[] { mock(TextOutputCallback.class) });
+    } catch (UnsupportedCallbackException expEx) {
+      //expected
+    } catch (Exception ex) {
+      fail("testSaslClientCallbackHandlerWithException error : " + ex.getMessage());
+    }
+  }
+
+  @Test
+  public void testHBaseSaslRpcClientCreation() throws Exception {
+    //creation kerberos principal check section
+    assertFalse(assertSuccessCreationKerberosPrincipal(null));
+    assertFalse(assertSuccessCreationKerberosPrincipal("DOMAIN.COM"));
+    assertFalse(assertSuccessCreationKerberosPrincipal("principal/DOMAIN.COM"));
+    if (!assertSuccessCreationKerberosPrincipal("principal/localhost@DOMAIN.COM")) {
+      // XXX: This can fail if kerberos support in the OS is not sane, see HBASE-10107.
+      // For now, don't assert, just warn
+      LOG.warn("Could not create a SASL client with valid Kerberos credential");
+    }
+
+    //creation digest principal check section
+    assertFalse(assertSuccessCreationDigestPrincipal(null, null));
+    assertFalse(assertSuccessCreationDigestPrincipal("", ""));
+    assertFalse(assertSuccessCreationDigestPrincipal("", null));
+    assertFalse(assertSuccessCreationDigestPrincipal(null, ""));
+    assertTrue(assertSuccessCreationDigestPrincipal(DEFAULT_USER_NAME, DEFAULT_USER_PASSWORD));
+
+    //creation simple principal check section
+    assertFalse(assertSuccessCreationSimplePrincipal("", ""));
+    assertFalse(assertSuccessCreationSimplePrincipal(null, null));
+    assertFalse(assertSuccessCreationSimplePrincipal(DEFAULT_USER_NAME, DEFAULT_USER_PASSWORD));
+
+    //exceptions check section
+    assertTrue(assertIOExceptionThenSaslClientIsNull(DEFAULT_USER_NAME, DEFAULT_USER_PASSWORD));
+    assertTrue(assertIOExceptionWhenGetStreamsBeforeConnectCall(
+        DEFAULT_USER_NAME, DEFAULT_USER_PASSWORD));
+  }
+
+  @Test
+  public void testAuthMethodReadWrite() throws IOException {
+    DataInputBuffer in = new DataInputBuffer();
+    DataOutputBuffer out = new DataOutputBuffer();
+
+    assertAuthMethodRead(in, AuthMethod.SIMPLE);
+    assertAuthMethodRead(in, AuthMethod.KERBEROS);
+    assertAuthMethodRead(in, AuthMethod.DIGEST);
+
+    assertAuthMethodWrite(out, AuthMethod.SIMPLE);
+    assertAuthMethodWrite(out, AuthMethod.KERBEROS);
+    assertAuthMethodWrite(out, AuthMethod.DIGEST);
+  }
+
+  private void assertAuthMethodRead(DataInputBuffer in, AuthMethod authMethod)
+      throws IOException {
+    in.reset(new byte[] {authMethod.code}, 1);
+    assertEquals(authMethod, AuthMethod.read(in));
+  }
+
+  private void assertAuthMethodWrite(DataOutputBuffer out, AuthMethod authMethod)
+      throws IOException {
+    authMethod.write(out);
+    assertEquals(authMethod.code, out.getData()[0]);
+    out.reset();
+  }
+
+  private boolean assertIOExceptionWhenGetStreamsBeforeConnectCall(String principal,
+      String password) throws IOException {
+    boolean inState = false;
+    boolean outState = false;
+
+    HBaseSaslRpcClient rpcClient = new HBaseSaslRpcClient(AuthMethod.DIGEST,
+        createTokenMockWithCredentials(principal, password), principal, false) {
+      @Override
+      public SaslClient createDigestSaslClient(String[] mechanismNames,
+          String saslDefaultRealm, CallbackHandler saslClientCallbackHandler)
+              throws IOException {
+        return Mockito.mock(SaslClient.class);
+      }
+
+      @Override
+      public SaslClient createKerberosSaslClient(String[] mechanismNames,
+          String userFirstPart, String userSecondPart) throws IOException {
+        return Mockito.mock(SaslClient.class);
+      }
+    };
+
+    try {
+      rpcClient.getInputStream(Mockito.mock(InputStream.class));
+    } catch(IOException ex) {
+      //Sasl authentication exchange hasn't completed yet
+      inState = true;
+    }
+
+    try {
+      rpcClient.getOutputStream(Mockito.mock(OutputStream.class));
+    } catch(IOException ex) {
+      //Sasl authentication exchange hasn't completed yet
+      outState = true;
+    }
+
+    return inState && outState;
+  }
+
+  private boolean assertIOExceptionThenSaslClientIsNull(String principal, String password) {
+    try {
+      new HBaseSaslRpcClient(AuthMethod.DIGEST,
+          createTokenMockWithCredentials(principal, password), principal, false) {
+        @Override
+        public SaslClient createDigestSaslClient(String[] mechanismNames,
+            String saslDefaultRealm, CallbackHandler saslClientCallbackHandler)
+                throws IOException {
+          return null;
+        }
+
+        @Override
+        public SaslClient createKerberosSaslClient(String[] mechanismNames,
+            String userFirstPart, String userSecondPart) throws IOException {
+          return null;
+        }
+      };
+      return false;
+    } catch (IOException ex) {
+      return true;
+    }
+  }
+
+  private boolean assertSuccessCreationKerberosPrincipal(String principal) {
+    HBaseSaslRpcClient rpcClient = null;
+    try {
+      rpcClient = createSaslRpcClientForKerberos(principal);
+    } catch(Exception ex) {
+      LOG.error(ex.getMessage(), ex);
+    }
+    return rpcClient != null;
+  }
+
+  private boolean assertSuccessCreationDigestPrincipal(String principal, String password) {
+    HBaseSaslRpcClient rpcClient = null;
+    try {
+      rpcClient = new HBaseSaslRpcClient(AuthMethod.DIGEST,
+          createTokenMockWithCredentials(principal, password), principal, false);
+    } catch(Exception ex) {
+      LOG.error(ex.getMessage(), ex);
+    }
+    return rpcClient != null;
+  }
+
+  private boolean assertSuccessCreationSimplePrincipal(String principal, String password) {
+    HBaseSaslRpcClient rpcClient = null;
+    try {
+      rpcClient = createSaslRpcClientSimple(principal, password);
+    } catch(Exception ex) {
+      LOG.error(ex.getMessage(), ex);
+    }
+    return rpcClient != null;
+  }
+
+  private HBaseSaslRpcClient createSaslRpcClientForKerberos(String principal)
+      throws IOException {
+    return new HBaseSaslRpcClient(AuthMethod.KERBEROS, createTokenMock(), principal, false);
+  }
+
+  private Token<? extends TokenIdentifier> createTokenMockWithCredentials(
+      String principal, String password)
+      throws IOException {
+    Token<? extends TokenIdentifier> token = createTokenMock();
+    if (!Strings.isNullOrEmpty(principal) && !Strings.isNullOrEmpty(password)) {
+      when(token.getIdentifier()).thenReturn(DEFAULT_USER_NAME.getBytes());
+      when(token.getPassword()).thenReturn(DEFAULT_USER_PASSWORD.getBytes());
+    }
+    return token;
+  }
+
+  private HBaseSaslRpcClient createSaslRpcClientSimple(String principal, String password)
+      throws IOException {
+    return new HBaseSaslRpcClient(AuthMethod.SIMPLE, createTokenMock(), principal, false);
+  }
+
+  @SuppressWarnings("unchecked")
+  private Token<? extends TokenIdentifier> createTokenMock() {
+    return mock(Token.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/4ac8d4ce/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestSaslUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestSaslUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestSaslUtil.java
new file mode 100644
index 0000000..6c99739
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestSaslUtil.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.hbase.testclassification.SecurityTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+
+import javax.security.sasl.Sasl;
+import java.util.Map;
+
+@Category({SecurityTests.class, SmallTests.class})
+public class TestSaslUtil {
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  @Test
+  public void testInitSaslProperties() {
+    Map<String, String> props;
+
+    props = SaslUtil.initSaslProperties("integrity");
+    assertEquals(props.get(Sasl.QOP), "auth-int");
+
+    props = SaslUtil.initSaslProperties("privacy,authentication");
+    assertEquals(props.get(Sasl.QOP), "auth-conf,auth");
+
+    props = SaslUtil.initSaslProperties("integrity,authentication,privacy");
+    assertEquals(props.get(Sasl.QOP), "auth-int,auth,auth-conf");
+
+    // An empty value falls back to the default QOP ("auth").
+    props = SaslUtil.initSaslProperties("");
+    assertEquals(props.get(Sasl.QOP), "auth");
+
+    // An unknown value is rejected; the test ends at the expected exception.
+    exception.expect(IllegalArgumentException.class);
+    SaslUtil.initSaslProperties("xyz");
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/4ac8d4ce/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index ed8d37d..d32fca7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -1413,7 +1413,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
               }
               saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
                   .getMechanismName(), null, SaslUtil.SASL_DEFAULT_REALM,
-                  SaslUtil.SASL_PROPS, new SaslDigestCallbackHandler(
+                  HBaseSaslRpcServer.getSaslProps(), new SaslDigestCallbackHandler(
                       secretManager, this));
               break;
             default:
@@ -1433,7 +1433,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
                 public Object run() throws SaslException {
                   saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
                       .getMechanismName(), names[0], names[1],
-                      SaslUtil.SASL_PROPS, new SaslGssCallbackHandler());
+                      HBaseSaslRpcServer.getSaslProps(), new SaslGssCallbackHandler());
                   return null;
                 }
               });

http://git-wip-us.apache.org/repos/asf/hbase/blob/4ac8d4ce/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java
index b9e56d9..450db64 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.security;
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.util.Map;
 
 import javax.security.auth.callback.Callback;
 import javax.security.auth.callback.CallbackHandler;
@@ -48,11 +49,17 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 public class HBaseSaslRpcServer {
   private static final Log LOG = LogFactory.getLog(HBaseSaslRpcServer.class);
 
+  private static Map<String, String> saslProps = null;
+
   public static void init(Configuration conf) {
-    SaslUtil.initSaslProperties(conf.get("hbase.rpc.protection", 
+    saslProps = SaslUtil.initSaslProperties(conf.get("hbase.rpc.protection",
           QualityOfProtection.AUTHENTICATION.name().toLowerCase()));
   }
 
+  public static Map<String, String> getSaslProps() {
+    return saslProps;
+  }
+
   public static <T extends TokenIdentifier> T getIdentifier(String id,
       SecretManager<T> secretManager) throws InvalidToken {
     byte[] tokenId = SaslUtil.decodeIdentifier(id);

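Schematically, the server side now resolves the configured QOPs once at startup and
reuses the map for every SASL handshake; a minimal sketch of the flow (grounded in the
init/getSaslProps methods above and the RpcServer hunks earlier):

  // At server startup:
  HBaseSaslRpcServer.init(conf);  // reads "hbase.rpc.protection", defaults to "authentication"
  // Later, when a connection negotiates SASL:
  Map<String, String> saslProps = HBaseSaslRpcServer.getSaslProps();
  // ... passed to Sasl.createSaslServer(mechanism, protocol, realm, saslProps, callbackHandler)
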
http://git-wip-us.apache.org/repos/asf/hbase/blob/4ac8d4ce/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java
new file mode 100644
index 0000000..6145838
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java
@@ -0,0 +1,245 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileForTesting;
+import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting;
+import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getSecuredConfiguration;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.ipc.RpcServerInterface;
+import org.apache.hadoop.hbase.ipc.TestDelayedRpc.TestDelayedImplementation;
+import org.apache.hadoop.hbase.ipc.TestDelayedRpc.TestThread;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.Mockito;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.BlockingRpcChannel;
+import com.google.protobuf.BlockingService;
+
+import javax.security.sasl.SaslException;
+
+public abstract class AbstractTestSecureIPC {
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static final File KEYTAB_FILE = new File(TEST_UTIL.getDataTestDir("keytab").toUri()
+      .getPath());
+
+  private static MiniKdc KDC;
+  private static String HOST = "localhost";
+  private static String PRINCIPAL;
+
+  String krbKeytab;
+  String krbPrincipal;
+  UserGroupInformation ugi;
+  Configuration clientConf;
+  Configuration serverConf;
+
+  abstract Class<? extends RpcClient> getRpcClientClass();
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    Properties conf = MiniKdc.createConf();
+    conf.put(MiniKdc.DEBUG, true);
+    KDC = new MiniKdc(conf, new File(TEST_UTIL.getDataTestDir("kdc").toUri().getPath()));
+    KDC.start();
+    PRINCIPAL = "hbase/" + HOST;
+    KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL);
+    HBaseKerberosUtils.setKeytabFileForTesting(KEYTAB_FILE.getAbsolutePath());
+    HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm());
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    if (KDC != null) {
+      KDC.stop();
+    }
+    TEST_UTIL.cleanupTestDir();
+  }
+
+  @Before
+  public void setUpTest() throws Exception {
+    krbKeytab = getKeytabFileForTesting();
+    krbPrincipal = getPrincipalForTesting();
+    ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal);
+    clientConf = getSecuredConfiguration();
+    clientConf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, getRpcClientClass().getName());
+    serverConf = getSecuredConfiguration();
+  }
+
+  @Test
+  public void testRpcCallWithEnabledKerberosSaslAuth() throws Exception {
+    UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser();
+
+    // check that the login user is okay:
+    assertSame(ugi, ugi2);
+    assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
+    assertEquals(krbPrincipal, ugi.getUserName());
+
+    callRpcService(User.create(ugi2));
+  }
+
+  @Test
+  public void testRpcFallbackToSimpleAuth() throws Exception {
+    String clientUsername = "testuser";
+    UserGroupInformation clientUgi = UserGroupInformation.createUserForTesting(clientUsername,
+        new String[]{clientUsername});
+
+    // check that the client user is insecure
+    assertNotSame(ugi, clientUgi);
+    assertEquals(AuthenticationMethod.SIMPLE, clientUgi.getAuthenticationMethod());
+    assertEquals(clientUsername, clientUgi.getUserName());
+
+    clientConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
+    serverConf.setBoolean(RpcServer.FALLBACK_TO_INSECURE_CLIENT_AUTH, true);
+    callRpcService(User.create(clientUgi));
+  }
+
+  void setRpcProtection(String clientProtection, String serverProtection) {
+    clientConf.set("hbase.rpc.protection", clientProtection);
+    serverConf.set("hbase.rpc.protection", serverProtection);
+  }
+
+  /**
+   * Test various combinations of server and client QOPs.
+   * @throws Exception
+   */
+  @Test
+  public void testSaslWithCommonQop() throws Exception {
+    setRpcProtection("privacy,authentication", "authentication");
+    callRpcService(User.create(ugi));
+
+    setRpcProtection("authentication", "privacy,authentication");
+    callRpcService(User.create(ugi));
+
+    setRpcProtection("integrity,authentication", "privacy,authentication");
+    callRpcService(User.create(ugi));
+  }
+
+  @Test
+  public void testSaslNoCommonQop() throws Exception {
+    exception.expect(SaslException.class);
+    exception.expectMessage("No common protection layer between client and server");
+    setRpcProtection("integrity", "privacy");
+    callRpcService(User.create(ugi));
+  }
+
+  private UserGroupInformation loginKerberosPrincipal(String krbKeytab, String krbPrincipal)
+      throws Exception {
+    Configuration cnf = new Configuration();
+    cnf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+    UserGroupInformation.setConfiguration(cnf);
+    UserGroupInformation.loginUserFromKeytab(krbPrincipal, krbKeytab);
+    return UserGroupInformation.getLoginUser();
+  }
+
+  /**
+   * Sets up an RPC server and a client. Performs an RPC and checks the result. If an exception
+   * is thrown from the stub, this function throws the root cause of that exception.
+   */
+  private void callRpcService(User clientUser) throws Exception {
+    SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class);
+    Mockito.when(securityInfoMock.getServerPrincipal())
+        .thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL);
+    SecurityInfo.addInfo("TestDelayedService", securityInfoMock);
+
+    boolean delayReturnValue = false;
+    InetSocketAddress isa = new InetSocketAddress(HOST, 0);
+    TestDelayedImplementation instance = new TestDelayedImplementation(delayReturnValue);
+    BlockingService service =
+        TestDelayedRpcProtos.TestDelayedService.newReflectiveBlockingService(instance);
+
+    RpcServerInterface rpcServer =
+        new RpcServer(null, "testSecuredDelayedRpc",
+            Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)), isa,
+            serverConf, new FifoRpcScheduler(serverConf, 1));
+    rpcServer.start();
+    RpcClient rpcClient =
+        RpcClientFactory.createClient(clientConf, HConstants.DEFAULT_CLUSTER_ID.toString());
+    try {
+      InetSocketAddress address = rpcServer.getListenerAddress();
+      if (address == null) {
+        throw new IOException("Listener channel is closed");
+      }
+      BlockingRpcChannel channel =
+          rpcClient.createBlockingRpcChannel(
+            ServerName.valueOf(address.getHostName(), address.getPort(),
+            System.currentTimeMillis()), clientUser, 0);
+      TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
+          TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
+      List<Integer> results = new ArrayList<>();
+      TestThread th1 = new TestThread(stub, true, results);
+      final Throwable[] exception = new Throwable[1];
+      Thread.UncaughtExceptionHandler exceptionHandler =
+          new Thread.UncaughtExceptionHandler() {
+            @Override
+            public void uncaughtException(Thread th, Throwable ex) {
+              exception[0] = ex;
+            }
+          };
+      th1.setUncaughtExceptionHandler(exceptionHandler);
+      th1.start();
+      th1.join();
+      if (exception[0] != null) {
+        // throw root cause.
+        while (exception[0].getCause() != null) {
+          exception[0] = exception[0].getCause();
+        }
+        throw (Exception) exception[0];
+      }
+
+      assertEquals(0xDEADBEEF, results.get(0).intValue());
+    } finally {
+      rpcClient.close();
+      rpcServer.stop();
+    }
+  }
+}
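
For reference, the "hbase.rpc.protection" values exercised by these tests map to
the standard SASL QOP tokens; the sketch below reflects the SASL specification
(and the SaslUtil.QualityOfProtection enum), not code introduced by this patch.
When a comma-separated list is configured, client and server negotiate a QOP
they have in common, which is why testSaslNoCommonQop above expects a
SaslException.

    // Illustration only: the standard SASL QOP token behind each
    // "hbase.rpc.protection" value.
    static String toSaslQop(String protection) {
      switch (protection) {
        case "authentication": return "auth";      // authentication only
        case "integrity":      return "auth-int";  // adds integrity checking
        case "privacy":        return "auth-conf"; // adds confidentiality
        default: throw new IllegalArgumentException(protection);
      }
    }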

http://git-wip-us.apache.org/repos/asf/hbase/blob/4ac8d4ce/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestAsyncSecureIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestAsyncSecureIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestAsyncSecureIPC.java
new file mode 100644
index 0000000..ea37915
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestAsyncSecureIPC.java
@@ -0,0 +1,33 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import org.apache.hadoop.hbase.ipc.AsyncRpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.testclassification.SecurityTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.experimental.categories.Category;
+
+@Category({ SecurityTests.class, SmallTests.class })
+public class TestAsyncSecureIPC extends AbstractTestSecureIPC {
+
+  Class<? extends RpcClient> getRpcClientClass() {
+    return AsyncRpcClient.class;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/4ac8d4ce/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java
deleted file mode 100644
index 96ed986..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.security;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.TextOutputCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.sasl.Sasl;
-import javax.security.sasl.RealmCallback;
-import javax.security.sasl.RealmChoiceCallback;
-import javax.security.sasl.SaslClient;
-
-import org.apache.hadoop.hbase.testclassification.SecurityTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.security.HBaseSaslRpcClient.SaslClientCallbackHandler;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.ExpectedException;
-import org.mockito.Mockito;
-
-import com.google.common.base.Strings;
-
-@Category({SecurityTests.class, SmallTests.class})
-public class TestHBaseSaslRpcClient {
-  
-  static {
-    System.setProperty("java.security.krb5.realm", "DOMAIN.COM");
-    System.setProperty("java.security.krb5.kdc", "DOMAIN.COM");
-  }
-  
-  static final String DEFAULT_USER_NAME = "principal";
-  static final String DEFAULT_USER_PASSWORD = "password";
-
-  private static final Logger LOG = Logger.getLogger(TestHBaseSaslRpcClient.class);
-
-
-  @Rule
-  public ExpectedException exception = ExpectedException.none();
-
-  @BeforeClass
-  public static void before() {
-    Logger.getRootLogger().setLevel(Level.DEBUG);
-  }
-
-  @Test
-  public void testSaslQOPNotEmpty() throws Exception {
-    Token<? extends TokenIdentifier> token = createTokenMockWithCredentials(DEFAULT_USER_NAME,
-        DEFAULT_USER_PASSWORD);
-    // default QOP is authentication
-    new HBaseSaslRpcClient(AuthMethod.DIGEST, token, "principal/host@DOMAIN.COM", false);
-    assertTrue(SaslUtil.SASL_PROPS.get(Sasl.QOP).equals(SaslUtil.QualityOfProtection.
-        AUTHENTICATION.getSaslQop()));
-
-    // check with specific QOPs
-    new HBaseSaslRpcClient(AuthMethod.DIGEST, token, "principal/host@DOMAIN.COM", false,
-        "authentication");
-    assertTrue(SaslUtil.SASL_PROPS.get(Sasl.QOP).equals(SaslUtil.QualityOfProtection.
-        AUTHENTICATION.getSaslQop()));
-
-    new HBaseSaslRpcClient(AuthMethod.DIGEST, token, "principal/host@DOMAIN.COM", false,
-        "privacy");
-    assertTrue(SaslUtil.SASL_PROPS.get(Sasl.QOP).equals(SaslUtil.QualityOfProtection.
-        PRIVACY.getSaslQop()));
-
-    new HBaseSaslRpcClient(AuthMethod.DIGEST, token, "principal/host@DOMAIN.COM", false,
-        "integrity");
-    assertTrue(SaslUtil.SASL_PROPS.get(Sasl.QOP).equals(SaslUtil.QualityOfProtection.
-        INTEGRITY.getSaslQop()));
-
-    exception.expect(IllegalArgumentException.class);
-    new HBaseSaslRpcClient(AuthMethod.DIGEST, token, "principal/host@DOMAIN.COM", false,
-        "wrongvalue");
-  }
-
-  @Test
-  public void testSaslClientCallbackHandler() throws UnsupportedCallbackException {
-    final Token<? extends TokenIdentifier> token = createTokenMock();
-    when(token.getIdentifier()).thenReturn(DEFAULT_USER_NAME.getBytes());
-    when(token.getPassword()).thenReturn(DEFAULT_USER_PASSWORD.getBytes());
-
-    final NameCallback nameCallback = mock(NameCallback.class);
-    final PasswordCallback passwordCallback = mock(PasswordCallback.class);
-    final RealmCallback realmCallback = mock(RealmCallback.class);
-    final RealmChoiceCallback realmChoiceCallback = mock(RealmChoiceCallback.class);
-
-    Callback[] callbackArray = {nameCallback, passwordCallback,
-        realmCallback, realmChoiceCallback};
-    final SaslClientCallbackHandler saslClCallbackHandler = new SaslClientCallbackHandler(token);
-    saslClCallbackHandler.handle(callbackArray);
-    verify(nameCallback).setName(anyString());
-    verify(realmCallback).setText(anyString());
-    verify(passwordCallback).setPassword(any(char[].class));
-  }
-
-  @Test
-  public void testSaslClientCallbackHandlerWithException() {
-    final Token<? extends TokenIdentifier> token = createTokenMock();
-    when(token.getIdentifier()).thenReturn(DEFAULT_USER_NAME.getBytes());
-    when(token.getPassword()).thenReturn(DEFAULT_USER_PASSWORD.getBytes());
-    final SaslClientCallbackHandler saslClCallbackHandler = new SaslClientCallbackHandler(token);
-    try {
-      saslClCallbackHandler.handle(new Callback[] { mock(TextOutputCallback.class) });
-    } catch (UnsupportedCallbackException expEx) {
-      //expected
-    } catch (Exception ex) {
-      fail("testSaslClientCallbackHandlerWithException error : " + ex.getMessage());
-    }
-  }
-
-  @Test
-  public void testHBaseSaslRpcClientCreation() throws Exception {
-    //creation kerberos principal check section
-    assertFalse(assertSuccessCreationKerberosPrincipal(null));
-    assertFalse(assertSuccessCreationKerberosPrincipal("DOMAIN.COM"));
-    assertFalse(assertSuccessCreationKerberosPrincipal("principal/DOMAIN.COM"));
-    if (!assertSuccessCreationKerberosPrincipal("principal/localhost@DOMAIN.COM")) {
-      // XXX: This can fail if kerberos support in the OS is not sane, see HBASE-10107.
-      // For now, don't assert, just warn
-      LOG.warn("Could not create a SASL client with valid Kerberos credential");
-    }
-
-    //creation digest principal check section
-    assertFalse(assertSuccessCreationDigestPrincipal(null, null));
-    assertFalse(assertSuccessCreationDigestPrincipal("", ""));
-    assertFalse(assertSuccessCreationDigestPrincipal("", null));
-    assertFalse(assertSuccessCreationDigestPrincipal(null, ""));
-    assertTrue(assertSuccessCreationDigestPrincipal(DEFAULT_USER_NAME, DEFAULT_USER_PASSWORD));
-
-    //creation simple principal check section
-    assertFalse(assertSuccessCreationSimplePrincipal("", ""));
-    assertFalse(assertSuccessCreationSimplePrincipal(null, null));
-    assertFalse(assertSuccessCreationSimplePrincipal(DEFAULT_USER_NAME, DEFAULT_USER_PASSWORD));
-
-    //exceptions check section
-    assertTrue(assertIOExceptionThenSaslClientIsNull(DEFAULT_USER_NAME, DEFAULT_USER_PASSWORD));
-    assertTrue(assertIOExceptionWhenGetStreamsBeforeConnectCall(
-        DEFAULT_USER_NAME, DEFAULT_USER_PASSWORD));
-  }
-
-  @Test
-  public void testAuthMethodReadWrite() throws IOException {
-    DataInputBuffer in = new DataInputBuffer();
-    DataOutputBuffer out = new DataOutputBuffer();
-
-    assertAuthMethodRead(in, AuthMethod.SIMPLE);
-    assertAuthMethodRead(in, AuthMethod.KERBEROS);
-    assertAuthMethodRead(in, AuthMethod.DIGEST);
-
-    assertAuthMethodWrite(out, AuthMethod.SIMPLE);
-    assertAuthMethodWrite(out, AuthMethod.KERBEROS);
-    assertAuthMethodWrite(out, AuthMethod.DIGEST);
-  }
-
-  private void assertAuthMethodRead(DataInputBuffer in, AuthMethod authMethod)
-      throws IOException {
-    in.reset(new byte[] {authMethod.code}, 1);
-    assertEquals(authMethod, AuthMethod.read(in));
-  }
-
-  private void assertAuthMethodWrite(DataOutputBuffer out, AuthMethod authMethod)
-      throws IOException {
-    authMethod.write(out);
-    assertEquals(authMethod.code, out.getData()[0]);
-    out.reset();
-  }
-
-  private boolean assertIOExceptionWhenGetStreamsBeforeConnectCall(String principal,
-      String password) throws IOException {
-    boolean inState = false;
-    boolean outState = false;
-
-    HBaseSaslRpcClient rpcClient = new HBaseSaslRpcClient(AuthMethod.DIGEST, 
-        createTokenMockWithCredentials(principal, password), principal, false) {
-      @Override
-      public SaslClient createDigestSaslClient(String[] mechanismNames,
-          String saslDefaultRealm, CallbackHandler saslClientCallbackHandler)
-              throws IOException {
-        return Mockito.mock(SaslClient.class);
-      }
-
-      @Override
-      public SaslClient createKerberosSaslClient(String[] mechanismNames,
-          String userFirstPart, String userSecondPart) throws IOException {
-        return Mockito.mock(SaslClient.class);
-      }
-    };
-    
-    try {
-      rpcClient.getInputStream(Mockito.mock(InputStream.class));
-    } catch(IOException ex) {
-      //Sasl authentication exchange hasn't completed yet
-      inState = true;
-    }
-
-    try {
-      rpcClient.getOutputStream(Mockito.mock(OutputStream.class));
-    } catch(IOException ex) {
-      //Sasl authentication exchange hasn't completed yet
-      outState = true;
-    }
-
-    return inState && outState;
-  }
-
-  private boolean assertIOExceptionThenSaslClientIsNull(String principal, String password) {
-    try {
-      new HBaseSaslRpcClient(AuthMethod.DIGEST, 
-          createTokenMockWithCredentials(principal, password), principal, false) {
-        @Override
-        public SaslClient createDigestSaslClient(String[] mechanismNames,
-            String saslDefaultRealm, CallbackHandler saslClientCallbackHandler)
-                throws IOException {
-          return null;
-        }
-  
-        @Override
-        public SaslClient createKerberosSaslClient(String[] mechanismNames,
-            String userFirstPart, String userSecondPart) throws IOException {
-          return null;
-        }
-      };
-      return false;
-    } catch (IOException ex) {
-      return true;
-    }
-  }
-
-  private boolean assertSuccessCreationKerberosPrincipal(String principal) {
-    HBaseSaslRpcClient rpcClient = null;
-    try {
-      rpcClient = createSaslRpcClientForKerberos(principal);
-    } catch(Exception ex) {
-      LOG.error(ex.getMessage(), ex);
-    }
-    return rpcClient != null;
-  }
-
-  private boolean assertSuccessCreationDigestPrincipal(String principal, String password) {
-    HBaseSaslRpcClient rpcClient = null;
-    try {
-      rpcClient = new HBaseSaslRpcClient(AuthMethod.DIGEST, 
-          createTokenMockWithCredentials(principal, password), principal, false);
-    } catch(Exception ex) {
-      LOG.error(ex.getMessage(), ex);
-    }
-    return rpcClient != null;
-  }
-
-  private boolean assertSuccessCreationSimplePrincipal(String principal, String password) {
-    HBaseSaslRpcClient rpcClient = null;
-    try {
-      rpcClient = createSaslRpcClientSimple(principal, password);
-    } catch(Exception ex) {
-      LOG.error(ex.getMessage(), ex);
-    }
-    return rpcClient != null;
-  }
-
-  private HBaseSaslRpcClient createSaslRpcClientForKerberos(String principal)
-      throws IOException {
-    return new HBaseSaslRpcClient(AuthMethod.KERBEROS, createTokenMock(), principal, false);
-  }
-
-  private Token<? extends TokenIdentifier> createTokenMockWithCredentials(
-      String principal, String password)
-      throws IOException {
-    Token<? extends TokenIdentifier> token = createTokenMock();
-    if (!Strings.isNullOrEmpty(principal) && !Strings.isNullOrEmpty(password)) {
-      when(token.getIdentifier()).thenReturn(DEFAULT_USER_NAME.getBytes());
-      when(token.getPassword()).thenReturn(DEFAULT_USER_PASSWORD.getBytes());
-    }
-    return token;
-  }
-
-  private HBaseSaslRpcClient createSaslRpcClientSimple(String principal, String password)
-      throws IOException {
-    return new HBaseSaslRpcClient(AuthMethod.SIMPLE, createTokenMock(), principal, false);
-  }
-
-  @SuppressWarnings("unchecked")
-  private Token<? extends TokenIdentifier> createTokenMock() {
-    return mock(Token.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/4ac8d4ce/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java
new file mode 100644
index 0000000..98ea221
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java
@@ -0,0 +1,33 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientImpl;
+import org.apache.hadoop.hbase.testclassification.SecurityTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.experimental.categories.Category;
+
+@Category({ SecurityTests.class, SmallTests.class })
+public class TestSecureIPC extends AbstractTestSecureIPC {
+
+  Class<? extends RpcClient> getRpcClientClass() {
+    return RpcClientImpl.class;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/4ac8d4ce/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java
deleted file mode 100644
index 769e014..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.security;
-
-import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileForTesting;
-import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting;
-import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getSecuredConfiguration;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertSame;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.ipc.AsyncRpcClient;
-import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
-import org.apache.hadoop.hbase.ipc.RpcClient;
-import org.apache.hadoop.hbase.ipc.RpcClientFactory;
-import org.apache.hadoop.hbase.ipc.RpcClientImpl;
-import org.apache.hadoop.hbase.ipc.RpcServer;
-import org.apache.hadoop.hbase.ipc.RpcServerInterface;
-import org.apache.hadoop.hbase.ipc.TestDelayedRpc.TestDelayedImplementation;
-import org.apache.hadoop.hbase.ipc.TestDelayedRpc.TestThread;
-import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos;
-import org.apache.hadoop.hbase.testclassification.SecurityTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.minikdc.MiniKdc;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.mockito.Mockito;
-
-import com.google.common.collect.Lists;
-import com.google.protobuf.BlockingRpcChannel;
-import com.google.protobuf.BlockingService;
-
-@Category({ SecurityTests.class, SmallTests.class })
-public class TestSecureRPC {
-
-  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-
-  private static final File KEYTAB_FILE = new File(TEST_UTIL.getDataTestDir("keytab").toUri()
-      .getPath());
-
-  private static MiniKdc KDC;
-
-  private static String HOST = "localhost";
-
-  private static String PRINCIPAL;
-
-  @BeforeClass
-  public static void setUp() throws Exception {
-    Properties conf = MiniKdc.createConf();
-    conf.put(MiniKdc.DEBUG, true);
-    KDC = new MiniKdc(conf, new File(TEST_UTIL.getDataTestDir("kdc").toUri().getPath()));
-    KDC.start();
-    PRINCIPAL = "hbase/" + HOST;
-    KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL);
-    HBaseKerberosUtils.setKeytabFileForTesting(KEYTAB_FILE.getAbsolutePath());
-    HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm());
-  }
-
-  @AfterClass
-  public static void tearDown() throws IOException {
-    if (KDC != null) {
-      KDC.stop();
-    }
-    TEST_UTIL.cleanupTestDir();
-  }
-
-  @Test
-  public void testRpc() throws Exception {
-    testRpcCallWithEnabledKerberosSaslAuth(RpcClientImpl.class);
-  }
-
-  @Test
-  public void testRpcWithInsecureFallback() throws Exception {
-    testRpcFallbackToSimpleAuth(RpcClientImpl.class);
-  }
-
-  @Test
-  public void testAsyncRpc() throws Exception {
-    testRpcCallWithEnabledKerberosSaslAuth(AsyncRpcClient.class);
-  }
-
-  @Test
-  public void testAsyncRpcWithInsecureFallback() throws Exception {
-    testRpcFallbackToSimpleAuth(AsyncRpcClient.class);
-  }
-
-  private void testRpcCallWithEnabledKerberosSaslAuth(Class<? extends RpcClient> rpcImplClass)
-      throws Exception {
-    String krbKeytab = getKeytabFileForTesting();
-    String krbPrincipal = getPrincipalForTesting();
-
-    UserGroupInformation ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal);
-    UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser();
-
-    // check that the login user is okay:
-    assertSame(ugi, ugi2);
-    assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
-    assertEquals(krbPrincipal, ugi.getUserName());
-
-    Configuration clientConf = getSecuredConfiguration();
-    callRpcService(rpcImplClass, User.create(ugi2), clientConf, false);
-  }
-
-  private UserGroupInformation loginKerberosPrincipal(String krbKeytab, String krbPrincipal)
-      throws Exception {
-    Configuration cnf = new Configuration();
-    cnf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
-    UserGroupInformation.setConfiguration(cnf);
-    UserGroupInformation.loginUserFromKeytab(krbPrincipal, krbKeytab);
-    return UserGroupInformation.getLoginUser();
-  }
-
-  private void callRpcService(Class<? extends RpcClient> rpcImplClass, User clientUser,
-                              Configuration clientConf, boolean allowInsecureFallback)
-      throws Exception {
-    Configuration clientConfCopy = new Configuration(clientConf);
-    clientConfCopy.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcImplClass.getName());
-
-    Configuration conf = getSecuredConfiguration();
-    conf.setBoolean(RpcServer.FALLBACK_TO_INSECURE_CLIENT_AUTH, allowInsecureFallback);
-
-    SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class);
-    Mockito.when(securityInfoMock.getServerPrincipal())
-        .thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL);
-    SecurityInfo.addInfo("TestDelayedService", securityInfoMock);
-
-    boolean delayReturnValue = false;
-    InetSocketAddress isa = new InetSocketAddress(HOST, 0);
-    TestDelayedImplementation instance = new TestDelayedImplementation(delayReturnValue);
-    BlockingService service =
-        TestDelayedRpcProtos.TestDelayedService.newReflectiveBlockingService(instance);
-
-    RpcServerInterface rpcServer =
-        new RpcServer(null, "testSecuredDelayedRpc",
-            Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)), isa,
-            conf, new FifoRpcScheduler(conf, 1));
-    rpcServer.start();
-    RpcClient rpcClient =
-        RpcClientFactory.createClient(clientConfCopy, HConstants.DEFAULT_CLUSTER_ID.toString());
-    try {
-      InetSocketAddress address = rpcServer.getListenerAddress();
-      if (address == null) {
-        throw new IOException("Listener channel is closed");
-      }
-      BlockingRpcChannel channel =
-          rpcClient.createBlockingRpcChannel(
-            ServerName.valueOf(address.getHostName(), address.getPort(),
-            System.currentTimeMillis()), clientUser, 5000);
-      TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
-          TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
-      List<Integer> results = new ArrayList<Integer>();
-      TestThread th1 = new TestThread(stub, true, results);
-      th1.start();
-      th1.join();
-
-      assertEquals(0xDEADBEEF, results.get(0).intValue());
-    } finally {
-      rpcClient.close();
-      rpcServer.stop();
-    }
-  }
-
-  public void testRpcFallbackToSimpleAuth(Class<? extends RpcClient> rpcImplClass) throws Exception {
-    String krbKeytab = getKeytabFileForTesting();
-    String krbPrincipal = getPrincipalForTesting();
-
-    UserGroupInformation ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal);
-    assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
-    assertEquals(krbPrincipal, ugi.getUserName());
-
-    String clientUsername = "testuser";
-    UserGroupInformation clientUgi = UserGroupInformation.createUserForTesting(clientUsername,
-        new String[]{clientUsername});
-
-    // check that the client user is insecure
-    assertNotSame(ugi, clientUgi);
-    assertEquals(AuthenticationMethod.SIMPLE, clientUgi.getAuthenticationMethod());
-    assertEquals(clientUsername, clientUgi.getUserName());
-
-    Configuration clientConf = new Configuration();
-    clientConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
-    callRpcService(rpcImplClass, User.create(clientUgi), clientConf, true);
-  }
-}
\ No newline at end of file


[5/9] hbase git commit: HBASE-14837 Procedure v2 - Procedure Queue Improvement

Posted by sy...@apache.org.
HBASE-14837 Procedure v2 - Procedure Queue Improvement


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/18a48af2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/18a48af2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/18a48af2

Branch: refs/heads/hbase-12439
Commit: 18a48af2424a9a45d24c08014d4948e3274513a1
Parents: dc57996
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Thu Jan 14 08:29:10 2016 -0800
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Thu Jan 14 09:24:42 2016 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/ProcedureInfo.java  |    4 +-
 .../hbase/procedure2/ProcedureExecutor.java     |    3 +-
 .../procedure2/ProcedureFairRunQueues.java      |  174 ---
 .../hbase/procedure2/ProcedureRunnableSet.java  |    4 +-
 .../procedure2/ProcedureSimpleRunQueue.java     |    8 +-
 .../procedure2/TestProcedureFairRunQueues.java  |  155 ---
 .../org/apache/hadoop/hbase/master/HMaster.java |   39 +-
 .../procedure/AddColumnFamilyProcedure.java     |    7 +-
 .../procedure/CreateNamespaceProcedure.java     |    4 +-
 .../master/procedure/CreateTableProcedure.java  |    2 +-
 .../procedure/DeleteColumnFamilyProcedure.java  |    7 +-
 .../master/procedure/DeleteTableProcedure.java  |    2 +-
 .../master/procedure/DisableTableProcedure.java |    7 +-
 .../master/procedure/EnableTableProcedure.java  |    7 +-
 .../master/procedure/MasterProcedureEnv.java    |   34 +-
 .../master/procedure/MasterProcedureQueue.java  |  578 --------
 .../procedure/MasterProcedureScheduler.java     | 1241 ++++++++++++++++++
 .../procedure/ModifyColumnFamilyProcedure.java  |    7 +-
 .../master/procedure/ModifyTableProcedure.java  |    7 +-
 .../master/procedure/ServerCrashProcedure.java  |   11 +-
 .../procedure/ServerProcedureInterface.java     |   14 +-
 .../procedure/TruncateTableProcedure.java       |    2 +-
 .../apache/hadoop/hbase/master/TestMaster.java  |    8 +-
 .../hbase/master/TestMasterNoCluster.java       |    2 +-
 .../procedure/TestMasterProcedureEvents.java    |  179 +++
 .../procedure/TestMasterProcedureQueue.java     |  484 -------
 .../procedure/TestMasterProcedureScheduler.java |  489 +++++++
 27 files changed, 2020 insertions(+), 1459 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java
index b7ea47e..fca2eac 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java
@@ -224,10 +224,10 @@ public class ProcedureInfo implements Cloneable {
       procProto.getOwner(),
       procProto.getState(),
       procProto.hasParentId() ? procProto.getParentId() : -1,
-          procProto.getState() == ProcedureState.ROLLEDBACK ? procProto.getException() : null,
+      procProto.hasException() ? procProto.getException() : null,
       procProto.getLastUpdate(),
       procProto.getStartTime(),
-      procProto.getState() == ProcedureState.FINISHED ? procProto.getResult().toByteArray() : null);
+      procProto.hasResult() ? procProto.getResult().toByteArray() : null);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index 11073c6..74d28d7 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -785,8 +785,7 @@ public class ProcedureExecutor<TEnvironment> {
    */
   private void execLoop() {
     while (isRunning()) {
-      Long procId = runnables.poll();
-      Procedure proc = procId != null ? procedures.get(procId) : null;
+      Procedure proc = runnables.poll();
       if (proc == null) continue;
 
       try {

http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFairRunQueues.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFairRunQueues.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFairRunQueues.java
deleted file mode 100644
index 242ae86..0000000
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFairRunQueues.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.procedure2;
-
-import java.util.Map;
-
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.ConcurrentSkipListMap;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-
-/**
- * This class is a container of queues that allows selecting a queue
- * in a round-robin fashion, taking each queue's priority into account.
- *
- * The quantum is how many consecutive poll() calls return the same object.
- * e.g. if the quantum is 1 and you have objects A and B you'll get: A B A B
- * e.g. if the quantum is 2 and you have objects A and B you'll get: A A B B A A B B
- * An object's effective quantum is its priority * quantum.
- *
- * Example:
- *  - three queues (A, B, C) with priorities (1, 1, 2)
- *  - The first poll() will return A
- *  - The second poll() will return B
- *  - The third and fourth poll() will return C
- *  - and so on again and again.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class ProcedureFairRunQueues<TKey, TQueue extends ProcedureFairRunQueues.FairObject> {
-  private ConcurrentSkipListMap<TKey, TQueue> objMap =
-    new ConcurrentSkipListMap<TKey, TQueue>();
-
-  private final ReentrantLock lock = new ReentrantLock();
-  private final int quantum;
-
-  private Map.Entry<TKey, TQueue> current = null;
-  private int currentQuantum = 0;
-
-  public interface FairObject {
-    boolean isAvailable();
-    int getPriority();
-  }
-
-  /**
-   * @param quantum how many poll() will return the same object.
-   */
-  public ProcedureFairRunQueues(final int quantum) {
-    this.quantum = quantum;
-  }
-
-  public TQueue get(final TKey key) {
-    return objMap.get(key);
-  }
-
-  public TQueue add(final TKey key, final TQueue queue) {
-    TQueue oldq = objMap.putIfAbsent(key, queue);
-    return oldq != null ? oldq : queue;
-  }
-
-  public TQueue remove(final TKey key) {
-    TQueue queue = objMap.get(key);
-    if (queue != null) {
-      lock.lock();
-      try {
-        queue = objMap.remove(key);
-        if (current != null && queue == current.getValue()) {
-          currentQuantum = 0;
-          current = null;
-        }
-      } finally {
-        lock.unlock();
-      }
-    }
-    return queue;
-  }
-
-  public void clear() {
-    lock.lock();
-    try {
-      currentQuantum = 0;
-      current = null;
-      objMap.clear();
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  /**
-   * @return the next available item if present
-   */
-  public TQueue poll() {
-    lock.lock();
-    try {
-      TQueue queue;
-      if (currentQuantum == 0) {
-        if (nextObject() == null) {
-          // nothing here
-          return null;
-        }
-
-        queue = current.getValue();
-        currentQuantum = calculateQuantum(queue) - 1;
-      } else {
-        currentQuantum--;
-        queue = current.getValue();
-      }
-
-      if (!queue.isAvailable()) {
-        Map.Entry<TKey, TQueue> last = current;
-        // Try the next one
-        do {
-          if (nextObject() == null)
-            return null;
-        } while (current.getValue() != last.getValue() && !current.getValue().isAvailable());
-
-        queue = current.getValue();
-        currentQuantum = calculateQuantum(queue) - 1;
-      }
-
-      return queue;
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder builder = new StringBuilder();
-    builder.append('{');
-    for (Map.Entry<TKey, TQueue> entry: objMap.entrySet()) {
-      builder.append(entry.getKey());
-      builder.append(':');
-      builder.append(entry.getValue());
-    }
-    builder.append('}');
-    return builder.toString();
-  }
-
-  private Map.Entry<TKey, TQueue> nextObject() {
-    Map.Entry<TKey, TQueue> next = null;
-
-    // If we have already a key, try the next one
-    if (current != null) {
-      next = objMap.higherEntry(current.getKey());
-    }
-
-    // if there is no higher key, go back to the first
-    current = (next != null) ? next : objMap.firstEntry();
-    return current;
-  }
-
-  private int calculateQuantum(final TQueue fairObject) {
-    // TODO
-    return Math.max(1, fairObject.getPriority() * quantum);
-  }
-}
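
To make the quantum arithmetic described in the class comment concrete, here is
a minimal usage sketch (illustration only; it borrows the TestRunQueue helper
from the deleted test further down, and the class itself is removed by this
commit in favor of MasterProcedureScheduler):

    // Quantum 1; TestRunQueue implements ProcedureFairRunQueues.FairObject,
    // here with priorities 1 (A) and 2 (M).
    ProcedureFairRunQueues<String, TestRunQueue> fairq =
        new ProcedureFairRunQueues<String, TestRunQueue>(1);
    TestRunQueue a = fairq.add("A", new TestRunQueue("A", 1));
    TestRunQueue m = fairq.add("M", new TestRunQueue("M", 2));
    // M's effective quantum is priority * quantum = 2, so polling
    // repeats the sequence: A, M, M, A, M, M, ...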

http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java
index 2d7ba39..65df692 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java
@@ -55,9 +55,9 @@ public interface ProcedureRunnableSet {
 
   /**
    * Fetch one Procedure from the queue
-   * @return the Procedure ID to execute, or null if nothing present.
+   * @return the Procedure to execute, or null if nothing present.
    */
-  Long poll();
+  Procedure poll();
 
   /**
    * In case the class is blocking on poll() waiting for items to be added,

http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSimpleRunQueue.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSimpleRunQueue.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSimpleRunQueue.java
index 7b17fb2..d23680d 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSimpleRunQueue.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSimpleRunQueue.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class ProcedureSimpleRunQueue implements ProcedureRunnableSet {
-  private final Deque<Long> runnables = new ArrayDeque<Long>();
+  private final Deque<Procedure> runnables = new ArrayDeque<Procedure>();
   private final ReentrantLock lock = new ReentrantLock();
   private final Condition waitCond = lock.newCondition();
 
@@ -40,7 +40,7 @@ public class ProcedureSimpleRunQueue implements ProcedureRunnableSet {
   public void addFront(final Procedure proc) {
     lock.lock();
     try {
-      runnables.addFirst(proc.getProcId());
+      runnables.addFirst(proc);
       waitCond.signal();
     } finally {
       lock.unlock();
@@ -51,7 +51,7 @@ public class ProcedureSimpleRunQueue implements ProcedureRunnableSet {
   public void addBack(final Procedure proc) {
     lock.lock();
     try {
-      runnables.addLast(proc.getProcId());
+      runnables.addLast(proc);
       waitCond.signal();
     } finally {
       lock.unlock();
@@ -65,7 +65,7 @@ public class ProcedureSimpleRunQueue implements ProcedureRunnableSet {
 
   @Override
   @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
-  public Long poll() {
+  public Procedure poll() {
     lock.lock();
     try {
       if (runnables.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureFairRunQueues.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureFairRunQueues.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureFairRunQueues.java
deleted file mode 100644
index e36a295..0000000
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureFairRunQueues.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.procedure2;
-
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.testclassification.MasterTests;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import static org.junit.Assert.assertEquals;
-
-@Category({MasterTests.class, SmallTests.class})
-public class TestProcedureFairRunQueues {
-  private static class TestRunQueue implements ProcedureFairRunQueues.FairObject {
-    private final int priority;
-    private final String name;
-
-    private boolean available = true;
-
-    public TestRunQueue(String name, int priority) {
-      this.name = name;
-      this.priority = priority;
-    }
-
-    @Override
-    public String toString() {
-      return name;
-    }
-
-    private void setAvailable(boolean available) {
-      this.available = available;
-    }
-
-    @Override
-    public boolean isAvailable() {
-      return available;
-    }
-
-    @Override
-    public int getPriority() {
-      return priority;
-    }
-  }
-
-  @Test
-  public void testEmptyFairQueues() throws Exception {
-    ProcedureFairRunQueues<String, TestRunQueue> fairq
-      = new ProcedureFairRunQueues<String, TestRunQueue>(1);
-    for (int i = 0; i < 3; ++i) {
-      assertEquals(null, fairq.poll());
-    }
-  }
-
-  @Test
-  public void testFairQueues() throws Exception {
-    ProcedureFairRunQueues<String, TestRunQueue> fairq
-      = new ProcedureFairRunQueues<String, TestRunQueue>(1);
-    TestRunQueue a = fairq.add("A", new TestRunQueue("A", 1));
-    TestRunQueue b = fairq.add("B", new TestRunQueue("B", 1));
-    TestRunQueue m = fairq.add("M", new TestRunQueue("M", 2));
-
-    for (int i = 0; i < 3; ++i) {
-      assertEquals(a, fairq.poll());
-      assertEquals(b, fairq.poll());
-      assertEquals(m, fairq.poll());
-      assertEquals(m, fairq.poll());
-    }
-  }
-
-  @Test
-  public void testFairQueuesNotAvailable() throws Exception {
-    ProcedureFairRunQueues<String, TestRunQueue> fairq
-      = new ProcedureFairRunQueues<String, TestRunQueue>(1);
-    TestRunQueue a = fairq.add("A", new TestRunQueue("A", 1));
-    TestRunQueue b = fairq.add("B", new TestRunQueue("B", 1));
-    TestRunQueue m = fairq.add("M", new TestRunQueue("M", 2));
-
-    // m is not available
-    m.setAvailable(false);
-    for (int i = 0; i < 3; ++i) {
-      assertEquals(a, fairq.poll());
-      assertEquals(b, fairq.poll());
-    }
-
-    // m is available
-    m.setAvailable(true);
-    for (int i = 0; i < 3; ++i) {
-      assertEquals(m, fairq.poll());
-      assertEquals(m, fairq.poll());
-      assertEquals(a, fairq.poll());
-      assertEquals(b, fairq.poll());
-    }
-
-    // b is not available
-    b.setAvailable(false);
-    for (int i = 0; i < 3; ++i) {
-      assertEquals(m, fairq.poll());
-      assertEquals(m, fairq.poll());
-      assertEquals(a, fairq.poll());
-    }
-
-    assertEquals(m, fairq.poll());
-    m.setAvailable(false);
-    // m should be fetched next, but is no longer available
-    assertEquals(a, fairq.poll());
-    assertEquals(a, fairq.poll());
-    b.setAvailable(true);
-    for (int i = 0; i < 3; ++i) {
-      assertEquals(b, fairq.poll());
-      assertEquals(a, fairq.poll());
-    }
-  }
-
-  @Test
-  public void testFairQueuesDelete() throws Exception {
-    ProcedureFairRunQueues<String, TestRunQueue> fairq
-      = new ProcedureFairRunQueues<String, TestRunQueue>(1);
-    TestRunQueue a = fairq.add("A", new TestRunQueue("A", 1));
-    TestRunQueue b = fairq.add("B", new TestRunQueue("B", 1));
-    TestRunQueue m = fairq.add("M", new TestRunQueue("M", 2));
-
-    // Fetch A and then remove it
-    assertEquals(a, fairq.poll());
-    assertEquals(a, fairq.remove("A"));
-
-    // Fetch B and then remove it
-    assertEquals(b, fairq.poll());
-    assertEquals(b, fairq.remove("B"));
-
-    // Fetch M and then remove it
-    assertEquals(m, fairq.poll());
-    assertEquals(m, fairq.remove("M"));
-
-    // nothing left
-    assertEquals(null, fairq.poll());
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 8c34b91..9f5e7e3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -104,6 +104,7 @@ import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure;
 import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent;
 import org.apache.hadoop.hbase.master.procedure.ModifyColumnFamilyProcedure;
 import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
 import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
@@ -277,14 +278,15 @@ public class HMaster extends HRegionServer implements MasterServices {
 
   // flag set after we complete initialization once active,
-  // it is not private since it's used in unit tests
-  volatile boolean initialized = false;
+  // unit tests toggle it through setInitialized()
+  private final ProcedureEvent initialized = new ProcedureEvent("master initialized");
 
   // flag set after master services are started,
   // initialization may have not completed yet.
   volatile boolean serviceStarted = false;
 
   // flag set after we complete assignMeta.
-  private volatile boolean serverCrashProcessingEnabled = false;
+  private final ProcedureEvent serverCrashProcessingEnabled =
+    new ProcedureEvent("server crash processing");
 
   LoadBalancer balancer;
   private RegionNormalizer normalizer;
@@ -781,8 +783,10 @@ public class HMaster extends HRegionServer implements MasterServices {
     status.markComplete("Initialization successful");
     LOG.info("Master has completed initialization");
     configurationManager.registerObserver(this.balancer);
+
     // Set master as 'initialized'.
-    initialized = true;
+    setInitialized(true);
+
     // assign the meta replicas
     Set<ServerName> EMPTY_SET = new HashSet<ServerName>();
     int numReplicas = conf.getInt(HConstants.META_REPLICAS_NUM,
@@ -976,8 +980,8 @@ public class HMaster extends HRegionServer implements MasterServices {
     // servers. This is required so that if meta is assigning to a server which dies after
     // assignMeta starts assignment, ServerCrashProcedure can re-assign it. Otherwise, we will be
     // stuck here waiting forever if waitForMeta is specified.
-    if (!serverCrashProcessingEnabled) {
-      serverCrashProcessingEnabled = true;
+    if (!isServerCrashProcessingEnabled()) {
+      setServerCrashProcessingEnabled(true);
       this.serverManager.processQueuedDeadServers();
     }
 
@@ -1207,7 +1211,7 @@ public class HMaster extends HRegionServer implements MasterServices {
 
   public boolean balance(boolean force) throws IOException {
     // if master not initialized, don't run balancer.
-    if (!this.initialized) {
+    if (!isInitialized()) {
       LOG.debug("Master has not been initialized, don't run balancer.");
       return false;
     }
@@ -1308,7 +1312,7 @@ public class HMaster extends HRegionServer implements MasterServices {
    *    is globally disabled)
    */
   public boolean normalizeRegions() throws IOException {
-    if (!this.initialized) {
+    if (!isInitialized()) {
       LOG.debug("Master has not been initialized, don't run region normalizer.");
       return false;
     }
@@ -1615,7 +1619,7 @@ public class HMaster extends HRegionServer implements MasterServices {
     }
   }
 
-  private void checkCompactionPolicy(Configuration conf, HTableDescriptor htd) 
+  private void checkCompactionPolicy(Configuration conf, HTableDescriptor htd)
       throws IOException {
     // FIFO compaction has some requirements
     // Actually FCP ignores periodic major compactions
@@ -1672,7 +1676,7 @@ public class HMaster extends HRegionServer implements MasterServices {
       }
     }
   }
-  
+
   // HBASE-13350 - Helper method to log warning on sanity check failures if checks disabled.
   private static void warnOrThrowExceptionForFailure(boolean logWarn, String confKey,
       String message, Exception cause) throws IOException {
@@ -2300,6 +2304,15 @@ public class HMaster extends HRegionServer implements MasterServices {
    */
   @Override
   public boolean isInitialized() {
+    return initialized.isReady();
+  }
+
+  @VisibleForTesting
+  public void setInitialized(boolean isInitialized) {
+    procedureExecutor.getEnvironment().setEventReady(initialized, isInitialized);
+  }
+
+  public ProcedureEvent getInitializedEvent() {
     return initialized;
   }
 
@@ -2310,12 +2323,16 @@ public class HMaster extends HRegionServer implements MasterServices {
    */
   @Override
   public boolean isServerCrashProcessingEnabled() {
-    return this.serverCrashProcessingEnabled;
+    return serverCrashProcessingEnabled.isReady();
   }
 
   @VisibleForTesting
   public void setServerCrashProcessingEnabled(final boolean b) {
-    this.serverCrashProcessingEnabled = b;
+    procedureExecutor.getEnvironment().setEventReady(serverCrashProcessingEnabled, b);
+  }
+
+  public ProcedureEvent getServerCrashProcessingEnabledEvent() {
+    return serverCrashProcessingEnabled;
   }
 
   /**
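
The acquireLock() changes repeated across the procedure diffs below follow from
the two ProcedureEvent fields introduced above: instead of checking a volatile
boolean and retrying, a procedure parks itself on the event and is woken when
the event becomes ready. A minimal sketch of the pattern, assembled from this
commit's hunks (the suspend/wake machinery lives in MasterProcedureScheduler,
which is not part of this excerpt):

    @Override
    protected boolean acquireLock(final MasterProcedureEnv env) {
      // waitInitialized() returns true while the "master initialized" event
      // is not ready, registering this procedure to be re-queued when
      // HMaster.setInitialized(true) fires the event.
      if (env.waitInitialized(this)) return false;
      return env.getProcedureQueue().tryAcquireTableExclusiveLock(tableName, "add family");
    }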

http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java
index 58da1d1..b57540b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.InvalidFamilyOperationException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.TableState;
-import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -185,10 +184,8 @@ public class AddColumnFamilyProcedure
 
   @Override
   protected boolean acquireLock(final MasterProcedureEnv env) {
-    if (!env.isInitialized()) return false;
-    return env.getProcedureQueue().tryAcquireTableExclusiveLock(
-      tableName,
-      EventType.C_M_ADD_FAMILY.toString());
+    if (env.waitInitialized(this)) return false;
+    return env.getProcedureQueue().tryAcquireTableExclusiveLock(tableName, "add family");
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateNamespaceProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateNamespaceProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateNamespaceProcedure.java
index f934737..87b411e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateNamespaceProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateNamespaceProcedure.java
@@ -205,7 +205,9 @@ public class CreateNamespaceProcedure
         return true;
       }
 
-      return false;
+      if (env.waitInitialized(this)) {
+        return false;
+      }
     }
     return getTableNamespaceManager(env).acquireExclusiveLock();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
index 7b48f3b..d786bb3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
@@ -266,7 +266,7 @@ public class CreateTableProcedure
 
   @Override
   protected boolean acquireLock(final MasterProcedureEnv env) {
-    if (!env.isInitialized() && !getTableName().isSystemTable()) {
+    if (!getTableName().isSystemTable() && env.waitInitialized(this)) {
       return false;
     }
     return env.getProcedureQueue().tryAcquireTableExclusiveLock(getTableName(), "create table");

http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java
index 5781ae6..7e135f8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.InvalidFamilyOperationException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.TableState;
-import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -202,10 +201,8 @@ public class DeleteColumnFamilyProcedure
 
   @Override
   protected boolean acquireLock(final MasterProcedureEnv env) {
-    if (!env.isInitialized()) return false;
-    return env.getProcedureQueue().tryAcquireTableExclusiveLock(
-      tableName,
-      EventType.C_M_DELETE_FAMILY.toString());
+    if (env.waitInitialized(this)) return false;
+    return env.getProcedureQueue().tryAcquireTableExclusiveLock(tableName, "delete family");
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java
index baef112..0c43c57 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java
@@ -200,7 +200,7 @@ public class DeleteTableProcedure
 
   @Override
   protected boolean acquireLock(final MasterProcedureEnv env) {
-    if (!env.isInitialized()) return false;
+    if (env.waitInitialized(this)) return false;
     return env.getProcedureQueue().tryAcquireTableExclusiveLock(getTableName(), "delete table");
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java
index 716897f..fcc1b7b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.constraint.ConstraintException;
-import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.master.AssignmentManager;
 import org.apache.hadoop.hbase.master.BulkAssigner;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
@@ -215,10 +214,8 @@ public class DisableTableProcedure
 
   @Override
   protected boolean acquireLock(final MasterProcedureEnv env) {
-    if (!env.isInitialized()) return false;
-    return env.getProcedureQueue().tryAcquireTableExclusiveLock(
-      tableName,
-      EventType.C_M_DISABLE_TABLE.toString());
+    if (env.waitInitialized(this)) return false;
+    return env.getProcedureQueue().tryAcquireTableExclusiveLock(tableName, "disable table");
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java
index bc1fc0f..d24d94b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.TableNotDisabledException;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.TableState;
-import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.master.AssignmentManager;
 import org.apache.hadoop.hbase.master.BulkAssigner;
 import org.apache.hadoop.hbase.master.GeneralBulkAssigner;
@@ -235,10 +234,8 @@ public class EnableTableProcedure
 
   @Override
   protected boolean acquireLock(final MasterProcedureEnv env) {
-    if (!env.isInitialized()) return false;
-    return env.getProcedureQueue().tryAcquireTableExclusiveLock(
-      tableName,
-      EventType.C_M_ENABLE_TABLE.toString());
+    if (env.waitInitialized(this)) return false;
+    return env.getProcedureQueue().tryAcquireTableExclusiveLock(tableName, "enable table");
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
index 6700b63..090b8cc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
@@ -31,6 +31,8 @@ import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent;
+import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
 import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
 import org.apache.hadoop.hbase.security.User;
@@ -85,12 +87,12 @@ public class MasterProcedureEnv {
     }
   }
 
-  private final MasterProcedureQueue procQueue;
+  private final MasterProcedureScheduler procSched;
   private final MasterServices master;
 
   public MasterProcedureEnv(final MasterServices master) {
     this.master = master;
-    this.procQueue = new MasterProcedureQueue(master.getConfiguration(),
+    this.procSched = new MasterProcedureScheduler(master.getConfiguration(),
       master.getTableLockManager());
   }
 
@@ -114,8 +116,8 @@ public class MasterProcedureEnv {
     return master.getMasterCoprocessorHost();
   }
 
-  public MasterProcedureQueue getProcedureQueue() {
-    return procQueue;
+  public MasterProcedureScheduler getProcedureQueue() {
+    return procSched;
   }
 
   public boolean isRunning() {
@@ -125,4 +127,28 @@ public class MasterProcedureEnv {
   public boolean isInitialized() {
     return master.isInitialized();
   }
+
+  public boolean waitInitialized(Procedure proc) {
+    return procSched.waitEvent(((HMaster)master).getInitializedEvent(), proc);
+  }
+
+  public boolean waitServerCrashProcessingEnabled(Procedure proc) {
+    return procSched.waitEvent(((HMaster)master).getServerCrashProcessingEnabledEvent(), proc);
+  }
+
+  public void wake(ProcedureEvent event) {
+    procSched.wake(event);
+  }
+
+  public void suspend(ProcedureEvent event) {
+    procSched.suspend(event);
+  }
+
+  public void setEventReady(ProcedureEvent event, boolean isReady) {
+    if (isReady) {
+      procSched.wake(event);
+    } else {
+      procSched.suspend(event);
+    }
+  }
 }
\ No newline at end of file

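The helpers added to MasterProcedureEnv above are thin wrappers over the scheduler's event machinery. A rough usage sketch follows (illustrative only; in the patch the actual events come from HMaster's getInitializedEvent() and getServerCrashProcessingEnabledEvent()):

    // Illustrative: how the master side drives a ProcedureEvent.
    ProcedureEvent initializedEvent = new ProcedureEvent("master initialized");

    // While the guarded condition does not hold, procedures that call
    // env.waitInitialized(proc) are suspended on this event.
    env.setEventReady(initializedEvent, false);

    // ... master finishes initializing ...

    // Waking the event re-queues every suspended table/server queue and
    // signals the scheduler so that worker threads can poll again.
    env.setEventReady(initializedEvent, true);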
http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureQueue.java
deleted file mode 100644
index c4c7747..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureQueue.java
+++ /dev/null
@@ -1,578 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.master.procedure;
-
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.Deque;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableExistsException;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.procedure2.Procedure;
-import org.apache.hadoop.hbase.procedure2.ProcedureFairRunQueues;
-import org.apache.hadoop.hbase.procedure2.ProcedureRunnableSet;
-import org.apache.hadoop.hbase.master.TableLockManager;
-import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
-import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType;
-
-/**
- * ProcedureRunnableSet for the Master Procedures.
- * This RunnableSet tries to provide to the ProcedureExecutor procedures
- * that can be executed without having to wait on a lock.
- * Most of the master operations can be executed concurrently, if they
- * are operating on different tables (e.g. two create table can be performed
- * at the same, time assuming table A and table B) or against two different servers; say
- * two servers that crashed at about the same time.
- *
- * <p>Each procedure should implement an interface providing information for this queue.
- * for example table related procedures should implement TableProcedureInterface.
- * each procedure will be pushed in its own queue, and based on the operation type
- * we may take smarter decision. e.g. we can abort all the operations preceding
- * a delete table, or similar.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class MasterProcedureQueue implements ProcedureRunnableSet {
-  private static final Log LOG = LogFactory.getLog(MasterProcedureQueue.class);
-
-  // Two queues to ensure that server procedures run ahead of table precedures always.
-  private final ProcedureFairRunQueues<TableName, RunQueue> tableFairQ;
-  /**
-   * Rely on basic fair q. ServerCrashProcedure will yield if meta is not assigned. This way, the
-   * server that was carrying meta should rise to the top of the queue (this is how it used to
-   * work when we had handlers and ServerShutdownHandler ran). TODO: special handling of servers
-   * that were carrying system tables on crash; do I need to have these servers have priority?
-   *
-   * <p>Apart from the special-casing of meta and system tables, fairq is what we want
-   */
-  private final ProcedureFairRunQueues<ServerName, RunQueue> serverFairQ;
-
-  private final ReentrantLock lock = new ReentrantLock();
-  private final Condition waitCond = lock.newCondition();
-  private final TableLockManager lockManager;
-
-  private final int metaTablePriority;
-  private final int userTablePriority;
-  private final int sysTablePriority;
-  private static final int DEFAULT_SERVER_PRIORITY = 1;
-
-  /**
-   * Keeps count across server and table queues.
-   */
-  private int queueSize;
-
-  public MasterProcedureQueue(final Configuration conf, final TableLockManager lockManager) {
-    this.tableFairQ = new ProcedureFairRunQueues<TableName, RunQueue>(1);
-    this.serverFairQ = new ProcedureFairRunQueues<ServerName, RunQueue>(1);
-    this.lockManager = lockManager;
-
-    // TODO: should this be part of the HTD?
-    metaTablePriority = conf.getInt("hbase.master.procedure.queue.meta.table.priority", 3);
-    sysTablePriority = conf.getInt("hbase.master.procedure.queue.system.table.priority", 2);
-    userTablePriority = conf.getInt("hbase.master.procedure.queue.user.table.priority", 1);
-  }
-
-  @Override
-  public void addFront(final Procedure proc) {
-    lock.lock();
-    try {
-      getRunQueueOrCreate(proc).addFront(proc);
-      queueSize++;
-      waitCond.signal();
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  @Override
-  public void addBack(final Procedure proc) {
-    lock.lock();
-    try {
-      getRunQueueOrCreate(proc).addBack(proc);
-      queueSize++;
-      waitCond.signal();
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  @Override
-  public void yield(final Procedure proc) {
-    addBack(proc);
-  }
-
-  @Override
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
-  public Long poll() {
-    Long pollResult = null;
-    lock.lock();
-    try {
-      if (queueSize == 0) {
-        waitCond.await();
-        if (queueSize == 0) {
-          return null;
-        }
-      }
-      // For now, let server handling have precedence over table handling; presumption is that it
-      // is more important handling crashed servers than it is running the
-      // enabling/disabling tables, etc.
-      pollResult = doPoll(serverFairQ.poll());
-      if (pollResult == null) {
-        pollResult = doPoll(tableFairQ.poll());
-      }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-    } finally {
-      lock.unlock();
-    }
-    return pollResult;
-  }
-
-  private Long doPoll(final RunQueue rq) {
-    if (rq == null || !rq.isAvailable()) return null;
-    this.queueSize--;
-    return rq.poll();
-  }
-
-  @Override
-  public void signalAll() {
-    lock.lock();
-    try {
-      waitCond.signalAll();
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  @Override
-  public void clear() {
-    lock.lock();
-    try {
-      serverFairQ.clear();
-      tableFairQ.clear();
-      queueSize = 0;
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  @Override
-  public int size() {
-    lock.lock();
-    try {
-      return queueSize;
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  @Override
-  public String toString() {
-    lock.lock();
-    try {
-      return "MasterProcedureQueue size=" + queueSize + ": tableFairQ: " + tableFairQ +
-        ", serverFairQ: " + serverFairQ;
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  @Override
-  public void completionCleanup(Procedure proc) {
-    if (proc instanceof TableProcedureInterface) {
-      TableProcedureInterface iProcTable = (TableProcedureInterface)proc;
-      boolean tableDeleted;
-      if (proc.hasException()) {
-        IOException procEx =  proc.getException().unwrapRemoteException();
-        if (iProcTable.getTableOperationType() == TableOperationType.CREATE) {
-          // create failed because the table already exist
-          tableDeleted = !(procEx instanceof TableExistsException);
-        } else {
-          // the operation failed because the table does not exist
-          tableDeleted = (procEx instanceof TableNotFoundException);
-        }
-      } else {
-        // the table was deleted
-        tableDeleted = (iProcTable.getTableOperationType() == TableOperationType.DELETE);
-      }
-      if (tableDeleted) {
-        markTableAsDeleted(iProcTable.getTableName());
-      }
-    }
-    // No cleanup for ServerProcedureInterface types, yet.
-  }
-
-  private RunQueue getRunQueueOrCreate(final Procedure proc) {
-    if (proc instanceof TableProcedureInterface) {
-      final TableName table = ((TableProcedureInterface)proc).getTableName();
-      return getRunQueueOrCreate(table);
-    }
-    if (proc instanceof ServerProcedureInterface) {
-      return getRunQueueOrCreate((ServerProcedureInterface)proc);
-    }
-    // TODO: at the moment we only have Table and Server procedures
-    // if you are implementing a non-table/non-server procedure, you have two options: create
-    // a group for all the non-table/non-server procedures or try to find a key for your
-    // non-table/non-server procedures and implement something similar to the TableRunQueue.
-    throw new UnsupportedOperationException("RQs for non-table procedures are not implemented yet");
-  }
-
-  private TableRunQueue getRunQueueOrCreate(final TableName table) {
-    final TableRunQueue queue = getRunQueue(table);
-    if (queue != null) return queue;
-    return (TableRunQueue)tableFairQ.add(table, createTableRunQueue(table));
-  }
-
-  private ServerRunQueue getRunQueueOrCreate(final ServerProcedureInterface spi) {
-    final ServerRunQueue queue = getRunQueue(spi.getServerName());
-    if (queue != null) return queue;
-    return (ServerRunQueue)serverFairQ.add(spi.getServerName(), createServerRunQueue(spi));
-  }
-
-  private TableRunQueue createTableRunQueue(final TableName table) {
-    int priority = userTablePriority;
-    if (table.equals(TableName.META_TABLE_NAME)) {
-      priority = metaTablePriority;
-    } else if (table.isSystemTable()) {
-      priority = sysTablePriority;
-    }
-    return new TableRunQueue(priority);
-  }
-
-  private ServerRunQueue createServerRunQueue(final ServerProcedureInterface spi) {
-    return new ServerRunQueue(DEFAULT_SERVER_PRIORITY);
-  }
-
-  private TableRunQueue getRunQueue(final TableName table) {
-    return (TableRunQueue)tableFairQ.get(table);
-  }
-
-  private ServerRunQueue getRunQueue(final ServerName sn) {
-    return (ServerRunQueue)serverFairQ.get(sn);
-  }
-
-  /**
-   * Try to acquire the write lock on the specified table.
-   * other operations in the table-queue will be executed after the lock is released.
-   * @param table Table to lock
-   * @param purpose Human readable reason for locking the table
-   * @return true if we were able to acquire the lock on the table, otherwise false.
-   */
-  public boolean tryAcquireTableExclusiveLock(final TableName table, final String purpose) {
-    return getRunQueueOrCreate(table).tryExclusiveLock(lockManager, table, purpose);
-  }
-
-  /**
-   * Release the write lock taken with tryAcquireTableWrite()
-   * @param table the name of the table that has the write lock
-   */
-  public void releaseTableExclusiveLock(final TableName table) {
-    getRunQueue(table).releaseExclusiveLock(lockManager, table);
-  }
-
-  /**
-   * Try to acquire the read lock on the specified table.
-   * other read operations in the table-queue may be executed concurrently,
-   * otherwise they have to wait until all the read-locks are released.
-   * @param table Table to lock
-   * @param purpose Human readable reason for locking the table
-   * @return true if we were able to acquire the lock on the table, otherwise false.
-   */
-  public boolean tryAcquireTableSharedLock(final TableName table, final String purpose) {
-    return getRunQueueOrCreate(table).trySharedLock(lockManager, table, purpose);
-  }
-
-  /**
-   * Release the read lock taken with tryAcquireTableRead()
-   * @param table the name of the table that has the read lock
-   */
-  public void releaseTableSharedLock(final TableName table) {
-    getRunQueue(table).releaseSharedLock(lockManager, table);
-  }
-
-  /**
-   * Try to acquire the write lock on the specified server.
-   * @see #releaseServerExclusiveLock(ServerProcedureInterface)
-   * @param spi Server to lock
-   * @return true if we were able to acquire the lock on the server, otherwise false.
-   */
-  public boolean tryAcquireServerExclusiveLock(final ServerProcedureInterface spi) {
-    return getRunQueueOrCreate(spi).tryExclusiveLock();
-  }
-
-  /**
-   * Release the write lock
-   * @see #tryAcquireServerExclusiveLock(ServerProcedureInterface)
-   * @param spi the server that has the write lock
-   */
-  public void releaseServerExclusiveLock(final ServerProcedureInterface spi) {
-    getRunQueue(spi.getServerName()).releaseExclusiveLock();
-  }
-
-  /**
-   * Try to acquire the read lock on the specified server.
-   * @see #releaseServerSharedLock(ServerProcedureInterface)
-   * @param spi Server to lock
-   * @return true if we were able to acquire the lock on the server, otherwise false.
-   */
-  public boolean tryAcquireServerSharedLock(final ServerProcedureInterface spi) {
-    return getRunQueueOrCreate(spi).trySharedLock();
-  }
-
-  /**
-   * Release the read lock taken
-   * @see #tryAcquireServerSharedLock(ServerProcedureInterface)
-   * @param spi the server that has the read lock
-   */
-  public void releaseServerSharedLock(final ServerProcedureInterface spi) {
-    getRunQueue(spi.getServerName()).releaseSharedLock();
-  }
-
-  /**
-   * Tries to remove the queue and the table-lock of the specified table.
-   * If there are new operations pending (e.g. a new create),
-   * the remove will not be performed.
-   * @param table the name of the table that should be marked as deleted
-   * @return true if deletion succeeded, false otherwise meaning that there are
-   *    other new operations pending for that table (e.g. a new create).
-   */
-  protected boolean markTableAsDeleted(final TableName table) {
-    TableRunQueue queue = getRunQueue(table);
-    if (queue != null) {
-      lock.lock();
-      try {
-        if (queue.isEmpty() && queue.acquireDeleteLock()) {
-          tableFairQ.remove(table);
-
-          // Remove the table lock
-          try {
-            lockManager.tableDeleted(table);
-          } catch (IOException e) {
-            LOG.warn("Received exception from TableLockManager.tableDeleted:", e); //not critical
-          }
-        } else {
-          // TODO: If there are no create, we can drop all the other ops
-          return false;
-        }
-      } finally {
-        lock.unlock();
-      }
-    }
-    return true;
-  }
-
-  private interface RunQueue extends ProcedureFairRunQueues.FairObject {
-    void addFront(Procedure proc);
-    void addBack(Procedure proc);
-    Long poll();
-    boolean acquireDeleteLock();
-  }
-
-  /**
-   * Base abstract class for RunQueue implementations.
-   * Be careful honoring synchronizations in subclasses. In here we protect access but if you are
-   * acting on a state found in here, be sure dependent code keeps synchronization.
-   * Implements basic in-memory read/write locking mechanism to prevent procedure steps being run
-   * in parallel.
-   */
-  private static abstract class AbstractRunQueue implements RunQueue {
-    // All modification of runnables happens with #lock held.
-    private final Deque<Long> runnables = new ArrayDeque<Long>();
-    private final int priority;
-    private boolean exclusiveLock = false;
-    private int sharedLock = 0;
-
-    public AbstractRunQueue(int priority) {
-      this.priority = priority;
-    }
-
-    boolean isEmpty() {
-      return this.runnables.isEmpty();
-    }
-
-    @Override
-    public boolean isAvailable() {
-      synchronized (this) {
-        return !exclusiveLock && !runnables.isEmpty();
-      }
-    }
-
-    @Override
-    public int getPriority() {
-      return this.priority;
-    }
-
-    @Override
-    public void addFront(Procedure proc) {
-      this.runnables.addFirst(proc.getProcId());
-    }
-
-    @Override
-    public void addBack(Procedure proc) {
-      this.runnables.addLast(proc.getProcId());
-    }
-
-    @Override
-    public Long poll() {
-      return this.runnables.poll();
-    }
-
-    @Override
-    public synchronized boolean acquireDeleteLock() {
-      return tryExclusiveLock();
-    }
-
-    public synchronized boolean isLocked() {
-      return isExclusiveLock() || sharedLock > 0;
-    }
-
-    public synchronized boolean isExclusiveLock() {
-      return this.exclusiveLock;
-    }
-
-    public synchronized boolean trySharedLock() {
-      if (isExclusiveLock()) return false;
-      sharedLock++;
-      return true;
-    }
-
-    public synchronized void releaseSharedLock() {
-      sharedLock--;
-    }
-
-    /**
-     * @return True if only one instance of a shared lock outstanding.
-     */
-    synchronized boolean isSingleSharedLock() {
-      return sharedLock == 1;
-    }
-
-    public synchronized boolean tryExclusiveLock() {
-      if (isLocked()) return false;
-      exclusiveLock = true;
-      return true;
-    }
-
-    public synchronized void releaseExclusiveLock() {
-      exclusiveLock = false;
-    }
-
-    @Override
-    public String toString() {
-      return this.runnables.toString();
-    }
-  }
-
-  /**
-   * Run Queue for Server procedures.
-   */
-  private static class ServerRunQueue extends AbstractRunQueue {
-    public ServerRunQueue(int priority) {
-      super(priority);
-    }
-  }
-
-  /**
-   * Run Queue for a Table. It contains a read-write lock that is used by the
-   * MasterProcedureQueue to decide if we should fetch an item from this queue
-   * or skip to another one which will be able to run without waiting for locks.
-   */
-  private static class TableRunQueue extends AbstractRunQueue {
-    private TableLock tableLock = null;
-
-    public TableRunQueue(int priority) {
-      super(priority);
-    }
-
-    // TODO: Improve run-queue push with TableProcedureInterface.getType()
-    //       we can take smart decisions based on the type of the operation (e.g. create/delete)
-    @Override
-    public void addBack(final Procedure proc) {
-      super.addBack(proc);
-    }
-
-    public synchronized boolean trySharedLock(final TableLockManager lockManager,
-        final TableName tableName, final String purpose) {
-      if (isExclusiveLock()) return false;
-
-      // Take zk-read-lock
-      tableLock = lockManager.readLock(tableName, purpose);
-      try {
-        tableLock.acquire();
-      } catch (IOException e) {
-        LOG.error("failed acquire read lock on " + tableName, e);
-        tableLock = null;
-        return false;
-      }
-      trySharedLock();
-      return true;
-    }
-
-    public synchronized void releaseSharedLock(final TableLockManager lockManager,
-        final TableName tableName) {
-      releaseTableLock(lockManager, isSingleSharedLock());
-      releaseSharedLock();
-    }
-
-    public synchronized boolean tryExclusiveLock(final TableLockManager lockManager,
-        final TableName tableName, final String purpose) {
-      if (isLocked()) return false;
-      // Take zk-write-lock
-      tableLock = lockManager.writeLock(tableName, purpose);
-      try {
-        tableLock.acquire();
-      } catch (IOException e) {
-        LOG.error("failed acquire write lock on " + tableName, e);
-        tableLock = null;
-        return false;
-      }
-      tryExclusiveLock();
-      return true;
-    }
-
-    public synchronized void releaseExclusiveLock(final TableLockManager lockManager,
-        final TableName tableName) {
-      releaseTableLock(lockManager, true);
-      releaseExclusiveLock();
-    }
-
-    private void releaseTableLock(final TableLockManager lockManager, boolean reset) {
-      for (int i = 0; i < 3; ++i) {
-        try {
-          tableLock.release();
-          if (reset) {
-            tableLock = null;
-          }
-          break;
-        } catch (IOException e) {
-          LOG.warn("Could not release the table write-lock", e);
-        }
-      }
-    }
-  }
-}


[4/9] hbase git commit: HBASE-14837 Procedure v2 - Procedure Queue Improvement

Posted by sy...@apache.org.
http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
new file mode 100644
index 0000000..9a3714f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
@@ -0,0 +1,1241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.master.TableLockManager;
+import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
+import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureRunnableSet;
+
+/**
+ * ProcedureRunnableSet for the Master Procedures.
+ * This RunnableSet tries to provide the ProcedureExecutor with procedures
+ * that can be executed without having to wait on a lock.
+ * Most of the master operations can be executed concurrently, if they
+ * are operating on different tables (e.g. two create-table operations can be performed
+ * at the same time, assuming table A and table B) or against two different servers; say
+ * two servers that crashed at about the same time.
+ *
+ * <p>Each procedure should implement an interface providing information for this queue.
+ * For example, table-related procedures should implement TableProcedureInterface.
+ * Each procedure will be pushed into its own queue, and based on the operation type
+ * we may take smarter decisions, e.g. we can abort all the operations preceding
+ * a delete table, or similar.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class MasterProcedureScheduler implements ProcedureRunnableSet {
+  private static final Log LOG = LogFactory.getLog(MasterProcedureScheduler.class);
+
+  private final TableLockManager lockManager;
+  private final ReentrantLock schedLock = new ReentrantLock();
+  private final Condition schedWaitCond = schedLock.newCondition();
+
+  private final FairQueue<ServerName> serverRunQueue = new FairQueue<ServerName>();
+  private final FairQueue<TableName> tableRunQueue = new FairQueue<TableName>();
+  private int queueSize = 0;
+
+  private final Object[] serverBuckets = new Object[128];
+  private Queue<String> namespaceMap = null;
+  private Queue<TableName> tableMap = null;
+
+  private final int metaTablePriority;
+  private final int userTablePriority;
+  private final int sysTablePriority;
+
+  // TODO: metrics
+  private long pollCalls = 0;
+  private long nullPollCalls = 0;
+
+  public MasterProcedureScheduler(final Configuration conf, final TableLockManager lockManager) {
+    this.lockManager = lockManager;
+
+    // TODO: should this be part of the HTD?
+    metaTablePriority = conf.getInt("hbase.master.procedure.queue.meta.table.priority", 3);
+    sysTablePriority = conf.getInt("hbase.master.procedure.queue.system.table.priority", 2);
+    userTablePriority = conf.getInt("hbase.master.procedure.queue.user.table.priority", 1);
+  }
+
+  @Override
+  public void addFront(Procedure proc) {
+    doAdd(proc, true);
+  }
+
+  @Override
+  public void addBack(Procedure proc) {
+    doAdd(proc, false);
+  }
+
+  @Override
+  public void yield(final Procedure proc) {
+    doAdd(proc, isTableProcedure(proc));
+  }
+
+  private void doAdd(final Procedure proc, final boolean addFront) {
+    schedLock.lock();
+    try {
+      if (isTableProcedure(proc)) {
+        doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront);
+      } else if (isServerProcedure(proc)) {
+        doAdd(serverRunQueue, getServerQueue(getServerName(proc)), proc, addFront);
+      } else {
+        // TODO: at the moment we only have Table and Server procedures
+        // if you are implementing a non-table/non-server procedure, you have two options: create
+        // a group for all the non-table/non-server procedures or try to find a key for your
+        // non-table/non-server procedures and implement something similar to the TableRunQueue.
+        throw new UnsupportedOperationException(
+          "RQs for non-table/non-server procedures are not implemented yet");
+      }
+      schedWaitCond.signal();
+    } finally {
+      schedLock.unlock();
+    }
+  }
+
+  private <T extends Comparable<T>> void doAdd(final FairQueue<T> fairq,
+      final Queue<T> queue, final Procedure proc, final boolean addFront) {
+    queue.add(proc, addFront);
+    if (!(queue.isSuspended() || queue.hasExclusiveLock())) {
+      if (queue.size() == 1 && !IterableList.isLinked(queue)) {
+        fairq.add(queue);
+      }
+      queueSize++;
+    }
+  }
+
+  @Override
+  public Procedure poll() {
+    return poll(-1);
+  }
+
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
+  Procedure poll(long waitNsec) {
+    Procedure pollResult = null;
+    schedLock.lock();
+    try {
+      if (queueSize == 0) {
+        if (waitNsec < 0) {
+          schedWaitCond.await();
+        } else {
+          schedWaitCond.awaitNanos(waitNsec);
+        }
+        if (queueSize == 0) {
+          return null;
+        }
+      }
+
+      // For now, let server handling have precedence over table handling; presumption is that it
+      // is more important handling crashed servers than it is running the
+      // enabling/disabling tables, etc.
+      pollResult = doPoll(serverRunQueue);
+      if (pollResult == null) {
+        pollResult = doPoll(tableRunQueue);
+      }
+
+      // update metrics
+      pollCalls++;
+      nullPollCalls += (pollResult == null) ? 1 : 0;
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    } finally {
+      schedLock.unlock();
+    }
+    return pollResult;
+  }
+
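Together with addFront()/addBack(), poll() is the contract the ProcedureExecutor workers consume. A hedged sketch of a consumer loop (illustrative; executorIsRunning is a stand-in flag, not from the patch):

    // Server-crash procedures are polled ahead of table procedures, and a
    // null result simply means nothing runnable was available (woken on an
    // empty queue, or interrupted).
    while (executorIsRunning) {
      Procedure proc = scheduler.poll();  // blocks on schedWaitCond when empty
      if (proc == null) continue;
      // ... execute one step of proc; the executor re-adds or completes it ...
    }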
+  private <T extends Comparable<T>> Procedure doPoll(final FairQueue<T> fairq) {
+    Queue<T> rq = fairq.poll();
+    if (rq == null || !rq.isAvailable()) {
+      return null;
+    }
+
+    assert !rq.isSuspended() : "rq=" + rq + " is suspended";
+    Procedure pollResult = rq.poll();
+    this.queueSize--;
+    if (rq.isEmpty() || rq.requireExclusiveLock(pollResult)) {
+      removeFromRunQueue(fairq, rq);
+    }
+    return pollResult;
+  }
+
+  @Override
+  public void clear() {
+    // NOTE: USED ONLY FOR TESTING
+    schedLock.lock();
+    try {
+      // Remove Servers
+      for (int i = 0; i < serverBuckets.length; ++i) {
+        clear((ServerQueue)serverBuckets[i], serverRunQueue);
+        serverBuckets[i] = null;
+      }
+
+      // Remove Tables
+      clear(tableMap, tableRunQueue);
+      tableMap = null;
+
+      assert queueSize == 0 : "expected queue size to be 0, got " + queueSize;
+    } finally {
+      schedLock.unlock();
+    }
+  }
+
+  private <T extends Comparable<T>> void clear(Queue<T> treeMap, FairQueue<T> fairq) {
+    while (treeMap != null) {
+      Queue<T> node = AvlTree.getFirst(treeMap);
+      assert !node.isSuspended() : "can't clear suspended " + node.getKey();
+      treeMap = AvlTree.remove(treeMap, node.getKey());
+      removeFromRunQueue(fairq, node);
+    }
+  }
+
+  @Override
+  public void signalAll() {
+    schedLock.lock();
+    try {
+      schedWaitCond.signalAll();
+    } finally {
+      schedLock.unlock();
+    }
+  }
+
+  @Override
+  public int size() {
+    schedLock.lock();
+    try {
+      return queueSize;
+    } finally {
+      schedLock.unlock();
+    }
+  }
+
+  @Override
+  public void completionCleanup(Procedure proc) {
+    if (proc instanceof TableProcedureInterface) {
+      TableProcedureInterface iProcTable = (TableProcedureInterface)proc;
+      boolean tableDeleted;
+      if (proc.hasException()) {
+        IOException procEx =  proc.getException().unwrapRemoteException();
+        if (iProcTable.getTableOperationType() == TableOperationType.CREATE) {
+          // create failed because the table already exist
+          tableDeleted = !(procEx instanceof TableExistsException);
+        } else {
+          // the operation failed because the table does not exist
+          tableDeleted = (procEx instanceof TableNotFoundException);
+        }
+      } else {
+        // the table was deleted
+        tableDeleted = (iProcTable.getTableOperationType() == TableOperationType.DELETE);
+      }
+      if (tableDeleted) {
+        markTableAsDeleted(iProcTable.getTableName());
+        return;
+      }
+    } else {
+      // No cleanup for ServerProcedureInterface types, yet.
+      return;
+    }
+  }
+
+  private <T extends Comparable<T>> void addToRunQueue(FairQueue<T> fairq, Queue<T> queue) {
+    if (IterableList.isLinked(queue)) return;
+    if (!queue.isEmpty()) {
+      fairq.add(queue);
+      queueSize += queue.size();
+    }
+  }
+
+  private <T extends Comparable<T>> void removeFromRunQueue(FairQueue<T> fairq, Queue<T> queue) {
+    if (!IterableList.isLinked(queue)) return;
+    fairq.remove(queue);
+    queueSize -= queue.size();
+  }
+
+  // ============================================================================
+  //  TODO: Metrics
+  // ============================================================================
+  public long getPollCalls() {
+    return pollCalls;
+  }
+
+  public long getNullPollCalls() {
+    return nullPollCalls;
+  }
+
+  // ============================================================================
+  //  Event Helpers
+  // ============================================================================
+  public boolean waitEvent(ProcedureEvent event, Procedure procedure) {
+    return waitEvent(event, procedure, false);
+  }
+
+  public boolean waitEvent(ProcedureEvent event, Procedure procedure, boolean suspendQueue) {
+    synchronized (event) {
+      if (event.isReady()) {
+        return false;
+      }
+
+      // TODO: Suspend single procedure not implemented yet, fallback to suspending the queue
+      if (!suspendQueue) suspendQueue = true;
+
+      if (isTableProcedure(procedure)) {
+        suspendTableQueue(event, getTableName(procedure));
+      } else if (isServerProcedure(procedure)) {
+        suspendServerQueue(event, getServerName(procedure));
+      } else {
+        // TODO: at the moment we only have Table and Server procedures
+        // if you are implementing a non-table/non-server procedure, you have two options: create
+        // a group for all the non-table/non-server procedures or try to find a key for your
+        // non-table/non-server procedures and implement something similar to the TableRunQueue.
+        throw new UnsupportedOperationException(
+          "RQs for non-table/non-server procedures are not implemented yet");
+      }
+    }
+    return true;
+  }
+
+  private void suspendTableQueue(ProcedureEvent event, TableName tableName) {
+    schedLock.lock();
+    try {
+      TableQueue queue = getTableQueue(tableName);
+      if (!queue.setSuspended(true)) return;
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Suspend table queue " + tableName);
+      }
+      removeFromRunQueue(tableRunQueue, queue);
+      event.suspendTableQueue(queue);
+    } finally {
+      schedLock.unlock();
+    }
+  }
+
+  private void suspendServerQueue(ProcedureEvent event, ServerName serverName) {
+    schedLock.lock();
+    try {
+      // TODO: This will change once we have the new AM
+      ServerQueue queue = getServerQueue(serverName);
+      if (!queue.setSuspended(true)) return;
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Suspend server queue " + serverName);
+      }
+      removeFromRunQueue(serverRunQueue, queue);
+      event.suspendServerQueue(queue);
+    } finally {
+      schedLock.unlock();
+    }
+  }
+
+  public void suspend(ProcedureEvent event) {
+    synchronized (event) {
+      event.setReady(false);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Suspend event " + event);
+      }
+    }
+  }
+
+  public void wake(ProcedureEvent event) {
+    synchronized (event) {
+      event.setReady(true);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Wake event " + event);
+      }
+
+      schedLock.lock();
+      try {
+        while (event.hasWaitingTables()) {
+          Queue<TableName> queue = event.popWaitingTable();
+          addToRunQueue(tableRunQueue, queue);
+        }
+        // TODO: This will change once we have the new AM
+        while (event.hasWaitingServers()) {
+          Queue<ServerName> queue = event.popWaitingServer();
+          addToRunQueue(serverRunQueue, queue);
+        }
+
+        if (queueSize > 1) {
+          schedWaitCond.signalAll();
+        } else if (queueSize > 0) {
+          schedWaitCond.signal();
+        }
+      } finally {
+        schedLock.unlock();
+      }
+    }
+  }
+
+  public static class ProcedureEvent {
+    private final String description;
+
+    private Queue<ServerName> waitingServers = null;
+    private Queue<TableName> waitingTables = null;
+    private boolean ready = false;
+
+    public ProcedureEvent(String description) {
+      this.description = description;
+    }
+
+    public synchronized boolean isReady() {
+      return ready;
+    }
+
+    private synchronized void setReady(boolean isReady) {
+      this.ready = isReady;
+    }
+
+    private void suspendTableQueue(Queue<TableName> queue) {
+      waitingTables = IterableList.append(waitingTables, queue);
+    }
+
+    private void suspendServerQueue(Queue<ServerName> queue) {
+      waitingServers = IterableList.append(waitingServers, queue);
+    }
+
+    private boolean hasWaitingTables() {
+      return waitingTables != null;
+    }
+
+    private Queue<TableName> popWaitingTable() {
+      Queue<TableName> node = waitingTables;
+      waitingTables = IterableList.remove(waitingTables, node);
+      node.setSuspended(false);
+      return node;
+    }
+
+    private boolean hasWaitingServers() {
+      return waitingServers != null;
+    }
+
+    private Queue<ServerName> popWaitingServer() {
+      Queue<ServerName> node = waitingServers;
+      waitingServers = IterableList.remove(waitingServers, node);
+      node.setSuspended(false);
+      return node;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("ProcedureEvent(%s)", description);
+    }
+  }
+
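From a procedure's perspective the machinery above reduces to a waitEvent()/wake() handshake. A condensed sketch (illustrative; proc is any table or server procedure, event any ProcedureEvent owned by the master):

    // waitEvent() returns false if the event is already ready (no wait).
    if (scheduler.waitEvent(event, proc)) {
      // Event was not ready: proc's whole table/server queue has been pulled
      // out of the fair run queue and chained onto the event's waiting list,
      // so the procedure goes dormant instead of being re-polled.
    }

    // Later, once the guarded condition holds:
    scheduler.wake(event);
    // -> every waiting queue rejoins its run queue and the scheduler's
    //    condition variable is signalled so workers pick the work back up.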
+  // ============================================================================
+  //  Table Queue Lookup Helpers
+  // ============================================================================
+  private TableQueue getTableQueueWithLock(TableName tableName) {
+    schedLock.lock();
+    try {
+      return getTableQueue(tableName);
+    } finally {
+      schedLock.unlock();
+    }
+  }
+
+  private TableQueue getTableQueue(TableName tableName) {
+    Queue<TableName> node = AvlTree.get(tableMap, tableName);
+    if (node != null) return (TableQueue)node;
+
+    node = new TableQueue(tableName, getTablePriority(tableName));
+    tableMap = AvlTree.insert(tableMap, node);
+    return (TableQueue)node;
+  }
+
+  private void removeTableQueue(TableName tableName) {
+    tableMap = AvlTree.remove(tableMap, tableName);
+  }
+
+  private int getTablePriority(TableName tableName) {
+    if (tableName.equals(TableName.META_TABLE_NAME)) {
+      return metaTablePriority;
+    } else if (tableName.isSystemTable()) {
+      return sysTablePriority;
+    }
+    return userTablePriority;
+  }
+
+  private static boolean isTableProcedure(Procedure proc) {
+    return proc instanceof TableProcedureInterface;
+  }
+
+  private static TableName getTableName(Procedure proc) {
+    return ((TableProcedureInterface)proc).getTableName();
+  }
+
+  // ============================================================================
+  //  Server Queue Lookup Helpers
+  // ============================================================================
+  private ServerQueue getServerQueueWithLock(ServerName serverName) {
+    schedLock.lock();
+    try {
+      return getServerQueue(serverName);
+    } finally {
+      schedLock.unlock();
+    }
+  }
+
+  private ServerQueue getServerQueue(ServerName serverName) {
+    int index = getBucketIndex(serverBuckets, serverName.hashCode());
+    Queue<ServerName> root = getTreeRoot(serverBuckets, index);
+    Queue<ServerName> node = AvlTree.get(root, serverName);
+    if (node != null) return (ServerQueue)node;
+
+    node = new ServerQueue(serverName);
+    serverBuckets[index] = AvlTree.insert(root, node);
+    return (ServerQueue)node;
+  }
+
+  private void removeServerQueue(ServerName serverName) {
+    int index = getBucketIndex(serverBuckets, serverName.hashCode());
+    serverBuckets[index] = AvlTree.remove((ServerQueue)serverBuckets[index], serverName);
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <T extends Comparable<T>> Queue<T> getTreeRoot(Object[] buckets, int index) {
+    return (Queue<T>) buckets[index];
+  }
+
+  private static int getBucketIndex(Object[] buckets, int hashCode) {
+    return Math.abs(hashCode) % buckets.length;
+  }
+
+  private static boolean isServerProcedure(Procedure proc) {
+    return proc instanceof ServerProcedureInterface;
+  }
+
+  private static ServerName getServerName(Procedure proc) {
+    return ((ServerProcedureInterface)proc).getServerName();
+  }
+
+  // ============================================================================
+  //  Table and Server Queue Implementation
+  // ============================================================================
+  public static class ServerQueue extends QueueImpl<ServerName> {
+    public ServerQueue(ServerName serverName) {
+      super(serverName);
+    }
+
+    public boolean requireExclusiveLock(Procedure proc) {
+      ServerProcedureInterface spi = (ServerProcedureInterface)proc;
+      switch (spi.getServerOperationType()) {
+        case CRASH_HANDLER:
+          return true;
+        default:
+          break;
+      }
+      throw new UnsupportedOperationException("unexpected type " + spi.getServerOperationType());
+    }
+  }
+
+  public static class TableQueue extends QueueImpl<TableName> {
+    private TableLock tableLock = null;
+
+    public TableQueue(TableName tableName, int priority) {
+      super(tableName, priority);
+    }
+
+    // TODO: We can abort pending/in-progress operations if the new call is
+    //       something like drop table. We can override addBack(),
+    //       check the type and abort all the in-flight procedures.
+    private boolean canAbortPendingOperations(Procedure proc) {
+      TableProcedureInterface tpi = (TableProcedureInterface)proc;
+      switch (tpi.getTableOperationType()) {
+        case DELETE:
+          return true;
+        default:
+          return false;
+      }
+    }
+
+    public boolean requireExclusiveLock(Procedure proc) {
+      TableProcedureInterface tpi = (TableProcedureInterface)proc;
+      switch (tpi.getTableOperationType()) {
+        case CREATE:
+        case DELETE:
+        case DISABLE:
+        case EDIT:
+        case ENABLE:
+          return true;
+        case READ:
+          return false;
+        default:
+          break;
+      }
+      throw new UnsupportedOperationException("unexpected type " + tpi.getTableOperationType());
+    }
+
+    private synchronized boolean trySharedLock(final TableLockManager lockManager,
+        final String purpose) {
+      if (hasExclusiveLock()) return false;
+
+      // Take zk-read-lock
+      TableName tableName = getKey();
+      tableLock = lockManager.readLock(tableName, purpose);
+      try {
+        tableLock.acquire();
+      } catch (IOException e) {
+        LOG.error("failed acquire read lock on " + tableName, e);
+        tableLock = null;
+        return false;
+      }
+
+      trySharedLock();
+      return true;
+    }
+
+    private synchronized void releaseSharedLock(final TableLockManager lockManager) {
+      releaseTableLock(lockManager, isSingleSharedLock());
+      releaseSharedLock();
+    }
+
+    private synchronized boolean tryZkExclusiveLock(final TableLockManager lockManager,
+        final String purpose) {
+      // Take zk-write-lock
+      TableName tableName = getKey();
+      tableLock = lockManager.writeLock(tableName, purpose);
+      try {
+        tableLock.acquire();
+      } catch (IOException e) {
+        LOG.error("failed acquire write lock on " + tableName, e);
+        tableLock = null;
+        return false;
+      }
+      return true;
+    }
+
+    private synchronized void releaseZkExclusiveLock(final TableLockManager lockManager) {
+      releaseTableLock(lockManager, true);
+    }
+
+    private void releaseTableLock(final TableLockManager lockManager, boolean reset) {
+      for (int i = 0; i < 3; ++i) {
+        try {
+          tableLock.release();
+          if (reset) {
+            tableLock = null;
+          }
+          break;
+        } catch (IOException e) {
+          LOG.warn("Could not release the table write-lock", e);
+        }
+      }
+    }
+  }
+
+  // ============================================================================
+  //  Locking Helpers
+  // ============================================================================
+  /**
+   * Try to acquire the exclusive lock on the specified table.
+   * Other operations in the table-queue will be executed after the lock is released.
+   * @param table Table to lock
+   * @param purpose Human readable reason for locking the table
+   * @return true if we were able to acquire the lock on the table, otherwise false.
+   */
+  public boolean tryAcquireTableExclusiveLock(final TableName table, final String purpose) {
+    schedLock.lock();
+    TableQueue queue = getTableQueue(table);
+    boolean hasXLock = queue.tryExclusiveLock();
+    if (!hasXLock) {
+      schedLock.unlock();
+      return false;
+    }
+
+    removeFromRunQueue(tableRunQueue, queue);
+    schedLock.unlock();
+
+    // Zk lock is expensive...
+    hasXLock = queue.tryZkExclusiveLock(lockManager, purpose);
+    if (!hasXLock) {
+      schedLock.lock();
+      queue.releaseExclusiveLock();
+      addToRunQueue(tableRunQueue, queue);
+      schedLock.unlock();
+    }
+    return hasXLock;
+  }
+
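Note the two-phase shape of tryAcquireTableExclusiveLock() above: the in-memory exclusive lock is taken under schedLock (cheap), the ZooKeeper table lock outside it (expensive), with a rollback path if the zk acquisition fails. A caller pairs it with the release that follows; a minimal sketch (the table name and purpose string are made up):

    TableName table = TableName.valueOf("example_table");
    if (scheduler.tryAcquireTableExclusiveLock(table, "example purpose")) {
      try {
        // ... table-exclusive work: no other procedure on this table runs ...
      } finally {
        scheduler.releaseTableExclusiveLock(table);  // zk release + requeue
      }
    }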
+  /**
+   * Release the exclusive lock taken with tryAcquireTableExclusiveLock()
+   * @param table the name of the table that has the exclusive lock
+   */
+  public void releaseTableExclusiveLock(final TableName table) {
+    schedLock.lock();
+    TableQueue queue = getTableQueue(table);
+    schedLock.unlock();
+
+    // Zk lock is expensive...
+    queue.releaseZkExclusiveLock(lockManager);
+
+    schedLock.lock();
+    queue.releaseExclusiveLock();
+    addToRunQueue(tableRunQueue, queue);
+    schedLock.unlock();
+  }
+
+  /**
+   * Try to acquire the shared lock on the specified table.
+   * Other "read" operations in the table-queue may be executed concurrently.
+   * @param table Table to lock
+   * @param purpose Human readable reason for locking the table
+   * @return true if we were able to acquire the lock on the table, otherwise false.
+   */
+  public boolean tryAcquireTableSharedLock(final TableName table, final String purpose) {
+    return getTableQueueWithLock(table).trySharedLock(lockManager, purpose);
+  }
+
+  /**
+   * Release the shared lock taken with tryAcquireTableSharedLock()
+   * @param table the name of the table that has the shared lock
+   */
+  public void releaseTableSharedLock(final TableName table) {
+    getTableQueueWithLock(table).releaseSharedLock(lockManager);
+  }
+
+  /**
+   * Tries to remove the queue and the table-lock of the specified table.
+   * If there are new operations pending (e.g. a new create),
+   * the remove will not be performed.
+   * @param table the name of the table that should be marked as deleted
+   * @return true if deletion succeeded, false otherwise meaning that there are
+   *     other new operations pending for that table (e.g. a new create).
+   */
+  protected boolean markTableAsDeleted(final TableName table) {
+    final ReentrantLock l = schedLock;
+    l.lock();
+    try {
+      TableQueue queue = getTableQueue(table);
+      if (queue == null) return true;
+
+      if (queue.isEmpty() && queue.acquireDeleteLock()) {
+        // remove the table from the run-queue and the map
+        if (IterableList.isLinked(queue)) {
+          tableRunQueue.remove(queue);
+        }
+
+        // Remove the table lock
+        try {
+          lockManager.tableDeleted(table);
+        } catch (IOException e) {
+          LOG.warn("Received exception from TableLockManager.tableDeleted:", e); //not critical
+        }
+
+        removeTableQueue(table);
+      } else {
+        // TODO: If there are no create, we can drop all the other ops
+        return false;
+      }
+    } finally {
+      l.unlock();
+    }
+    return true;
+  }
+
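markTableAsDeleted() is driven from completionCleanup() shown earlier: once a procedure finishes and the table is known to be gone, the queue and its zk table lock are torn down. A condensed sketch of that flow (illustrative comments, not code from the patch):

    // After a procedure completes, the executor calls:
    scheduler.completionCleanup(proc);
    //   -> if proc deleted the table (DELETE succeeded, a non-CREATE op hit
    //      TableNotFoundException, or a CREATE failed with something other
    //      than TableExistsException), markTableAsDeleted(table) removes the
    //      now-empty queue from the run queue, calls
    //      TableLockManager.tableDeleted(table), and drops the AVL node.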
+  // ============================================================================
+  //  Server Locking Helpers
+  // ============================================================================
+  /**
+   * Try to acquire the exclusive lock on the specified server.
+   * @see #releaseServerExclusiveLock(ServerName)
+   * @param serverName Server to lock
+   * @return true if we were able to acquire the lock on the server, otherwise false.
+   */
+  public boolean tryAcquireServerExclusiveLock(final ServerName serverName) {
+    schedLock.lock();
+    try {
+      ServerQueue queue = getServerQueue(serverName);
+      if (queue.tryExclusiveLock()) {
+        removeFromRunQueue(serverRunQueue, queue);
+        return true;
+      }
+    } finally {
+      schedLock.unlock();
+    }
+    return false;
+  }
+
+  /**
+   * Release the exclusive lock
+   * @see #tryAcquireServerExclusiveLock(ServerName)
+   * @param serverName the server that has the exclusive lock
+   */
+  public void releaseServerExclusiveLock(final ServerName serverName) {
+    schedLock.lock();
+    try {
+      ServerQueue queue = getServerQueue(serverName);
+      queue.releaseExclusiveLock();
+      addToRunQueue(serverRunQueue, queue);
+    } finally {
+      schedLock.unlock();
+    }
+  }
+
+  /**
+   * Try to acquire the shared lock on the specified server.
+   * @see #releaseServerSharedLock(ServerName)
+   * @param serverName Server to lock
+   * @return true if we were able to acquire the lock on the server, otherwise false.
+   */
+  public boolean tryAcquireServerSharedLock(final ServerName serverName) {
+    return getServerQueueWithLock(serverName).trySharedLock();
+  }
+
+  /**
+   * Release the shared lock taken
+   * @see #tryAcquireServerSharedLock(ServerName)
+   * @param serverName the server that has the shared lock
+   */
+  public void releaseServerSharedLock(final ServerName serverName) {
+    getServerQueueWithLock(serverName).releaseSharedLock();
+  }
+
+  // ============================================================================
+  //  Generic Helpers
+  // ============================================================================
+  private static interface QueueInterface {
+    boolean isAvailable();
+    boolean isEmpty();
+    int size();
+    void add(Procedure proc, boolean addFront);
+    boolean requireExclusiveLock(Procedure proc);
+    Procedure poll();
+
+    boolean isSuspended();
+  }
+
+  private static abstract class Queue<TKey extends Comparable<TKey>> implements QueueInterface {
+    private Queue<TKey> avlRight = null;
+    private Queue<TKey> avlLeft = null;
+    private int avlHeight = 1;
+
+    private Queue<TKey> iterNext = null;
+    private Queue<TKey> iterPrev = null;
+    private boolean suspended = false;
+
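+    // Lock state (sketch of the semantics below): the queue is either
+    // exclusively locked by a single owner or shared by 'sharedLock' readers;
+    // tryExclusiveLock() fails while any lock, shared or exclusive, is held.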
+    private boolean exclusiveLock = false;
+    private int sharedLock = 0;
+
+    private final TKey key;
+    private final int priority;
+
+    public Queue(TKey key) {
+      this(key, 1);
+    }
+
+    public Queue(TKey key, int priority) {
+      this.key = key;
+      this.priority = priority;
+    }
+
+    protected TKey getKey() {
+      return key;
+    }
+
+    protected int getPriority() {
+      return priority;
+    }
+
+    /**
+     * True if the queue is not in the run-queue and it is owned by an event.
+     */
+    public boolean isSuspended() {
+      return suspended;
+    }
+
+    protected boolean setSuspended(boolean isSuspended) {
+      if (this.suspended == isSuspended) return false;
+      this.suspended = isSuspended;
+      return true;
+    }
+
+    // ======================================================================
+    //  Read/Write Locking helpers
+    // ======================================================================
+    public synchronized boolean isLocked() {
+      return hasExclusiveLock() || sharedLock > 0;
+    }
+
+    public synchronized boolean hasExclusiveLock() {
+      return this.exclusiveLock;
+    }
+
+    public synchronized boolean trySharedLock() {
+      if (hasExclusiveLock()) return false;
+      sharedLock++;
+      return true;
+    }
+
+    public synchronized void releaseSharedLock() {
+      sharedLock--;
+    }
+
+    protected synchronized boolean isSingleSharedLock() {
+      return sharedLock == 1;
+    }
+
+    public synchronized boolean tryExclusiveLock() {
+      if (isLocked()) return false;
+      exclusiveLock = true;
+      return true;
+    }
+
+    public synchronized void releaseExclusiveLock() {
+      exclusiveLock = false;
+    }
+
+    public synchronized boolean acquireDeleteLock() {
+      return tryExclusiveLock();
+    }
+
+    // This should go away when we have the new AM and its events
+    // and we move xlock to the lock-event-queue.
+    public synchronized boolean isAvailable() {
+      return !exclusiveLock && !isEmpty();
+    }
+
+    // ======================================================================
+    //  Generic Helpers
+    // ======================================================================
+    public int compareKey(TKey cmpKey) {
+      return key.compareTo(cmpKey);
+    }
+
+    public int compareTo(Queue<TKey> other) {
+      return compareKey(other.key);
+    }
+
+    @Override
+    public String toString() {
+      return String.format("%s(%s)", getClass().getSimpleName(), key);
+    }
+  }
+
+  // ======================================================================
+  //  Helper Data Structures
+  // ======================================================================
+  private static abstract class QueueImpl<TKey extends Comparable<TKey>> extends Queue<TKey> {
+    private final ArrayDeque<Procedure> runnables = new ArrayDeque<Procedure>();
+
+    public QueueImpl(TKey key) {
+      super(key);
+    }
+
+    public QueueImpl(TKey key, int priority) {
+      super(key, priority);
+    }
+
+    public void add(final Procedure proc, final boolean addToFront) {
+      if (addToFront) {
+        addFront(proc);
+      } else {
+        addBack(proc);
+      }
+    }
+
+    protected void addFront(final Procedure proc) {
+      runnables.addFirst(proc);
+    }
+
+    protected void addBack(final Procedure proc) {
+      runnables.addLast(proc);
+    }
+
+    @Override
+    public Procedure poll() {
+      return runnables.poll();
+    }
+
+    @Override
+    public boolean isEmpty() {
+      return runnables.isEmpty();
+    }
+
+    public int size() {
+      return runnables.size();
+    }
+  }
+
+  private static class FairQueue<T extends Comparable<T>> {
+    private final int quantum;
+
+    private Queue<T> currentQueue = null;
+    private Queue<T> queueHead = null;
+    private int currentQuantum = 0;
+
+    public FairQueue() {
+      this(1);
+    }
+
+    public FairQueue(int quantum) {
+      this.quantum = quantum;
+    }
+
+    public void add(Queue<T> queue) {
+      queueHead = IterableList.append(queueHead, queue);
+      if (currentQueue == null) setNextQueue(queueHead);
+    }
+
+    public void remove(Queue<T> queue) {
+      Queue<T> nextQueue = queue.iterNext;
+      queueHead = IterableList.remove(queueHead, queue);
+      if (currentQueue == queue) {
+        setNextQueue(queueHead != null ? nextQueue : null);
+      }
+    }
+
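+    // Round-robin scheduling: each queue is polled up to calculateQuantum()
+    // times (priority * quantum) before moving on to the next queue in the
+    // circular list; queues that are not available are skipped.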
+    public Queue<T> poll() {
+      if (currentQuantum == 0) {
+        if (!nextQueue()) {
+          return null; // nothing here
+        }
+        currentQuantum = calculateQuantum(currentQueue) - 1;
+      } else {
+        currentQuantum--;
+      }
+
+      // This should go away when we have the new AM and its events
+      if (!currentQueue.isAvailable()) {
+        Queue<T> lastQueue = currentQueue;
+        do {
+          if (!nextQueue())
+            return null;
+        } while (currentQueue != lastQueue && !currentQueue.isAvailable());
+
+        currentQuantum = calculateQuantum(currentQueue) - 1;
+      }
+      return currentQueue;
+    }
+
+    private boolean nextQueue() {
+      if (currentQueue == null) return false;
+      currentQueue = currentQueue.iterNext;
+      return currentQueue != null;
+    }
+
+    private void setNextQueue(Queue<T> queue) {
+      currentQueue = queue;
+      if (queue != null) {
+        currentQuantum = calculateQuantum(currentQueue);
+      } else {
+        currentQuantum = 0;
+      }
+    }
+
+    private int calculateQuantum(final Queue queue) {
+      return Math.max(1, queue.getPriority() * quantum); // TODO
+    }
+  }
+
+  private static class AvlTree {
+    public static <T extends Comparable<T>> Queue<T> get(Queue<T> root, T key) {
+      while (root != null) {
+        int cmp = root.compareKey(key);
+        if (cmp > 0) {
+          root = root.avlLeft;
+        } else if (cmp < 0) {
+          root = root.avlRight;
+        } else {
+          return root;
+        }
+      }
+      return null;
+    }
+
+    public static <T extends Comparable<T>> Queue<T> getFirst(Queue<T> root) {
+      if (root != null) {
+        while (root.avlLeft != null) {
+          root = root.avlLeft;
+        }
+      }
+      return root;
+    }
+
+    public static <T extends Comparable<T>> Queue<T> getLast(Queue<T> root) {
+      if (root != null) {
+        while (root.avlRight != null) {
+          root = root.avlRight;
+        }
+      }
+      return root;
+    }
+
+    public static <T extends Comparable<T>> Queue<T> insert(Queue<T> root, Queue<T> node) {
+      if (root == null) return node;
+      if (node.compareTo(root) < 0) {
+        root.avlLeft = insert(root.avlLeft, node);
+      } else {
+        root.avlRight = insert(root.avlRight, node);
+      }
+      return balance(root);
+    }
+
+    private static <T extends Comparable<T>> Queue<T> removeMin(Queue<T> p) {
+      if (p.avlLeft == null)
+        return p.avlRight;
+      p.avlLeft = removeMin(p.avlLeft);
+      return balance(p);
+    }
+
+    public static <T extends Comparable<T>> Queue<T> remove(Queue<T> root, T key) {
+      if (root == null) return null;
+
+      int cmp = root.compareKey(key);
+      if (cmp == 0) {
+        Queue<T> q = root.avlLeft;
+        Queue<T> r = root.avlRight;
+        if (r == null) return q;
+        Queue<T> min = getFirst(r);
+        min.avlRight = removeMin(r);
+        min.avlLeft = q;
+        return balance(min);
+      } else if (cmp > 0) {
+        root.avlLeft = remove(root.avlLeft, key);
+      } else /* if (cmp < 0) */ {
+        root.avlRight = remove(root.avlRight, key);
+      }
+      return balance(root);
+    }
+
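+    // Standard AVL rebalance: recompute the node height and, when the left
+    // and right subtree heights differ by two, apply a single or double
+    // rotation to restore the balance invariant.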
+    private static <T extends Comparable<T>> Queue<T> balance(Queue<T> p) {
+      fixHeight(p);
+      int balance = balanceFactor(p);
+      if (balance == 2) {
+        if (balanceFactor(p.avlRight) < 0) {
+          p.avlRight = rotateRight(p.avlRight);
+        }
+        return rotateLeft(p);
+      } else if (balance == -2) {
+        if (balanceFactor(p.avlLeft) > 0) {
+          p.avlLeft = rotateLeft(p.avlLeft);
+        }
+        return rotateRight(p);
+      }
+      return p;
+    }
+
+    private static <T extends Comparable<T>> Queue<T> rotateRight(Queue<T> p) {
+      Queue<T> q = p.avlLeft;
+      p.avlLeft = q.avlRight;
+      q.avlRight = p;
+      fixHeight(p);
+      fixHeight(q);
+      return q;
+    }
+
+    private static <T extends Comparable<T>> Queue<T> rotateLeft(Queue<T> q) {
+      Queue<T> p = q.avlRight;
+      q.avlRight = p.avlLeft;
+      p.avlLeft = q;
+      fixHeight(q);
+      fixHeight(p);
+      return p;
+    }
+
+    private static <T extends Comparable<T>> void fixHeight(Queue<T> node) {
+      int heightLeft = height(node.avlLeft);
+      int heightRight = height(node.avlRight);
+      node.avlHeight = 1 + Math.max(heightLeft, heightRight);
+    }
+
+    private static <T extends Comparable<T>> int height(Queue<T> node) {
+      return node != null ? node.avlHeight : 0;
+    }
+
+    private static <T extends Comparable<T>> int balanceFactor(Queue<T> node) {
+      return height(node.avlRight) - height(node.avlLeft);
+    }
+  }
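+  // Usage sketch (assuming the getTableQueue()/getServerQueue() helpers seen
+  // above keep one AVL root per queue type): insert with
+  // root = AvlTree.insert(root, queue) and look up with AvlTree.get(root, key),
+  // keeping queue lookup O(log n).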
+
+  private static class IterableList {
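+    // Circular doubly-linked list helpers: head.iterPrev is the tail, so both
+    // append and prepend are O(1); an unlinked node has null iterNext/iterPrev.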
+    public static <T extends Comparable<T>> Queue<T> prepend(Queue<T> head, Queue<T> node) {
+      assert !isLinked(node) : node + " is already linked";
+      if (head != null) {
+        Queue<T> tail = head.iterPrev;
+        tail.iterNext = node;
+        head.iterPrev = node;
+        node.iterNext = head;
+        node.iterPrev = tail;
+      } else {
+        node.iterNext = node;
+        node.iterPrev = node;
+      }
+      return node;
+    }
+
+    public static <T extends Comparable<T>> Queue<T> append(Queue<T> head, Queue<T> node) {
+      assert !isLinked(node) : node + " is already linked";
+      if (head != null) {
+        Queue<T> tail = head.iterPrev;
+        tail.iterNext = node;
+        node.iterNext = head;
+        node.iterPrev = tail;
+        head.iterPrev = node;
+        return head;
+      }
+      node.iterNext = node;
+      node.iterPrev = node;
+      return node;
+    }
+
+    public static <T extends Comparable<T>> Queue<T> appendList(Queue<T> head, Queue<T> otherHead) {
+      if (head == null) return otherHead;
+      if (otherHead == null) return head;
+
+      Queue<T> tail = head.iterPrev;
+      Queue<T> otherTail = otherHead.iterPrev;
+      tail.iterNext = otherHead;
+      otherHead.iterPrev = tail;
+      otherTail.iterNext = head;
+      head.iterPrev = otherTail;
+      return head;
+    }
+
+    private static <T extends Comparable<T>> Queue<T> remove(Queue<T> head, Queue<T> node) {
+      assert isLinked(node) : node + " is not linked";
+      if (node != node.iterNext) {
+        node.iterPrev.iterNext = node.iterNext;
+        node.iterNext.iterPrev = node.iterPrev;
+        head = (head == node) ? node.iterNext : head;
+      } else {
+        head = null;
+      }
+      node.iterNext = null;
+      node.iterPrev = null;
+      return head;
+    }
+
+    private static <T extends Comparable<T>> boolean isLinked(Queue<T> node) {
+      return node.iterPrev != null && node.iterNext != null;
+    }
+  }
+}
\ No newline at end of file
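
A minimal sketch of how the table-lock helpers above are meant to be used
(the table name and purpose string are illustrative, and "queue" stands for
the scheduler instance, as exercised by the tests further down; the
try/finally discipline is the caller's responsibility):

    // Sketch only: take the shared lock, do read-only work, release it.
    TableName table = TableName.valueOf("example");
    if (queue.tryAcquireTableSharedLock(table, "example read")) {
      try {
        // ... read-only work against the table ...
      } finally {
        queue.releaseTableSharedLock(table);
      }
    }
    // Once a delete completes, the queue and the table-lock can be dropped;
    // markTableAsDeleted() returns false if new operations are pending.
    boolean removed = queue.markTableAsDeleted(table);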

http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java
index b858e0c..3a30527 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.InvalidFamilyOperationException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.TableState;
-import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -182,10 +181,8 @@ public class ModifyColumnFamilyProcedure
 
   @Override
   protected boolean acquireLock(final MasterProcedureEnv env) {
-    if (!env.isInitialized()) return false;
-    return env.getProcedureQueue().tryAcquireTableExclusiveLock(
-      tableName,
-      EventType.C_M_MODIFY_FAMILY.toString());
+    if (env.waitInitialized(this)) return false;
+    return env.getProcedureQueue().tryAcquireTableExclusiveLock(tableName, "modify family");
   }
 
   @Override
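
The acquireLock() change is the same across the procedures touched in this
patch: park the procedure on the master-initialization event while the master
is still starting up, then try the exclusive table lock. A sketch of the
pattern as it appears in these procedures (the "my operation" purpose string
is illustrative; the releaseLock() half mirrors the releaseTableExclusiveLock
usage elsewhere in the patch):

    @Override
    protected boolean acquireLock(final MasterProcedureEnv env) {
      // waitInitialized() returns true when the procedure has been suspended
      // on the "master initialized" event; in that case do not lock yet.
      if (env.waitInitialized(this)) return false;
      return env.getProcedureQueue().tryAcquireTableExclusiveLock(tableName, "my operation");
    }

    @Override
    protected void releaseLock(final MasterProcedureEnv env) {
      env.getProcedureQueue().releaseTableExclusiveLock(tableName);
    }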

http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
index a6300dd..6663e46 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
@@ -43,7 +43,6 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableState;
-import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos;
@@ -215,10 +214,8 @@ public class ModifyTableProcedure
 
   @Override
   protected boolean acquireLock(final MasterProcedureEnv env) {
-    if (!env.isInitialized()) return false;
-    return env.getProcedureQueue().tryAcquireTableExclusiveLock(
-      getTableName(),
-      EventType.C_M_MODIFY_TABLE.toString());
+    if (env.waitInitialized(this)) return false;
+    return env.getProcedureQueue().tryAcquireTableExclusiveLock(getTableName(), "modify table");
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
index bdcd89c..970c9c9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
@@ -553,13 +553,13 @@ implements ServerProcedureInterface {
 
   @Override
   protected boolean acquireLock(final MasterProcedureEnv env) {
-    if (!env.getMasterServices().isServerCrashProcessingEnabled()) return false;
-    return env.getProcedureQueue().tryAcquireServerExclusiveLock(this);
+    if (env.waitServerCrashProcessingEnabled(this)) return false;
+    return env.getProcedureQueue().tryAcquireServerExclusiveLock(getServerName());
   }
 
   @Override
   protected void releaseLock(final MasterProcedureEnv env) {
-    env.getProcedureQueue().releaseServerExclusiveLock(this);
+    env.getProcedureQueue().releaseServerExclusiveLock(getServerName());
   }
 
   @Override
@@ -751,6 +751,11 @@ implements ServerProcedureInterface {
     return this.carryingMeta;
   }
 
+  @Override
+  public ServerOperationType getServerOperationType() {
+    return ServerOperationType.CRASH_HANDLER;
+  }
+
   /**
    * For this procedure, yield at end of each successful flow step so that all crashed servers
    * can make progress rather than do the default which has each procedure running to completion

http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java
index 5b0c45f..b5c24ff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java
@@ -28,6 +28,10 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public interface ServerProcedureInterface {
+  public enum ServerOperationType {
+    CRASH_HANDLER
+  };
+
   /**
    * @return Name of this server instance.
    */
@@ -37,4 +41,12 @@ public interface ServerProcedureInterface {
    * @return True if this server has an hbase:meta table region.
    */
   boolean hasMetaTableRegion();
-}
\ No newline at end of file
+
+  /**
+   * Given an operation type, we can make decisions about what to do with pending operations.
+   * For example, if we get a crash handler and some assignment operations are pending,
+   * we can abort those operations.
+   * @return the operation type that the procedure is executing.
+   */
+  ServerOperationType getServerOperationType();
+}
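
ServerCrashProcedure above now locks by ServerName rather than by the
procedure instance, using the server analogues of the table-lock helpers,
and advertises its type through the new getServerOperationType() method. A
sketch of the lock lifecycle (serverName and procedureQueue are illustrative;
the procedure framework normally drives this via acquireLock()/releaseLock()):

    // Sketch only: exclusive server lock around crash handling for one server.
    if (procedureQueue.tryAcquireServerExclusiveLock(serverName)) {
      try {
        // ... handle the crashed server: split its logs, reassign regions ...
      } finally {
        procedureQueue.releaseServerExclusiveLock(serverName);
      }
    }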

http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java
index 2e39b80..0d17bf6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java
@@ -182,7 +182,7 @@ public class TruncateTableProcedure
 
   @Override
   protected boolean acquireLock(final MasterProcedureEnv env) {
-    if (!env.isInitialized()) return false;
+    if (env.waitInitialized(this)) return false;
     return env.getProcedureQueue().tryAcquireTableExclusiveLock(getTableName(), "truncate table");
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java
index 4d0093c..99e0e3f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java
@@ -128,14 +128,14 @@ public class TestMaster {
     MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
     HMaster m = cluster.getMaster();
     try {
-      m.initialized = false; // fake it, set back later
+      m.setInitialized(false); // fake it, set back later
       HRegionInfo meta = HRegionInfo.FIRST_META_REGIONINFO;
       m.move(meta.getEncodedNameAsBytes(), null);
       fail("Region should not be moved since master is not initialized");
     } catch (IOException ioe) {
       assertTrue(ioe instanceof PleaseHoldException);
     } finally {
-      m.initialized = true;
+      m.setInitialized(true);
     }
   }
 
@@ -172,13 +172,13 @@ public class TestMaster {
     try {
       List<HRegionInfo> tableRegions = admin.getTableRegions(tableName);
 
-      master.initialized = false; // fake it, set back later
+      master.setInitialized(false); // fake it, set back later
       admin.move(tableRegions.get(0).getEncodedNameAsBytes(), null);
       fail("Region should not be moved since master is not initialized");
     } catch (IOException ioe) {
       assertTrue(StringUtils.stringifyException(ioe).contains("PleaseHoldException"));
     } finally {
-      master.initialized = true;
+      master.setInitialized(true);
       TEST_UTIL.deleteTable(tableName);
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java
index 398a898..cafee7a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java
@@ -306,7 +306,7 @@ public class TestMasterNoCluster {
 
     try {
       // Wait till master is initialized.
-      while (!master.initialized) Threads.sleep(10);
+      while (!master.isInitialized()) Threads.sleep(10);
       LOG.info("Master is initialized");
 
       assertFalse("The dead server should not be pulled in",

http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java
new file mode 100644
index 0000000..af8d6ba
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@Category({MasterTests.class, MediumTests.class})
+public class TestMasterProcedureEvents {
+  private static final Log LOG = LogFactory.getLog(TestMasterProcedureEvents.class);
+
+  protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static long nonceGroup = HConstants.NO_NONCE;
+  private static long nonce = HConstants.NO_NONCE;
+
+  private static void setupConf(Configuration conf) {
+    conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 8);
+    conf.setBoolean("hbase.procedure.store.wal.use.hsync", false);
+  }
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    setupConf(UTIL.getConfiguration());
+    UTIL.startMiniCluster(3);
+  }
+
+  @AfterClass
+  public static void cleanupTest() throws Exception {
+    try {
+      UTIL.shutdownMiniCluster();
+    } catch (Exception e) {
+      LOG.warn("failure shutting down cluster", e);
+    }
+  }
+
+  @Test
+  public void testMasterInitializedEvent() throws Exception {
+    TableName tableName = TableName.valueOf("testMasterInitializedEvent");
+    HMaster master = UTIL.getMiniHBaseCluster().getMaster();
+    ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
+    MasterProcedureScheduler procSched = procExec.getEnvironment().getProcedureQueue();
+
+    HRegionInfo hri = new HRegionInfo(tableName);
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    HColumnDescriptor hcd = new HColumnDescriptor("f");
+    htd.addFamily(hcd);
+
+    while (!master.isInitialized()) Thread.sleep(250);
+    master.setInitialized(false); // fake it, set back later
+
+    CreateTableProcedure proc = new CreateTableProcedure(
+      procExec.getEnvironment(), htd, new HRegionInfo[] { hri });
+
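+    // The scheduler exposes poll counters for tests: getPollCalls() counts
+    // poll() invocations and getNullPollCalls() counts polls that returned
+    // nothing (per the naming). A procedure suspended on an event should add
+    // exactly one poll and no null polls, i.e. no busy-waiting below.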
+    long pollCalls = procSched.getPollCalls();
+    long nullPollCalls = procSched.getNullPollCalls();
+
+    long procId = procExec.submitProcedure(proc, HConstants.NO_NONCE, HConstants.NO_NONCE);
+    for (int i = 0; i < 10; ++i) {
+      Thread.sleep(100);
+      assertEquals(pollCalls + 1, procSched.getPollCalls());
+      assertEquals(nullPollCalls, procSched.getNullPollCalls());
+    }
+
+    master.setInitialized(true);
+    ProcedureTestingUtility.waitProcedure(procExec, procId);
+
+    assertEquals(pollCalls + 2, procSched.getPollCalls());
+    assertEquals(nullPollCalls, procSched.getNullPollCalls());
+  }
+
+  @Test
+  public void testServerCrashProcedureEvent() throws Exception {
+    TableName tableName = TableName.valueOf("testServerCrashProcedureEventTb");
+    HMaster master = UTIL.getMiniHBaseCluster().getMaster();
+    ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
+    MasterProcedureScheduler procSched = procExec.getEnvironment().getProcedureQueue();
+
+    while (!master.isServerCrashProcessingEnabled() || !master.isInitialized() ||
+        master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
+      Thread.sleep(25);
+    }
+
+    UTIL.createTable(tableName, HBaseTestingUtility.COLUMNS[0]);
+    try (Table t = UTIL.getConnection().getTable(tableName)) {
+      // Load the table with a bit of data so there are some logs to split and some edits in each region.
+      UTIL.loadTable(t, HBaseTestingUtility.COLUMNS[0]);
+    }
+
+    master.setServerCrashProcessingEnabled(false);  // fake it, set back later
+
+    long pollCalls = procSched.getPollCalls();
+    long nullPollCalls = procSched.getNullPollCalls();
+
+    // Kill a server. Master will notice but do nothing other than add it to the list of dead servers.
+    HRegionServer hrs = getServerWithRegions();
+    boolean carryingMeta = master.getAssignmentManager().isCarryingMeta(hrs.getServerName());
+    UTIL.getHBaseCluster().killRegionServer(hrs.getServerName());
+    hrs.join();
+
+    // Wait until the expiration of the server has arrived at the master. We won't process it
+    // by queuing a ServerCrashProcedure because we have disabled crash processing... but wait
+    // here so the ServerManager gets notified and adds the expired server to the appropriate queues.
+    while (!master.getServerManager().isServerDead(hrs.getServerName())) Thread.sleep(10);
+
+    // Do some of the master processing of dead servers so when SCP runs, it has expected 'state'.
+    master.getServerManager().moveFromOnelineToDeadServers(hrs.getServerName());
+
+    long procId = procExec.submitProcedure(
+      new ServerCrashProcedure(hrs.getServerName(), true, carryingMeta));
+
+    for (int i = 0; i < 10; ++i) {
+      Thread.sleep(100);
+      assertEquals(pollCalls + 1, procSched.getPollCalls());
+      assertEquals(nullPollCalls, procSched.getNullPollCalls());
+    }
+
+    // Now, reenable processing else we can't get a lock on the ServerCrashProcedure.
+    master.setServerCrashProcessingEnabled(true);
+    ProcedureTestingUtility.waitProcedure(procExec, procId);
+
+    LOG.debug("server crash processing poll calls: " + procSched.getPollCalls());
+    assertTrue(procSched.getPollCalls() >= (pollCalls + 2));
+    assertEquals(nullPollCalls, procSched.getNullPollCalls());
+
+    UTIL.deleteTable(tableName);
+  }
+
+  private HRegionServer getServerWithRegions() {
+    for (int i = 0; i < 3; ++i) {
+      HRegionServer hrs = UTIL.getHBaseCluster().getRegionServer(i);
+      if (hrs.getNumberOfOnlineRegions() > 0) {
+        return hrs;
+      }
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureQueue.java
deleted file mode 100644
index 7e6e356..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureQueue.java
+++ /dev/null
@@ -1,484 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.master.procedure;
-
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.master.TableLockManager;
-import org.apache.hadoop.hbase.procedure2.Procedure;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.testclassification.MasterTests;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-@Category({MasterTests.class, SmallTests.class})
-public class TestMasterProcedureQueue {
-  private static final Log LOG = LogFactory.getLog(TestMasterProcedureQueue.class);
-
-  private MasterProcedureQueue queue;
-  private Configuration conf;
-
-  @Before
-  public void setUp() throws IOException {
-    conf = HBaseConfiguration.create();
-    queue = new MasterProcedureQueue(conf, new TableLockManager.NullTableLockManager());
-  }
-
-  @After
-  public void tearDown() throws IOException {
-    assertEquals(0, queue.size());
-  }
-
-  @Test
-  public void testConcurrentCreateDelete() throws Exception {
-    final MasterProcedureQueue procQueue = queue;
-    final TableName table = TableName.valueOf("testtb");
-    final AtomicBoolean running = new AtomicBoolean(true);
-    final AtomicBoolean failure = new AtomicBoolean(false);
-    Thread createThread = new Thread() {
-      @Override
-      public void run() {
-        try {
-          while (running.get() && !failure.get()) {
-            if (procQueue.tryAcquireTableExclusiveLock(table, "create")) {
-              procQueue.releaseTableExclusiveLock(table);
-            }
-          }
-        } catch (Throwable e) {
-          LOG.error("create failed", e);
-          failure.set(true);
-        }
-      }
-    };
-
-    Thread deleteThread = new Thread() {
-      @Override
-      public void run() {
-        try {
-          while (running.get() && !failure.get()) {
-            if (procQueue.tryAcquireTableExclusiveLock(table, "delete")) {
-              procQueue.releaseTableExclusiveLock(table);
-            }
-            procQueue.markTableAsDeleted(table);
-          }
-        } catch (Throwable e) {
-          LOG.error("delete failed", e);
-          failure.set(true);
-        }
-      }
-    };
-
-    createThread.start();
-    deleteThread.start();
-    for (int i = 0; i < 100 && running.get() && !failure.get(); ++i) {
-      Thread.sleep(100);
-    }
-    running.set(false);
-    createThread.join();
-    deleteThread.join();
-    assertEquals(false, failure.get());
-  }
-
-  /**
-   * Verify simple create/insert/fetch/delete of the table queue.
-   */
-  @Test
-  public void testSimpleTableOpsQueues() throws Exception {
-    final int NUM_TABLES = 10;
-    final int NUM_ITEMS = 10;
-
-    int count = 0;
-    for (int i = 1; i <= NUM_TABLES; ++i) {
-      TableName tableName = TableName.valueOf(String.format("test-%04d", i));
-      // insert items
-      for (int j = 1; j <= NUM_ITEMS; ++j) {
-        queue.addBack(new TestTableProcedure(i * 1000 + j, tableName,
-          TableProcedureInterface.TableOperationType.EDIT));
-        assertEquals(++count, queue.size());
-      }
-    }
-    assertEquals(NUM_TABLES * NUM_ITEMS, queue.size());
-
-    for (int j = 1; j <= NUM_ITEMS; ++j) {
-      for (int i = 1; i <= NUM_TABLES; ++i) {
-        Long procId = queue.poll();
-        assertEquals(--count, queue.size());
-        assertEquals(i * 1000 + j, procId.longValue());
-      }
-    }
-    assertEquals(0, queue.size());
-
-    for (int i = 1; i <= NUM_TABLES; ++i) {
-      TableName tableName = TableName.valueOf(String.format("test-%04d", i));
-      // complete the table deletion
-      assertTrue(queue.markTableAsDeleted(tableName));
-    }
-  }
-
-  /**
-   * Check that the table queue is not deletable until every procedure
-   * in-progress is completed (this is a special case for write-locks).
-   */
-  @Test
-  public void testCreateDeleteTableOperationsWithWriteLock() throws Exception {
-    TableName tableName = TableName.valueOf("testtb");
-
-    queue.addBack(new TestTableProcedure(1, tableName,
-          TableProcedureInterface.TableOperationType.EDIT));
-
-    // table can't be deleted because one item is in the queue
-    assertFalse(queue.markTableAsDeleted(tableName));
-
-    // fetch item and take a lock
-    assertEquals(1, queue.poll().longValue());
-    // take the xlock
-    assertTrue(queue.tryAcquireTableExclusiveLock(tableName, "write"));
-    // table can't be deleted because we have the lock
-    assertEquals(0, queue.size());
-    assertFalse(queue.markTableAsDeleted(tableName));
-    // release the xlock
-    queue.releaseTableExclusiveLock(tableName);
-    // complete the table deletion
-    assertTrue(queue.markTableAsDeleted(tableName));
-  }
-
-  /**
-   * Check that the table queue is not deletable until every procedure
-   * in-progress is completed (this is a special case for read-locks).
-   */
-  @Test
-  public void testCreateDeleteTableOperationsWithReadLock() throws Exception {
-    final TableName tableName = TableName.valueOf("testtb");
-    final int nitems = 2;
-
-    for (int i = 1; i <= nitems; ++i) {
-      queue.addBack(new TestTableProcedure(i, tableName,
-            TableProcedureInterface.TableOperationType.READ));
-    }
-
-    // table can't be deleted because one item is in the queue
-    assertFalse(queue.markTableAsDeleted(tableName));
-
-    for (int i = 1; i <= nitems; ++i) {
-      // fetch item and take a lock
-      assertEquals(i, queue.poll().longValue());
-      // take the rlock
-      assertTrue(queue.tryAcquireTableSharedLock(tableName, "read " + i));
-      // table can't be deleted because we have locks and/or items in the queue
-      assertFalse(queue.markTableAsDeleted(tableName));
-    }
-
-    for (int i = 1; i <= nitems; ++i) {
-      // table can't be deleted because we have locks
-      assertFalse(queue.markTableAsDeleted(tableName));
-      // release the rlock
-      queue.releaseTableSharedLock(tableName);
-    }
-
-    // there are no items and no lock in the queue
-    assertEquals(0, queue.size());
-    // complete the table deletion
-    assertTrue(queue.markTableAsDeleted(tableName));
-  }
-
-  /**
-   * Verify the correct logic of RWLocks on the queue
-   */
-  @Test
-  public void testVerifyRwLocks() throws Exception {
-    TableName tableName = TableName.valueOf("testtb");
-    queue.addBack(new TestTableProcedure(1, tableName,
-          TableProcedureInterface.TableOperationType.EDIT));
-    queue.addBack(new TestTableProcedure(2, tableName,
-          TableProcedureInterface.TableOperationType.READ));
-    queue.addBack(new TestTableProcedure(3, tableName,
-          TableProcedureInterface.TableOperationType.EDIT));
-    queue.addBack(new TestTableProcedure(4, tableName,
-          TableProcedureInterface.TableOperationType.READ));
-    queue.addBack(new TestTableProcedure(5, tableName,
-          TableProcedureInterface.TableOperationType.READ));
-
-    // Fetch the 1st item and take the write lock
-    Long procId = queue.poll();
-    assertEquals(1, procId.longValue());
-    assertEquals(true, queue.tryAcquireTableExclusiveLock(tableName, "write " + procId));
-
-    // Fetch the 2nd item and verify that the lock can't be acquired
-    assertEquals(null, queue.poll());
-
-    // Release the write lock and acquire the read lock
-    queue.releaseTableExclusiveLock(tableName);
-
-    // Fetch the 2nd item and take the read lock
-    procId = queue.poll();
-    assertEquals(2, procId.longValue());
-    assertEquals(true, queue.tryAcquireTableSharedLock(tableName, "read " + procId));
-
-    // Fetch the 3rd item and verify that the lock can't be acquired
-    procId = queue.poll();
-    assertEquals(3, procId.longValue());
-    assertEquals(false, queue.tryAcquireTableExclusiveLock(tableName, "write " + procId));
-
-    // release the rdlock of item 2 and take the wrlock for the 3d item
-    queue.releaseTableSharedLock(tableName);
-    assertEquals(true, queue.tryAcquireTableExclusiveLock(tableName, "write " + procId));
-
-    // Fetch 4th item and verify that the lock can't be acquired
-    assertEquals(null, queue.poll());
-
-    // Release the write lock and acquire the read lock
-    queue.releaseTableExclusiveLock(tableName);
-
-    // Fetch the 4th item and take the read lock
-    procId = queue.poll();
-    assertEquals(4, procId.longValue());
-    assertEquals(true, queue.tryAcquireTableSharedLock(tableName, "read " + procId));
-
-    // Fetch the 4th item and take the read lock
-    procId = queue.poll();
-    assertEquals(5, procId.longValue());
-    assertEquals(true, queue.tryAcquireTableSharedLock(tableName, "read " + procId));
-
-    // Release 4th and 5th read-lock
-    queue.releaseTableSharedLock(tableName);
-    queue.releaseTableSharedLock(tableName);
-
-    // remove table queue
-    assertEquals(0, queue.size());
-    assertTrue("queue should be deleted", queue.markTableAsDeleted(tableName));
-  }
-
-  /**
-   * Verify that "write" operations for a single table are serialized,
-   * but different tables can be executed in parallel.
-   */
-  @Test(timeout=90000)
-  public void testConcurrentWriteOps() throws Exception {
-    final TestTableProcSet procSet = new TestTableProcSet(queue);
-
-    final int NUM_ITEMS = 10;
-    final int NUM_TABLES = 4;
-    final AtomicInteger opsCount = new AtomicInteger(0);
-    for (int i = 0; i < NUM_TABLES; ++i) {
-      TableName tableName = TableName.valueOf(String.format("testtb-%04d", i));
-      for (int j = 1; j < NUM_ITEMS; ++j) {
-        procSet.addBack(new TestTableProcedure(i * 100 + j, tableName,
-          TableProcedureInterface.TableOperationType.EDIT));
-        opsCount.incrementAndGet();
-      }
-    }
-    assertEquals(opsCount.get(), queue.size());
-
-    final Thread[] threads = new Thread[NUM_TABLES * 2];
-    final HashSet<TableName> concurrentTables = new HashSet<TableName>();
-    final ArrayList<String> failures = new ArrayList<String>();
-    final AtomicInteger concurrentCount = new AtomicInteger(0);
-    for (int i = 0; i < threads.length; ++i) {
-      threads[i] = new Thread() {
-        @Override
-        public void run() {
-          while (opsCount.get() > 0) {
-            try {
-              TableProcedureInterface proc = procSet.acquire();
-              if (proc == null) {
-                queue.signalAll();
-                if (opsCount.get() > 0) {
-                  continue;
-                }
-                break;
-              }
-              synchronized (concurrentTables) {
-                assertTrue("unexpected concurrency on " + proc.getTableName(),
-                  concurrentTables.add(proc.getTableName()));
-              }
-              assertTrue(opsCount.decrementAndGet() >= 0);
-              try {
-                long procId = ((Procedure)proc).getProcId();
-                TableName tableId = proc.getTableName();
-                int concurrent = concurrentCount.incrementAndGet();
-                assertTrue("inc-concurrent="+ concurrent +" 1 <= concurrent <= "+ NUM_TABLES,
-                  concurrent >= 1 && concurrent <= NUM_TABLES);
-                LOG.debug("[S] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent);
-                Thread.sleep(2000);
-                concurrent = concurrentCount.decrementAndGet();
-                LOG.debug("[E] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent);
-                assertTrue("dec-concurrent=" + concurrent, concurrent < NUM_TABLES);
-              } finally {
-                synchronized (concurrentTables) {
-                  assertTrue(concurrentTables.remove(proc.getTableName()));
-                }
-                procSet.release(proc);
-              }
-            } catch (Throwable e) {
-              LOG.error("Failed " + e.getMessage(), e);
-              synchronized (failures) {
-                failures.add(e.getMessage());
-              }
-            } finally {
-              queue.signalAll();
-            }
-          }
-        }
-      };
-      threads[i].start();
-    }
-    for (int i = 0; i < threads.length; ++i) {
-      threads[i].join();
-    }
-    assertTrue(failures.toString(), failures.isEmpty());
-    assertEquals(0, opsCount.get());
-    assertEquals(0, queue.size());
-
-    for (int i = 1; i <= NUM_TABLES; ++i) {
-      TableName table = TableName.valueOf(String.format("testtb-%04d", i));
-      assertTrue("queue should be deleted, table=" + table, queue.markTableAsDeleted(table));
-    }
-  }
-
-  public static class TestTableProcSet {
-    private final MasterProcedureQueue queue;
-    private Map<Long, TableProcedureInterface> procsMap =
-      new ConcurrentHashMap<Long, TableProcedureInterface>();
-
-    public TestTableProcSet(final MasterProcedureQueue queue) {
-      this.queue = queue;
-    }
-
-    public void addBack(TableProcedureInterface tableProc) {
-      Procedure proc = (Procedure)tableProc;
-      procsMap.put(proc.getProcId(), tableProc);
-      queue.addBack(proc);
-    }
-
-    public void addFront(TableProcedureInterface tableProc) {
-      Procedure proc = (Procedure)tableProc;
-      procsMap.put(proc.getProcId(), tableProc);
-      queue.addFront(proc);
-    }
-
-    public TableProcedureInterface acquire() {
-      TableProcedureInterface proc = null;
-      boolean avail = false;
-      while (!avail) {
-        Long procId = queue.poll();
-        proc = procId != null ? procsMap.remove(procId) : null;
-        if (proc == null) break;
-        switch (proc.getTableOperationType()) {
-          case CREATE:
-          case DELETE:
-          case EDIT:
-            avail = queue.tryAcquireTableExclusiveLock(proc.getTableName(),
-              "op="+ proc.getTableOperationType());
-            break;
-          case READ:
-            avail = queue.tryAcquireTableSharedLock(proc.getTableName(),
-              "op="+ proc.getTableOperationType());
-            break;
-        }
-        if (!avail) {
-          addFront(proc);
-          LOG.debug("yield procId=" + procId);
-        }
-      }
-      return proc;
-    }
-
-    public void release(TableProcedureInterface proc) {
-      switch (proc.getTableOperationType()) {
-        case CREATE:
-        case DELETE:
-        case EDIT:
-          queue.releaseTableExclusiveLock(proc.getTableName());
-          break;
-        case READ:
-          queue.releaseTableSharedLock(proc.getTableName());
-          break;
-      }
-    }
-  }
-
-  public static class TestTableProcedure extends Procedure<Void>
-      implements TableProcedureInterface {
-    private final TableOperationType opType;
-    private final TableName tableName;
-
-    public TestTableProcedure() {
-      throw new UnsupportedOperationException("recovery should not be triggered here");
-    }
-
-    public TestTableProcedure(long procId, TableName tableName, TableOperationType opType) {
-      this.tableName = tableName;
-      this.opType = opType;
-      setProcId(procId);
-    }
-
-    @Override
-    public TableName getTableName() {
-      return tableName;
-    }
-
-    @Override
-    public TableOperationType getTableOperationType() {
-      return opType;
-    }
-
-    @Override
-    protected Procedure[] execute(Void env) {
-      return null;
-    }
-
-    @Override
-    protected void rollback(Void env) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected boolean abort(Void env) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected void serializeStateData(final OutputStream stream) throws IOException {}
-
-    @Override
-    protected void deserializeStateData(final InputStream stream) throws IOException {}
-  }
-}