You are viewing a plain-text version of this content. (The link to the canonical version was lost in this plain-text copy.)
Posted to commits@hbase.apache.org by al...@apache.org on 2018/11/05 12:23:48 UTC

hbase git commit: HBASE-21423 Procedures for meta table/region should be able to execute in separate workers

Repository: hbase
Updated Branches:
  refs/heads/branch-2.0 5834a4f90 -> 6b8cfd276


HBASE-21423 Procedures for meta table/region should be able to execute in separate workers


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6b8cfd27
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6b8cfd27
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6b8cfd27

Branch: refs/heads/branch-2.0
Commit: 6b8cfd276fc1e33c691f93575ed9ff2df06c08e2
Parents: 5834a4f
Author: Allan Yang <al...@apache.org>
Authored: Mon Nov 5 20:23:19 2018 +0800
Committer: Allan Yang <al...@apache.org>
Committed: Mon Nov 5 20:23:19 2018 +0800

----------------------------------------------------------------------
 .../procedure2/AbstractProcedureScheduler.java  |  29 ++-
 .../hbase/procedure2/ProcedureExecutor.java     |  64 ++++++-
 .../hbase/procedure2/ProcedureScheduler.java    |  17 ++
 .../procedure2/SimpleProcedureScheduler.java    |   2 +-
 .../procedure2/ProcedureTestingUtility.java     |  10 +-
 .../hbase/procedure2/TestChildProcedures.java   |   2 +-
 .../hbase/procedure2/TestProcedureExecutor.java |   2 +-
 .../procedure2/TestProcedureSuspended.java      |   3 +-
 .../hbase/procedure2/TestYieldProcedures.java   |   4 +-
 .../org/apache/hadoop/hbase/master/HMaster.java |   5 +-
 .../procedure/MasterProcedureConstants.java     |   7 +
 .../procedure/MasterProcedureScheduler.java     |   8 +-
 .../TestSplitTableRegionProcedure.java          |   2 +
 .../master/procedure/TestProcedurePriority.java |   1 +
 .../procedure/TestServerCrashProcedure.java     |   2 +
 .../procedure/TestTableDDLProcedureBase.java    |   2 +
 .../procedure/TestUrgentProcedureWorker.java    | 188 +++++++++++++++++++
 17 files changed, 327 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
index 7ab1329..b2a2e5a 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
@@ -139,20 +139,39 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
    * NOTE: this method is called with the sched lock held.
    * @return the Procedure to execute, or null if nothing is available.
    */
-  protected abstract Procedure dequeue();
+  protected Procedure dequeue() {
+    return dequeue(false);
+  }
+
+  /** Like {@link #dequeue()}; if onlyUrgent, only urgent procedures should be returned. */
+  protected abstract Procedure dequeue(boolean onlyUrgent);
+
+  @Override
+  public Procedure poll(boolean onlyUrgent) {
+    return poll(onlyUrgent, -1);
+  }
 
   @Override
   public Procedure poll() {
-    return poll(-1);
+    return poll(false, -1);
+  }
+
+  @Override
+  public Procedure poll(boolean onlyUrgent, long timeout, TimeUnit unit) {
+    return poll(onlyUrgent, unit.toNanos(timeout));
   }
 
   @Override
   public Procedure poll(long timeout, TimeUnit unit) {
-    return poll(unit.toNanos(timeout));
+    return poll(false, unit.toNanos(timeout));
   }
 
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
   public Procedure poll(final long nanos) {
+    return poll(false, nanos);
+  }
+
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
+  public Procedure poll(final boolean onlyUrgent, final long nanos) {
     schedLock();
     try {
       if (!running) {
@@ -173,7 +192,7 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
           return null;
         }
       }
-      final Procedure pollResult = dequeue();
+      final Procedure pollResult = dequeue(onlyUrgent);
 
       pollCalls++;
       nullPollCalls += (pollResult == null) ? 1 : 0;

http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index b1f3de3..3bd5e0f 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -209,6 +209,11 @@ public class ProcedureExecutor<TEnvironment> {
   private CopyOnWriteArrayList<WorkerThread> workerThreads;
 
   /**
+   * Workers for urgent procedures only; copy-on-write since each worker removes itself on exit.
+   */
+  private CopyOnWriteArrayList<WorkerThread> urgentWorkerThreads;
+
+  /**
    * Created in the {@link #init(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing
    * resource handling rather than observing in a #join is unexpected).
    * Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery
@@ -218,6 +223,7 @@ public class ProcedureExecutor<TEnvironment> {
 
   private int corePoolSize;
   private int maxPoolSize;
+  private int urgentPoolSize;
 
   private volatile long keepAliveTime;
 
@@ -558,12 +564,30 @@ public class ProcedureExecutor<TEnvironment> {
    *          is found on replay. otherwise false.
    */
   public void init(int numThreads, boolean abortOnCorruption) throws IOException {
+    init(numThreads, 1, abortOnCorruption);
+  }
+
+  /**
+   * Initialize the procedure executor, but do not start workers. We will start them later.
+   * <p/>
+   * It calls ProcedureStore.recoverLease() and ProcedureStore.load() to recover the lease, and
+   * ensure a single executor, and start the procedure replay to resume and recover the previous
+   * pending and in-progress procedures.
+   * @param numThreads number of threads available for procedure execution.
+   * @param urgentNumThreads number of threads available for urgent procedure execution.
+   * @param abortOnCorruption true if you want to abort your service in case a corrupted procedure
+   *          is found on replay. otherwise false.
+   */
+  public void init(int numThreads, int urgentNumThreads,
+      boolean abortOnCorruption) throws IOException {
     // We have numThreads executor + one timer thread used for timing out
     // procedures and triggering periodic procedures.
     this.corePoolSize = numThreads;
     this.maxPoolSize = 10 * numThreads;
-    LOG.info("Starting {} core workers (bigger of cpus/4 or 16) with max (burst) worker count={}",
-        corePoolSize, maxPoolSize);
+    this.urgentPoolSize = urgentNumThreads;
+    LOG.info("Starting {} core workers (bigger of cpus/4 or 16) with max (burst) worker "
+            + "count={}, start {} urgent thread(s)",
+        corePoolSize, maxPoolSize, urgentPoolSize);
 
     this.threadGroup = new ThreadGroup("PEWorkerGroup");
     this.timeoutExecutor = new TimeoutExecutorThread<>(this, threadGroup);
@@ -571,9 +595,14 @@ public class ProcedureExecutor<TEnvironment> {
     // Create the workers
     workerId.set(0);
     workerThreads = new CopyOnWriteArrayList<>();
+    urgentWorkerThreads = new CopyOnWriteArrayList<>();
     for (int i = 0; i < corePoolSize; ++i) {
       workerThreads.add(new WorkerThread(threadGroup));
     }
+    for (int i = 0; i < urgentNumThreads; ++i) {
+      urgentWorkerThreads
+          .add(new WorkerThread(threadGroup, "UrgentPEWorker-", true));
+    }
 
     long st, et;
 
@@ -608,12 +637,17 @@ public class ProcedureExecutor<TEnvironment> {
       return;
     }
     // Start the executors. Here we must have the lastProcId set.
-    LOG.trace("Start workers {}", workerThreads.size());
+    LOG.debug("Start workers {}, urgent workers {}", workerThreads.size(),
+        urgentWorkerThreads.size());
     timeoutExecutor.start();
     for (WorkerThread worker: workerThreads) {
       worker.start();
     }
 
+    for (WorkerThread worker: urgentWorkerThreads) {
+      worker.start();
+    }
+
     // Internal chores
     timeoutExecutor.add(new WorkerMonitor());
 
@@ -663,6 +697,11 @@ public class ProcedureExecutor<TEnvironment> {
       worker.awaitTermination();
     }
 
+    // stop the urgent worker threads
+    for (WorkerThread worker: urgentWorkerThreads) {
+      worker.awaitTermination();
+    }
+
     // Destroy the Thread Group for the executors
     // TODO: Fix. #join is not place to destroy resources.
     try {
@@ -700,7 +739,7 @@ public class ProcedureExecutor<TEnvironment> {
    * @return the current number of worker threads.
    */
   public int getWorkerThreadCount() {
-    return workerThreads.size();
+    return workerThreads.size() + urgentWorkerThreads.size();
   }
 
   /**
@@ -710,6 +749,10 @@ public class ProcedureExecutor<TEnvironment> {
     return corePoolSize;
   }
 
+  public int getUrgentPoolSize() {
+    return urgentPoolSize;
+  }
+
   public int getActiveExecutorCount() {
     return activeExecutorCount.get();
   }
@@ -1949,13 +1992,18 @@ public class ProcedureExecutor<TEnvironment> {
   private class WorkerThread extends StoppableThread {
     private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE);
     private volatile Procedure<TEnvironment> activeProcedure;
-
+    private final boolean onlyPollUrgent;
     public WorkerThread(ThreadGroup group) {
       this(group, "PEWorker-");
     }
 
     protected WorkerThread(ThreadGroup group, String prefix) {
+      this(group, prefix, false);
+    }
+    // NOTE(review): run() must pass onlyPollUrgent to scheduler.poll() for this flag to matter.
+    protected WorkerThread(ThreadGroup group, String prefix, boolean onlyPollUrgent) {
       super(group, prefix + workerId.incrementAndGet());
+      this.onlyPollUrgent = onlyPollUrgent;
       setDaemon(true);
     }
 
@@ -2000,7 +2048,11 @@ public class ProcedureExecutor<TEnvironment> {
       } finally {
         LOG.trace("Worker terminated.");
       }
-      workerThreads.remove(this);
+      if (onlyPollUrgent) {
+        urgentWorkerThreads.remove(this);
+      } else {
+        workerThreads.remove(this);
+      }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
index 9489f52..2d16849 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
@@ -104,6 +104,13 @@ public interface ProcedureScheduler {
 
   /**
    * Fetch one Procedure from the queue
+   * @param onlyUrgent if true, only poll urgent procedures to execute
+   * @return the Procedure to execute, or null if nothing present.
+   */
+  Procedure poll(boolean onlyUrgent);
+
+  /**
+   * Fetch one Procedure from the queue
    * @param timeout how long to wait before giving up, in units of unit
    * @param unit a TimeUnit determining how to interpret the timeout parameter
    * @return the Procedure to execute, or null if nothing present.
@@ -111,6 +118,16 @@ public interface ProcedureScheduler {
   Procedure poll(long timeout, TimeUnit unit);
 
   /**
+   * Fetch one Procedure from the queue
+   * @param onlyUrgent if true, only poll urgent procedures to execute
+   * @param timeout how long to wait before giving up, in units of unit
+   * @param unit a TimeUnit determining how to interpret the timeout parameter
+   * @return the Procedure to execute, or null if nothing present.
+   */
+  Procedure poll(boolean onlyUrgent, long timeout, TimeUnit unit);
+
+
+  /**
    * List lock queues.
    * @return the locks
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java
index feab8be..3df0e20 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java
@@ -43,7 +43,7 @@ public class SimpleProcedureScheduler extends AbstractProcedureScheduler {
   }
 
   @Override
-  protected Procedure dequeue() {
+  protected Procedure dequeue(boolean onlyUrgent) {
     return runnables.poll();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
index e82fc7d..1709f63 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
@@ -87,7 +87,12 @@ public class ProcedureTestingUtility {
 
   public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads,
       boolean abortOnCorruption, boolean startWorkers) throws IOException {
-    procExecutor.init(numThreads, abortOnCorruption);
+    initAndStartWorkers(procExecutor, numThreads, 1, abortOnCorruption, startWorkers);
+  }
+
+  public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads,
+      int numUrgentThreads, boolean abortOnCorruption, boolean startWorkers) throws IOException {
+    procExecutor.init(numThreads, numUrgentThreads, abortOnCorruption);
     if (startWorkers) {
       procExecutor.startWorkers();
     }
@@ -109,6 +114,7 @@ public class ProcedureTestingUtility {
     final ProcedureStore procStore = procExecutor.getStore();
     final int storeThreads = procExecutor.getCorePoolSize();
     final int execThreads = procExecutor.getCorePoolSize();
+    final int urgentThreads = procExecutor.getUrgentPoolSize();
 
     final ProcedureExecutor.Testing testing = procExecutor.testing;
     if (avoidTestKillDuringRestart) {
@@ -130,7 +136,7 @@ public class ProcedureTestingUtility {
     // re-start
     LOG.info("RESTART - Start");
     procStore.start(storeThreads);
-    initAndStartWorkers(procExecutor, execThreads, failOnCorrupted, startWorkers);
+    initAndStartWorkers(procExecutor, execThreads, urgentThreads, failOnCorrupted, startWorkers);
     if (startAction != null) {
       startAction.call();
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java
index 4f3c443..b837e82 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java
@@ -69,7 +69,7 @@ public class TestChildProcedures {
     procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore);
     procExecutor.testing = new ProcedureExecutor.Testing();
     procStore.start(PROCEDURE_EXECUTOR_SLOTS);
-    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
+    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, 0, false, true);
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java
index 7f130ca..2fe19f3 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java
@@ -71,7 +71,7 @@ public class TestProcedureExecutor {
 
   private void createNewExecutor(final Configuration conf, final int numThreads) throws Exception {
     procExecutor = new ProcedureExecutor<>(conf, procEnv, procStore);
-    ProcedureTestingUtility.initAndStartWorkers(procExecutor, numThreads, true);
+    ProcedureTestingUtility.initAndStartWorkers(procExecutor, numThreads, 0, false, true);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java
index c1c9187..ec9d27f 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java
@@ -62,7 +62,8 @@ public class TestProcedureSuspended {
     procStore = new NoopProcedureStore();
     procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), new TestProcEnv(), procStore);
     procStore.start(PROCEDURE_EXECUTOR_SLOTS);
-    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
+    ProcedureTestingUtility
+        .initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, 0, false, true);
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java
index b5137b0..8a2e296 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java
@@ -74,7 +74,7 @@ public class TestYieldProcedures {
     procExecutor =
       new ProcedureExecutor<>(htu.getConfiguration(), new TestProcEnv(), procStore, procRunnables);
     procStore.start(PROCEDURE_EXECUTOR_SLOTS);
-    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
+    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, 0, false, true);
   }
 
   @After
@@ -379,6 +379,7 @@ public class TestYieldProcedures {
 
     @Override
     public Procedure poll() {
+      LOG.trace("polled()");
       pollCalls++;
       return super.poll();
     }
@@ -386,6 +387,7 @@ public class TestYieldProcedures {
     @Override
     public Procedure poll(long timeout, TimeUnit unit) {
       pollCalls++;
+      LOG.trace("polled(long timeout, TimeUnit unit)");
       return super.poll(timeout, unit);
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 5bab8cc..288b33f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -1396,6 +1396,9 @@ public class HMaster extends HRegionServer implements MasterServices {
     int cpus = Runtime.getRuntime().availableProcessors();
     final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, Math.max(
       (cpus > 0 ? cpus / 4 : 0), MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS));
+    final int urgentWorkers = conf
+        .getInt(MasterProcedureConstants.MASTER_URGENT_PROCEDURE_THREADS,
+            MasterProcedureConstants.DEFAULT_MASTER_URGENT_PROCEDURE_THREADS);
     final boolean abortOnCorruption =
       conf.getBoolean(MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION,
         MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION);
@@ -1403,7 +1406,7 @@ public class HMaster extends HRegionServer implements MasterServices {
     // Just initialize it but do not start the workers, we will start the workers later by calling
     // startProcedureExecutor. See the javadoc for finishActiveMasterInitialization for more
     // details.
-    procedureExecutor.init(numThreads, abortOnCorruption);
+    procedureExecutor.init(numThreads, urgentWorkers, abortOnCorruption);
     procEnv.getRemoteDispatcher().start();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java
index 495fab6..728ad43 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java
@@ -28,6 +28,13 @@ public final class MasterProcedureConstants {
   public static final String MASTER_PROCEDURE_THREADS = "hbase.master.procedure.threads";
   public static final int DEFAULT_MIN_MASTER_PROCEDURE_THREADS = 16;
 
+  /** Number of threads used by the procedure executor for urgent procedures.
+   *  For now, only meta table procedures are urgent.
+   */
+  public static final String MASTER_URGENT_PROCEDURE_THREADS =
+      "hbase.master.urgent.procedure.threads";
+  public static final int DEFAULT_MASTER_URGENT_PROCEDURE_THREADS = 1;
+
   /**
    * Procedure replay sanity check. In case a WAL is missing or unreadable we
    * may lose information about pending/running procedures.

http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
index 7bab7b3..b060763 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
@@ -124,7 +124,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
 
   @Override
   protected void enqueue(final Procedure proc, final boolean addFront) {
-    if (isMetaProcedure(proc)) {
+    if (isMetaProcedure(proc) ||
+        (isTableProcedure(proc) && getTableName(proc).equals(TableName.META_TABLE_NAME))) {
       doAdd(metaRunQueue, getMetaQueue(), proc, addFront);
     } else if (isTableProcedure(proc)) {
       doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront);
@@ -162,9 +163,12 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
   }
 
   @Override
-  protected Procedure dequeue() {
+  protected Procedure dequeue(boolean onlyUrgent) {
     // meta procedure is always the first priority
     Procedure<?> pollResult = doPoll(metaRunQueue);
+    if (onlyUrgent) {
+      return pollResult;
+    }
     // For now, let server handling have precedence over table handling; presumption is that it
     // is more important handling crashed servers than it is running the
     // enabling/disabling tables, etc.

http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestSplitTableRegionProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestSplitTableRegionProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestSplitTableRegionProcedure.java
index 1922848..7e37e44 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestSplitTableRegionProcedure.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestSplitTableRegionProcedure.java
@@ -97,6 +97,8 @@ public class TestSplitTableRegionProcedure {
 
   private static void setupConf(Configuration conf) {
     conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
+    // testRecoveryAndDoubleExecution requires a single worker, so disable urgent workers
+    conf.setInt(MasterProcedureConstants.MASTER_URGENT_PROCEDURE_THREADS, 0);
     conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 0);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedurePriority.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedurePriority.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedurePriority.java
index 32fb173..9008dcc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedurePriority.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedurePriority.java
@@ -106,6 +106,7 @@ public class TestProcedurePriority {
   public static void setUp() throws Exception {
     UTIL.getConfiguration().setLong(ProcedureExecutor.WORKER_KEEP_ALIVE_TIME_CONF_KEY, 5000);
     UTIL.getConfiguration().setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 4);
+    UTIL.getConfiguration().setInt(MasterProcedureConstants.MASTER_URGENT_PROCEDURE_THREADS, 0);
     UTIL.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, MyCP.class.getName());
     UTIL.startMiniCluster(3);
     CORE_POOL_SIZE =

http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java
index 9f7fafe..cef60f3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java
@@ -60,6 +60,8 @@ public class TestServerCrashProcedure {
 
   private void setupConf(Configuration conf) {
     conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
+    // The *DoubleExecution tests require a single worker, so disable urgent workers
+    conf.setInt(MasterProcedureConstants.MASTER_URGENT_PROCEDURE_THREADS, 0);
     conf.set("hbase.balancer.tablesOnMaster", "none");
     conf.setInt("hbase.client.retries.number", 3);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestTableDDLProcedureBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestTableDDLProcedureBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestTableDDLProcedureBase.java
index f7cf640..9680627 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestTableDDLProcedureBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestTableDDLProcedureBase.java
@@ -39,6 +39,8 @@ public abstract class TestTableDDLProcedureBase {
 
   private static void setupConf(Configuration conf) {
     conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
+    // The *DoubleExecution tests require a single worker, so disable urgent workers
+    conf.setInt(MasterProcedureConstants.MASTER_URGENT_PROCEDURE_THREADS, 0);
   }
 
   @BeforeClass

http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestUrgentProcedureWorker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestUrgentProcedureWorker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestUrgentProcedureWorker.java
new file mode 100644
index 0000000..c7801e4
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestUrgentProcedureWorker.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
+import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Checks that a procedure on {@link TableName#META_TABLE_NAME} can run while the only regular
+ * procedure worker is blocked.
+ * <p>
+ * The executor is started with a single regular worker. {@link WaitingMetaProcedure} is meant to
+ * occupy that worker and blocks until {@link MetaProcedure} has executed, so when it does hold the
+ * worker the test can only finish if the meta procedure runs on another (urgent) worker.
+ */
+@Category({MasterTests.class, SmallTests.class})
+public class TestUrgentProcedureWorker {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestUrgentProcedureWorker.class);
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(TestUrgentProcedureWorker.class);
+
+  /** Number of regular (non-urgent) workers; a single worker is easy to keep busy. */
+  private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
+
+  /** Counted down by {@link MetaProcedure} and awaited by {@link WaitingMetaProcedure}. */
+  private static final CountDownLatch metaFinished = new CountDownLatch(1);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static final TableName TABLE_NAME = TableName.valueOf("TestUrgentProcedureWorker");
+
+  private static WALProcedureStore procStore;
+
+  private static ProcedureExecutor<TestEnv> procExec;
+
+  /** Minimal procedure environment exposing the scheduler used for table locking. */
+  private static final class TestEnv {
+    private final MasterProcedureScheduler scheduler;
+
+    public TestEnv(MasterProcedureScheduler scheduler) {
+      this.scheduler = scheduler;
+    }
+
+    public MasterProcedureScheduler getScheduler() {
+      return scheduler;
+    }
+  }
+
+  /**
+   * No-op procedure that takes an exclusive lock on its table via the
+   * {@link MasterProcedureScheduler}. {@code holdLock} returns true, so the lock is kept between
+   * execution steps until the procedure finishes.
+   */
+  public abstract static class ExclusiveTableLockProcedure
+      extends ProcedureTestingUtility.NoopProcedure<TestEnv> implements TableProcedureInterface {
+
+    @Override
+    protected Procedure.LockState acquireLock(TestEnv env) {
+      // true means the lock is not available yet; the scheduler will wake this procedure later.
+      if (env.getScheduler().waitTableExclusiveLock(this, getTableName())) {
+        return LockState.LOCK_EVENT_WAIT;
+      }
+      return LockState.LOCK_ACQUIRED;
+    }
+
+    @Override
+    protected void releaseLock(TestEnv env) {
+      env.getScheduler().wakeTableExclusiveLock(this, getTableName());
+    }
+
+    @Override
+    protected boolean holdLock(TestEnv env) {
+      return true;
+    }
+
+    @Override
+    public TableOperationType getTableOperationType() {
+      return TableOperationType.EDIT;
+    }
+  }
+
+  /** Locks {@code TABLE_NAME} and blocks its worker until {@link MetaProcedure} has run. */
+  public static class WaitingMetaProcedure extends ExclusiveTableLockProcedure {
+    @Override
+    protected Procedure<TestEnv>[] execute(TestEnv env)
+        throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
+      metaFinished.await();
+      return null;
+    }
+
+    @Override
+    public TableName getTableName() {
+      return TABLE_NAME;
+    }
+  }
+
+  /** Locks {@link TableName#META_TABLE_NAME} and releases {@link WaitingMetaProcedure}. */
+  public static class MetaProcedure extends ExclusiveTableLockProcedure {
+    @Override
+    protected Procedure<TestEnv>[] execute(TestEnv env)
+        throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
+      metaFinished.countDown();
+      return null;
+    }
+
+    @Override
+    public TableName getTableName() {
+      return TableName.META_TABLE_NAME;
+    }
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws IOException {
+    // Stop the executor's workers and the WAL store before removing the test directory; the
+    // null checks keep a failure in setUp() from being hidden behind an NPE here.
+    if (procExec != null) {
+      procExec.stop();
+    }
+    if (procStore != null) {
+      procStore.stop(false);
+    }
+    UTIL.cleanupTestDir();
+  }
+
+  @BeforeClass
+  public static void setUp() throws IOException {
+    // Very high (100 min) stuck-worker threshold, presumably so the executor's stuck-worker
+    // handling does not react to the worker that WaitingMetaProcedure deliberately blocks.
+    UTIL.getConfiguration().setInt("hbase.procedure.worker.stuck.threshold.msec", 6000000);
+    procStore = ProcedureTestingUtility.createWalStore(UTIL.getConfiguration(),
+        UTIL.getDataTestDir("TestUrgentProcedureWorker"));
+    procStore.start(1);
+    MasterProcedureScheduler scheduler = new MasterProcedureScheduler(pid -> null);
+    procExec = new ProcedureExecutor<>(UTIL.getConfiguration(), new TestEnv(scheduler), procStore,
+        scheduler);
+    procExec.init(PROCEDURE_EXECUTOR_SLOTS, false);
+    procExec.startWorkers();
+  }
+
+  /**
+   * Submits the blocking table procedure, then the meta procedure, and expects both to finish
+   * without failure.
+   */
+  @Test
+  public void test() throws Exception {
+    long waitProc = procExec.submitProcedure(new WaitingMetaProcedure());
+    long metaProc = procExec.submitProcedure(new MetaProcedure());
+    UTIL.waitFor(5000, () -> procExec.isFinished(waitProc));
+    UTIL.waitFor(5000, () -> procExec.isFinished(metaProc));
+    ProcedureTestingUtility.assertProcNotFailed(procExec, waitProc);
+    ProcedureTestingUtility.assertProcNotFailed(procExec, metaProc);
+  }
+}