You are viewing a plain-text version of this content; the canonical link to it is on the original mailing-list archive page.
Posted to commits@hbase.apache.org by mb...@apache.org on 2016/09/26 15:43:31 UTC

[1/2] hbase git commit: HBASE-16587 Procedure v2 - Cleanup suspended proc execution

Repository: hbase
Updated Branches:
  refs/heads/master 5f7e642fe -> 8da0500e7


HBASE-16587 Procedure v2 - Cleanup suspended proc execution


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e01e05cc
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e01e05cc
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e01e05cc

Branch: refs/heads/master
Commit: e01e05cc0ef5255c549d3c7bb87be38d34f13d94
Parents: 5f7e642
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Mon Sep 26 08:08:44 2016 -0700
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Mon Sep 26 08:08:44 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/procedure2/Procedure.java      |  59 +++-
 .../hbase/procedure2/ProcedureExecutor.java     |  61 ++++-
 .../procedure2/RemoteProcedureException.java    |  21 +-
 .../hbase/procedure2/StateMachineProcedure.java |   4 +-
 .../procedure2/TestProcedureSuspended.java      | 270 +++++++++++++++++++
 .../procedure/MasterProcedureScheduler.java     |   4 +-
 .../master/procedure/ProcedurePrepareLatch.java |   2 +-
 .../master/procedure/ProcedureSyncWait.java     |   3 +-
 .../procedure/TestMasterProcedureEvents.java    |  90 +++++++
 9 files changed, 489 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e01e05cc/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
index b9145e7..69c36e6 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
@@ -164,6 +164,23 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
   }
 
   /**
+   * Used to keep the procedure lock even when the procedure is yielding or suspended.
+   * @return true if the procedure should hold on the lock until completionCleanup()
+   */
+  protected boolean holdLock(final TEnvironment env) {
+    return false;
+  }
+
+  /**
+   * Used in conjunction with holdLock(). If holdLock() is true, the procedure
+   * executor will not call acquireLock() when hasLock() already returns true.
+   * @return true if the procedure has the lock, false otherwise.
+   */
+  protected boolean hasLock(final TEnvironment env) {
+    return false;
+  }
+
+  /**
    * Called when the procedure is loaded for replay.
    * The procedure implementor may use this method to perform some quick
    * operation before replay.
@@ -174,6 +191,14 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
   }
 
   /**
+   * Called when the procedure is ready to be added to the queue after
+   * the loading/replay operation.
+   */
+  protected void afterReplay(final TEnvironment env) {
+    // no-op
+  }
+
+  /**
    * Called when the procedure is marked as completed (success or rollback).
    * The procedure implementor may use this method to cleanup in-memory states.
    * This operation will not be retried on failure.
@@ -339,6 +364,10 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
     return state == ProcedureState.RUNNABLE;
   }
 
+  public synchronized boolean isInitializing() {
+    return state == ProcedureState.INITIALIZING;
+  }
+
   /**
    * @return true if the procedure has failed.
    *         true may mean failed but not yet rolledback or failed and rolledback.
@@ -479,8 +508,12 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
     setFailure(source, new ProcedureAbortedException(msg));
   }
 
-  @InterfaceAudience.Private
-  protected synchronized boolean setTimeoutFailure() {
+  /**
+   * Called by the ProcedureExecutor when the timeout set by setTimeout() is expired.
+   * @return true to let the framework handle the timeout as abort,
+   *         false in case the procedure handled the timeout itself.
+   */
+  protected synchronized boolean setTimeoutFailure(final TEnvironment env) {
     if (state == ProcedureState.WAITING_TIMEOUT) {
       long timeDiff = EnvironmentEdgeManager.currentTime() - lastUpdate;
       setFailure("ProcedureExecutor", new TimeoutIOException(
@@ -549,6 +582,24 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
   }
 
   /**
+   * Internal method called by the ProcedureExecutor that starts the
+   * user-level code acquireLock().
+   */
+  @InterfaceAudience.Private
+  protected boolean doAcquireLock(final TEnvironment env) {
+    return acquireLock(env);
+  }
+
+  /**
+   * Internal method called by the ProcedureExecutor that starts the
+   * user-level code releaseLock().
+   */
+  @InterfaceAudience.Private
+  protected void doReleaseLock(final TEnvironment env) {
+    releaseLock(env);
+  }
+
+  /**
    * Called on store load to initialize the Procedure internals after
    * the creation/deserialization.
    */
@@ -601,6 +652,10 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
     return childrenLatch > 0;
   }
 
+  protected synchronized int getChildrenLatch() {
+    return childrenLatch;
+  }
+
   /**
    * Called by the RootProcedureState on procedure execution.
    * Each procedure store its stack-index positions.

http://git-wip-us.apache.org/repos/asf/hbase/blob/e01e05cc/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index 5066fb4..c2838ba 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -438,6 +438,7 @@ public class ProcedureExecutor<TEnvironment> {
       // some procedure may be started way before this stuff.
       for (int i = runnableList.size() - 1; i >= 0; --i) {
         Procedure proc = runnableList.get(i);
+        proc.afterReplay(getEnvironment());
         if (!proc.hasParent()) {
           sendProcedureLoadedNotification(proc.getProcId());
         }
@@ -857,9 +858,9 @@ public class ProcedureExecutor<TEnvironment> {
 
       // Execute the procedure
       assert proc.getState() == ProcedureState.RUNNABLE : proc;
-      if (proc.acquireLock(getEnvironment())) {
+      if (acquireLock(proc)) {
         execProcedure(procStack, proc);
-        proc.releaseLock(getEnvironment());
+        releaseLock(proc, false);
       } else {
         runnables.yield(proc);
       }
@@ -879,12 +880,34 @@ public class ProcedureExecutor<TEnvironment> {
         // Finalize the procedure state
         if (proc.getProcId() == rootProcId) {
           procedureFinished(proc);
+        } else {
+          execCompletionCleanup(proc);
         }
         break;
       }
     } while (procStack.isFailed());
   }
 
+  private boolean acquireLock(final Procedure proc) {
+    final TEnvironment env = getEnvironment();
+    // hasLock() is only consulted together with holdLock(), so procedures that
+    // do not keep their lock across executions never need to track that flag.
+    // If the procedure keeps its lock and already has it, skip reacquiring it.
+    if (proc.holdLock(env) && proc.hasLock(env)) {
+      return true;
+    }
+    return proc.doAcquireLock(env);
+  }
+
+  private void releaseLock(final Procedure proc, final boolean force) {
+    final TEnvironment env = getEnvironment();
+    // Callers only invoke this while holding the lock, so proc.hasLock() is not
+    // checked; a holdLock() procedure keeps its lock unless force is true.
+    if (force || !proc.holdLock(env)) {
+      proc.doReleaseLock(env);
+    }
+  }
+
   private void timeoutLoop() {
     while (isRunning()) {
       Procedure proc = waitingTimeout.poll();
@@ -921,15 +944,17 @@ public class ProcedureExecutor<TEnvironment> {
         continue;
       }
 
-      // The procedure received an "abort-timeout", call abort() and
-      // add the procedure back in the queue for rollback.
-      if (proc.setTimeoutFailure()) {
+      // The procedure received a timeout. if the procedure itself does not handle it,
+      // call abort() and add the procedure back in the queue for rollback.
+      if (proc.setTimeoutFailure(getEnvironment())) {
         long rootProcId = Procedure.getRootProcedureId(procedures, proc);
         RootProcedureState procStack = rollbackStack.get(rootProcId);
         procStack.abort();
         store.update(proc);
         runnables.addFront(proc);
         continue;
+      } else if (proc.getState() == ProcedureState.WAITING_TIMEOUT) {
+        waitingTimeout.add(proc);
       }
     }
   }
@@ -940,7 +965,7 @@ public class ProcedureExecutor<TEnvironment> {
    * finished to user, and the result will be the fatal exception.
    */
   private boolean executeRollback(final long rootProcId, final RootProcedureState procStack) {
-    Procedure rootProc = procedures.get(rootProcId);
+    final Procedure rootProc = procedures.get(rootProcId);
     RemoteProcedureException exception = rootProc.getException();
     if (exception == null) {
       exception = procStack.getException();
@@ -948,7 +973,7 @@ public class ProcedureExecutor<TEnvironment> {
       store.update(rootProc);
     }
 
-    List<Procedure> subprocStack = procStack.getSubproceduresStack();
+    final List<Procedure> subprocStack = procStack.getSubproceduresStack();
     assert subprocStack != null : "Called rollback with no steps executed rootProc=" + rootProc;
 
     int stackTail = subprocStack.size();
@@ -956,7 +981,7 @@ public class ProcedureExecutor<TEnvironment> {
     while (stackTail --> 0) {
       final Procedure proc = subprocStack.get(stackTail);
 
-      if (!reuseLock && !proc.acquireLock(getEnvironment())) {
+      if (!reuseLock && !acquireLock(proc)) {
         // can't take a lock on the procedure, add the root-proc back on the
         // queue waiting for the lock availability
         return false;
@@ -970,7 +995,7 @@ public class ProcedureExecutor<TEnvironment> {
       // we can avoid to lock/unlock each step
       reuseLock = stackTail > 0 && (subprocStack.get(stackTail - 1) == proc) && !abortRollback;
       if (!reuseLock) {
-        proc.releaseLock(getEnvironment());
+        releaseLock(proc, false);
       }
 
       // allows to kill the executor before something is stored to the wal.
@@ -985,6 +1010,10 @@ public class ProcedureExecutor<TEnvironment> {
       if (proc.isYieldAfterExecutionStep(getEnvironment())) {
         return false;
       }
+
+      if (proc != rootProc) {
+        execCompletionCleanup(proc);
+      }
     }
 
     // Finalize the procedure state
@@ -1302,14 +1331,22 @@ public class ProcedureExecutor<TEnvironment> {
     return Procedure.getRootProcedureId(procedures, proc);
   }
 
-  private void procedureFinished(final Procedure proc) {
-    // call the procedure completion cleanup handler
+  private void execCompletionCleanup(final Procedure proc) {
+    final TEnvironment env = getEnvironment();
+    if (proc.holdLock(env) && proc.hasLock(env)) {
+      releaseLock(proc, true);
+    }
     try {
-      proc.completionCleanup(getEnvironment());
+      proc.completionCleanup(env);
     } catch (Throwable e) {
       // Catch NullPointerExceptions or similar errors...
       LOG.error("CODE-BUG: uncatched runtime exception for procedure: " + proc, e);
     }
+  }
+
+  private void procedureFinished(final Procedure proc) {
+    // call the procedure completion cleanup handler
+    execCompletionCleanup(proc);
 
     // update the executor internal state maps
     ProcedureInfo procInfo = Procedure.createProcedureInfo(proc, proc.getNonceKey());

http://git-wip-us.apache.org/repos/asf/hbase/blob/e01e05cc/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureException.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureException.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureException.java
index 456f83d..402ddfc 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureException.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureException.java
@@ -65,14 +65,23 @@ public class RemoteProcedureException extends ProcedureException {
     return source;
   }
 
-  public IOException unwrapRemoteException() {
-    if (getCause() instanceof RemoteException) {
-      return ((RemoteException)getCause()).unwrapRemoteException();
+  public Exception unwrapRemoteException() {
+    final Throwable cause = getCause();
+    if (cause instanceof RemoteException) {
+      return ((RemoteException)cause).unwrapRemoteException();
     }
-    if (getCause() instanceof IOException) {
-      return (IOException)getCause();
+    if (cause instanceof Exception) {
+      return (Exception)cause;
     }
-    return new IOException(getCause());
+    return new Exception(cause);
+  }
+
+  public IOException unwrapRemoteIOException() {
+    final Exception cause = unwrapRemoteException();
+    if (cause instanceof IOException) {
+      return (IOException)cause;
+    }
+    return new IOException(cause);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/e01e05cc/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
index 10467fe..a363c2e 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
@@ -131,7 +131,9 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
       subProcList = new ArrayList<Procedure>(subProcedure.length);
     }
     for (int i = 0; i < subProcedure.length; ++i) {
-      subProcList.add(subProcedure[i]);
+      Procedure proc = subProcedure[i];
+      if (!proc.hasOwner()) proc.setOwner(getOwner());
+      subProcList.add(proc);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/e01e05cc/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java
new file mode 100644
index 0000000..eb72939
--- /dev/null
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.util.Threads;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@Category({MasterTests.class, SmallTests.class})
+public class TestProcedureSuspended {
+  private static final Log LOG = LogFactory.getLog(TestProcedureSuspended.class);
+
+  private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
+  private static final Procedure NULL_PROC = null;
+
+  private ProcedureExecutor<TestProcEnv> procExecutor;
+  private ProcedureStore procStore;
+
+  private HBaseCommonTestingUtility htu;
+
+  @Before
+  public void setUp() throws IOException {
+    htu = new HBaseCommonTestingUtility();
+
+    procStore = new NoopProcedureStore();
+    procExecutor = new ProcedureExecutor(htu.getConfiguration(), new TestProcEnv(), procStore);
+    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
+    procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    procExecutor.stop();
+    procStore.stop(false);
+  }
+
+  @Test(timeout=10000)
+  public void testSuspendWhileHoldingLocks() {
+    final AtomicBoolean lockA = new AtomicBoolean(false);
+    final AtomicBoolean lockB = new AtomicBoolean(false);
+
+    final TestLockProcedure p1keyA = new TestLockProcedure(lockA, "keyA", false, true);
+    final TestLockProcedure p2keyA = new TestLockProcedure(lockA, "keyA", false, true);
+    final TestLockProcedure p3keyB = new TestLockProcedure(lockB, "keyB", false, true);
+
+    procExecutor.submitProcedure(p1keyA);
+    procExecutor.submitProcedure(p2keyA);
+    procExecutor.submitProcedure(p3keyB);
+
+    // first run: p1 and p3 are able to run; p2 is blocked by p1 (lockA)
+    waitAndAssertTimestamp(p1keyA, 1, 1);
+    waitAndAssertTimestamp(p2keyA, 0, -1);
+    waitAndAssertTimestamp(p3keyB, 1, 2);
+    assertEquals(true, lockA.get());
+    assertEquals(true, lockB.get());
+
+    // release p3: stop suspending it and put it back on the queue
+    p3keyB.setThrowSuspend(false);
+    procExecutor.getRunnableSet().addFront(p3keyB);
+    waitAndAssertTimestamp(p1keyA, 1, 1);
+    waitAndAssertTimestamp(p2keyA, 0, -1);
+    waitAndAssertTimestamp(p3keyB, 2, 3);
+    assertEquals(true, lockA.get());
+
+    // wait until p3 is fully completed
+    ProcedureTestingUtility.waitProcedure(procExecutor, p3keyB);
+    assertEquals(false, lockB.get());
+
+    // trigger a rollback of p1 and wait until it is fully completed
+    p1keyA.setTriggerRollback(true);
+    procExecutor.getRunnableSet().addFront(p1keyA);
+    ProcedureTestingUtility.waitProcedure(procExecutor, p1keyA);
+
+    // p1 released lockA, so p2 should now start and suspend
+    waitAndAssertTimestamp(p1keyA, 4, 60000);
+    waitAndAssertTimestamp(p2keyA, 1, 7);
+    waitAndAssertTimestamp(p3keyB, 2, 3);
+    assertEquals(true, lockA.get());
+
+    // wait until p2 is fully completed
+    p2keyA.setThrowSuspend(false);
+    procExecutor.getRunnableSet().addFront(p2keyA);
+    ProcedureTestingUtility.waitProcedure(procExecutor, p2keyA);
+    waitAndAssertTimestamp(p1keyA, 4, 60000);
+    waitAndAssertTimestamp(p2keyA, 2, 8);
+    waitAndAssertTimestamp(p3keyB, 2, 3);
+    assertEquals(false, lockA.get());
+    assertEquals(false, lockB.get());
+  }
+
+  @Test(timeout=10000)
+  public void testYieldWhileHoldingLocks() {
+    final AtomicBoolean lock = new AtomicBoolean(false);
+
+    final TestLockProcedure p1 = new TestLockProcedure(lock, "key", true, false);
+    final TestLockProcedure p2 = new TestLockProcedure(lock, "key", true, false);
+
+    procExecutor.submitProcedure(p1);
+    procExecutor.submitProcedure(p2);
+
+    // p1 yields repeatedly while holding the lock, so p2 must stay blocked
+    while (p1.getTimestamps().size() < 100) Threads.sleep(10);
+    assertEquals(0, p2.getTimestamps().size());
+
+    // stop p1 yielding and wait until p1 is completed (releasing the lock)
+    p1.setThrowYield(false);
+    ProcedureTestingUtility.waitProcedure(procExecutor, p1);
+
+    // p2 now owns the lock; its first step runs right after p1's last one
+    while (p2.getTimestamps().size() < 100) Threads.sleep(10);
+    assertEquals(p1.getTimestamps().get(p1.getTimestamps().size() - 1).longValue() + 1,
+      p2.getTimestamps().get(0).longValue());
+
+    // stop p2 yielding and wait until p2 is completed
+    p2.setThrowYield(false);
+    ProcedureTestingUtility.waitProcedure(procExecutor, p2);
+  }
+
+  private void waitAndAssertTimestamp(TestLockProcedure proc, int size, int lastTs) {
+    final ArrayList<Long> timestamps = proc.getTimestamps();
+    while (timestamps.size() < size) Threads.sleep(10);
+    LOG.info(proc + " -> " + timestamps);
+    assertEquals(size, timestamps.size());
+    if (size > 0) {
+      assertEquals(lastTs, timestamps.get(timestamps.size() - 1).longValue());
+    }
+  }
+
+  public static class TestLockProcedure extends Procedure<TestProcEnv> {
+    private final ArrayList<Long> timestamps = new ArrayList<Long>();
+    private final String key;
+
+    private boolean triggerRollback = false;
+    private boolean throwSuspend = false;
+    private boolean throwYield = false;
+    private AtomicBoolean lock = null;
+    private boolean hasLock = false;
+
+    public TestLockProcedure(final AtomicBoolean lock, final String key,
+        final boolean throwYield, final boolean throwSuspend) {
+      this.lock = lock;
+      this.key = key;
+      this.throwYield = throwYield;
+      this.throwSuspend = throwSuspend;
+    }
+
+    public void setThrowYield(final boolean throwYield) {
+      this.throwYield = throwYield;
+    }
+
+    public void setThrowSuspend(final boolean throwSuspend) {
+      this.throwSuspend = throwSuspend;
+    }
+
+    public void setTriggerRollback(final boolean triggerRollback) {
+      this.triggerRollback = triggerRollback;
+    }
+
+    @Override
+    protected Procedure[] execute(final TestProcEnv env)
+        throws ProcedureYieldException, ProcedureSuspendedException {
+      LOG.info("EXECUTE " + this + " suspend " + (lock != null));
+      timestamps.add(env.nextTimestamp());
+      if (triggerRollback) {
+        setFailure(getClass().getSimpleName(), new Exception("injected failure"));
+      } else if (throwYield) {
+        throw new ProcedureYieldException();
+      } else if (throwSuspend) {
+        throw new ProcedureSuspendedException();
+      }
+      return null;
+    }
+
+    @Override
+    protected void rollback(final TestProcEnv env) {
+      LOG.info("ROLLBACK " + this);
+      timestamps.add(env.nextTimestamp() * 10000);
+    }
+
+    @Override
+    protected boolean acquireLock(final TestProcEnv env) {
+      if ((hasLock = lock.compareAndSet(false, true))) {
+        LOG.info("ACQUIRE LOCK " + this + " " + (hasLock));
+      }
+      return hasLock;
+    }
+
+    @Override
+    protected void releaseLock(final TestProcEnv env) {
+      LOG.info("RELEASE LOCK " + this + " " + hasLock);
+      lock.set(false);
+      hasLock = false;
+    }
+
+    @Override
+    protected boolean holdLock(final TestProcEnv env) {
+      return true;
+    }
+
+    @Override
+    protected boolean hasLock(final TestProcEnv env) {
+      return hasLock;
+    }
+
+    public ArrayList<Long> getTimestamps() {
+      return timestamps;
+    }
+
+    @Override
+    protected void toStringClassDetails(StringBuilder builder) {
+      builder.append(getClass().getName());
+      builder.append("(" + key + ")");
+    }
+
+    @Override
+    protected boolean abort(TestProcEnv env) { return false; }
+
+    @Override
+    protected void serializeStateData(final OutputStream stream) throws IOException {
+    }
+
+    @Override
+    protected void deserializeStateData(final InputStream stream) throws IOException {
+    }
+  }
+
+  private static class TestProcEnv {
+    public final AtomicLong timestamp = new AtomicLong(0);
+
+    public long nextTimestamp() {
+      return timestamp.incrementAndGet();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/e01e05cc/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
index 3a215d5..548fb00 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
@@ -297,7 +297,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
       TableProcedureInterface iProcTable = (TableProcedureInterface)proc;
       boolean tableDeleted;
       if (proc.hasException()) {
-        IOException procEx = proc.getException().unwrapRemoteException();
+        Exception procEx = proc.getException().unwrapRemoteException();
         if (iProcTable.getTableOperationType() == TableOperationType.CREATE) {
           // create failed because the table already exist
           tableDeleted = !(procEx instanceof TableExistsException);
@@ -1628,4 +1628,4 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
       return Math.max(1, queue.getPriority() * quantum); // TODO
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e01e05cc/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java
index eaeb9ac..eb23960 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java
@@ -82,7 +82,7 @@ public abstract class ProcedurePrepareLatch {
 
     protected void countDown(final Procedure proc) {
       if (proc.hasException()) {
-        exception = proc.getException().unwrapRemoteException();
+        exception = proc.getException().unwrapRemoteIOException();
       }
       latch.countDown();
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e01e05cc/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
index cf8fdd4..0e10293 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
@@ -83,7 +83,8 @@ public final class ProcedureSyncWait {
       if (result.isFailed()) {
         // If the procedure fails, we should always have an exception captured. Throw it.
         throw RemoteProcedureException.fromProto(
-          result.getForeignExceptionMessage().getForeignExchangeMessage()).unwrapRemoteException();
+          result.getForeignExceptionMessage().getForeignExchangeMessage())
+            .unwrapRemoteIOException();
       }
       return result.getResult();
     } else {

http://git-wip-us.apache.org/repos/asf/hbase/blob/e01e05cc/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java
index d6b15d9..bf58738 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java
@@ -18,6 +18,11 @@
 
 package org.apache.hadoop.hbase.master.procedure;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -29,9 +34,13 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent;
+import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
 import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
+import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -173,4 +182,85 @@ public class TestMasterProcedureEvents {
     }
     return null;
   }
+
+  @Test(timeout=30000)
+  public void testTimeoutEventProcedure() throws Exception {
+    HMaster master = UTIL.getMiniHBaseCluster().getMaster();
+    ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
+    MasterProcedureScheduler procSched = procExec.getEnvironment().getProcedureQueue();
+
+    TestTimeoutEventProcedure proc = new TestTimeoutEventProcedure(1000, 5);
+    procExec.submitProcedure(proc);
+
+    ProcedureTestingUtility.waitProcedure(procExec, proc.getProcId());
+    ProcedureTestingUtility.assertIsAbortException(procExec.getResult(proc.getProcId()));
+  }
+
+  public static class TestTimeoutEventProcedure
+      extends Procedure<MasterProcedureEnv> implements TableProcedureInterface {
+    private final ProcedureEvent event = new ProcedureEvent("timeout-event");
+
+    private final AtomicInteger ntimeouts = new AtomicInteger(0);
+    private int maxTimeouts = 1;
+
+    public TestTimeoutEventProcedure() {}
+
+    public TestTimeoutEventProcedure(final int timeoutMsec, final int maxTimeouts) {
+      this.maxTimeouts = maxTimeouts;
+      setTimeout(timeoutMsec);
+      setOwner("test");
+    }
+
+    @Override
+    protected Procedure[] execute(final MasterProcedureEnv env)
+        throws ProcedureSuspendedException {
+      LOG.info("EXECUTE " + this + " ntimeouts=" + ntimeouts);
+      if (ntimeouts.get() > maxTimeouts) {
+        setAbortFailure("test", "give up after " + ntimeouts.get());
+        return null;
+      }
+
+      env.getProcedureQueue().suspendEvent(event);
+      if (env.getProcedureQueue().waitEvent(event, this)) {
+        setState(ProcedureState.WAITING_TIMEOUT);
+        throw new ProcedureSuspendedException();
+      }
+
+      return null;
+    }
+
+    @Override
+    protected void rollback(final MasterProcedureEnv env) {
+    }
+
+    @Override
+    protected boolean setTimeoutFailure(final MasterProcedureEnv env) {
+      int n = ntimeouts.incrementAndGet();
+      LOG.info("HANDLE TIMEOUT " + this + " ntimeouts=" + n);
+      setState(ProcedureState.RUNNABLE);
+      env.getProcedureQueue().wakeEvent(event);
+      return false;
+    }
+
+    @Override
+    public TableName getTableName() {
+      return TableName.valueOf("testtb");
+    }
+
+    @Override
+    public TableOperationType getTableOperationType() {
+      return TableOperationType.READ;
+    }
+
+    @Override
+    protected boolean abort(MasterProcedureEnv env) { return false; }
+
+    @Override
+    protected void serializeStateData(final OutputStream stream) throws IOException {
+    }
+
+    @Override
+    protected void deserializeStateData(final InputStream stream) throws IOException {
+    }
+  }
 }


[2/2] hbase git commit: HBASE-16695 Procedure v2 - Support for parent holding locks

Posted by mb...@apache.org.
HBASE-16695 Procedure v2 - Support for parent holding locks


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8da0500e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8da0500e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8da0500e

Branch: refs/heads/master
Commit: 8da0500e7d494f45cded7c3cb3423401a73e21fb
Parents: e01e05c
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Mon Sep 26 08:42:48 2016 -0700
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Mon Sep 26 08:42:48 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/procedure2/Procedure.java      |  12 +
 .../hbase/procedure2/ProcedureRunnableSet.java  |   3 +
 .../procedure/MasterProcedureScheduler.java     | 156 +++++---
 .../procedure/TestMasterProcedureScheduler.java | 330 +++++++----------
 ...TestMasterProcedureSchedulerConcurrency.java | 363 +++++++++++++++++++
 5 files changed, 597 insertions(+), 267 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/8da0500e/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
index 69c36e6..6403cfd 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
@@ -908,4 +908,16 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
 
     return proc;
   }
+
+  /**
+   * @param a the first procedure to be compared.
+   * @param b the second procedure to be compared.
+   * @return true if the two procedures have the same parent
+   */
+  public static boolean haveSameParent(final Procedure a, final Procedure b) {
+    if (a.hasParent() && b.hasParent()) {
+      return a.getParentProcId() == b.getParentProcId();
+    }
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8da0500e/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java
index 65df692..64c41ee 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hbase.procedure2;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 
@@ -69,6 +71,7 @@ public interface ProcedureRunnableSet {
    * Returns the number of elements in this collection.
    * @return the number of elements in this collection.
    */
+  @VisibleForTesting
   int size();
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/8da0500e/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
index 548fb00..26ecd94 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hbase.master.procedure;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.Arrays;
@@ -44,6 +46,7 @@ import org.apache.hadoop.hbase.util.AvlUtil.AvlKeyComparator;
 import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList;
 import org.apache.hadoop.hbase.util.AvlUtil.AvlLinkedNode;
 import org.apache.hadoop.hbase.util.AvlUtil.AvlTree;
+import org.apache.hadoop.hbase.util.AvlUtil.AvlTreeIterator;
 
 /**
  * ProcedureRunnableSet for the Master Procedures.
@@ -78,7 +81,6 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
 
   private final FairQueue<ServerName> serverRunQueue = new FairQueue<ServerName>();
   private final FairQueue<TableName> tableRunQueue = new FairQueue<TableName>();
-  private int queueSize = 0;
 
   private final ServerQueue[] serverBuckets = new ServerQueue[128];
   private NamespaceQueue namespaceMap = null;
@@ -148,14 +150,14 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
     if (proc.isSuspended()) return;
 
     queue.add(proc, addFront);
-    if (!(queue.isSuspended() || queue.hasExclusiveLock())) {
+    if (!(queue.isSuspended() ||
+        (queue.hasExclusiveLock() && !queue.isLockOwner(proc.getProcId())))) {
       // the queue is not suspended or removed from the fairq (run-queue)
       // because someone has an xlock on it.
       // so, if the queue is not-linked we should add it
       if (queue.size() == 1 && !AvlIterableList.isLinked(queue)) {
         fairq.add(queue);
       }
-      queueSize++;
     } else if (queue.hasParentLock(proc)) {
       assert addFront : "expected to add a child in the front";
       assert !queue.isSuspended() : "unexpected suspended state for the queue";
@@ -165,7 +167,6 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
       if (!AvlIterableList.isLinked(queue)) {
         fairq.add(queue);
       }
-      queueSize++;
     }
   }
 
@@ -179,13 +180,13 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
     Procedure pollResult = null;
     schedLock.lock();
     try {
-      if (queueSize == 0) {
+      if (!hasRunnables()) {
         if (waitNsec < 0) {
           schedWaitCond.await();
         } else {
           schedWaitCond.awaitNanos(waitNsec);
         }
-        if (queueSize == 0) {
+        if (!hasRunnables()) {
           return null;
         }
       }
@@ -209,6 +210,10 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
     return pollResult;
   }
 
+  private boolean hasRunnables() {
+    return tableRunQueue.hasRunnables() || serverRunQueue.hasRunnables();
+  }
+
   private <T extends Comparable<T>> Procedure doPoll(final FairQueue<T> fairq) {
     final Queue<T> rq = fairq.poll();
     if (rq == null || !rq.isAvailable()) {
@@ -218,13 +223,12 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
     assert !rq.isSuspended() : "rq=" + rq + " is suspended";
     final Procedure pollResult = rq.peek();
     final boolean xlockReq = rq.requireExclusiveLock(pollResult);
-    if (xlockReq && rq.isLocked() && !rq.hasParentLock(pollResult)) {
+    if (xlockReq && rq.isLocked() && !rq.hasLockAccess(pollResult)) {
       // someone is already holding the lock (e.g. shared lock). avoid a yield
       return null;
     }
 
     rq.poll();
-    this.queueSize--;
     if (rq.isEmpty() || xlockReq) {
       removeFromRunQueue(fairq, rq);
     } else if (rq.hasParentLock(pollResult)) {
@@ -232,7 +236,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
       // check if the next procedure is still a child.
       // if not, remove the rq from the fairq and go back to the xlock state
       Procedure nextProc = rq.peek();
-      if (nextProc != null && nextProc.getParentProcId() != pollResult.getParentProcId()) {
+      if (nextProc != null && !Procedure.haveSameParent(nextProc, pollResult)) {
         removeFromRunQueue(fairq, rq);
       }
     }
@@ -255,7 +259,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
       clear(tableMap, tableRunQueue, TABLE_QUEUE_KEY_COMPARATOR);
       tableMap = null;
 
-      assert queueSize == 0 : "expected queue size to be 0, got " + queueSize;
+      assert size() == 0 : "expected queue size to be 0, got " + size();
     } finally {
       schedLock.unlock();
     }
@@ -271,6 +275,14 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
     }
   }
 
+  private void wakePollIfNeeded(final int waitingCount) {
+    if (waitingCount > 1) {
+      schedWaitCond.signalAll();
+    } else if (waitingCount > 0) {
+      schedWaitCond.signal();
+    }
+  }
+
   @Override
   public void signalAll() {
     schedLock.lock();
@@ -285,14 +297,30 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
   public int size() {
     schedLock.lock();
     try {
-      return queueSize;
+      int count = 0;
+
+      // Server queues
+      final AvlTreeIterator<ServerQueue> serverIter = new AvlTreeIterator<ServerQueue>();
+      for (int i = 0; i < serverBuckets.length; ++i) {
+        serverIter.seekFirst(serverBuckets[i]);
+        while (serverIter.hasNext()) {
+          count += serverIter.next().size();
+        }
+      }
+
+      // Table queues
+      final AvlTreeIterator<TableQueue> tableIter = new AvlTreeIterator<TableQueue>(tableMap);
+      while (tableIter.hasNext()) {
+        count += tableIter.next().size();
+      }
+      return count;
     } finally {
       schedLock.unlock();
     }
   }
 
   @Override
-  public void completionCleanup(Procedure proc) {
+  public void completionCleanup(final Procedure proc) {
     if (proc instanceof TableProcedureInterface) {
       TableProcedureInterface iProcTable = (TableProcedureInterface)proc;
       boolean tableDeleted;
@@ -310,7 +338,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
         tableDeleted = (iProcTable.getTableOperationType() == TableOperationType.DELETE);
       }
       if (tableDeleted) {
-        markTableAsDeleted(iProcTable.getTableName());
+        markTableAsDeleted(iProcTable.getTableName(), proc);
         return;
       }
     } else {
@@ -323,14 +351,12 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
     if (AvlIterableList.isLinked(queue)) return;
     if (!queue.isEmpty())  {
       fairq.add(queue);
-      queueSize += queue.size();
     }
   }
 
   private <T extends Comparable<T>> void removeFromRunQueue(FairQueue<T> fairq, Queue<T> queue) {
     if (!AvlIterableList.isLinked(queue)) return;
     fairq.remove(queue);
-    queueSize -= queue.size();
   }
 
   // ============================================================================
@@ -470,13 +496,8 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
 
       schedLock.lock();
       try {
-        popEventWaitingObjects(event);
-
-        if (queueSize > 1) {
-          schedWaitCond.signalAll();
-        } else if (queueSize > 0) {
-          schedWaitCond.signal();
-        }
+        final int waitingCount = popEventWaitingObjects(event);
+        wakePollIfNeeded(waitingCount);
       } finally {
         schedLock.unlock();
       }
@@ -493,6 +514,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
     final boolean isTraceEnabled = LOG.isTraceEnabled();
     schedLock.lock();
     try {
+      int waitingCount = 0;
       for (int i = 0; i < count; ++i) {
         final ProcedureEvent event = events[i];
         synchronized (event) {
@@ -500,36 +522,36 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
           if (isTraceEnabled) {
             LOG.trace("Wake event " + event);
           }
-          popEventWaitingObjects(event);
+          waitingCount += popEventWaitingObjects(event);
         }
       }
-
-      if (queueSize > 1) {
-        schedWaitCond.signalAll();
-      } else if (queueSize > 0) {
-        schedWaitCond.signal();
-      }
+      wakePollIfNeeded(waitingCount);
     } finally {
       schedLock.unlock();
     }
   }
 
-  private void popEventWaitingObjects(final ProcedureEvent event) {
+  private int popEventWaitingObjects(final ProcedureEvent event) {
+    int count = 0;
     while (event.hasWaitingTables()) {
       final Queue<TableName> queue = event.popWaitingTable();
       queue.setSuspended(false);
       addToRunQueue(tableRunQueue, queue);
+      count += queue.size();
     }
     // TODO: This will change once we have the new AM
     while (event.hasWaitingServers()) {
       final Queue<ServerName> queue = event.popWaitingServer();
       queue.setSuspended(false);
       addToRunQueue(serverRunQueue, queue);
+      count += queue.size();
     }
 
     while (event.hasWaitingProcedures()) {
       wakeProcedure(event.popWaitingProcedure(false));
+      count++;
     }
+    return count;
   }
 
   private void suspendProcedure(final BaseProcedureEvent event, final Procedure procedure) {
@@ -823,8 +845,8 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
       if (hasExclusiveLock()) {
         // if we have an exclusive lock already taken
         // only child of the lock owner can be executed
-        Procedure availProc = peek();
-        return availProc != null && hasParentLock(availProc);
+        final Procedure nextProc = peek();
+        return nextProc != null && hasLockAccess(nextProc);
       }
 
       // no xlock
@@ -1011,26 +1033,31 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
     schedLock.lock();
     TableQueue queue = getTableQueue(table);
     if (!queue.getNamespaceQueue().trySharedLock()) {
+      schedLock.unlock();
       return false;
     }
 
-    if (!queue.tryExclusiveLock(procedure.getProcId())) {
+    if (!queue.tryExclusiveLock(procedure)) {
       queue.getNamespaceQueue().releaseSharedLock();
       schedLock.unlock();
       return false;
     }
 
     removeFromRunQueue(tableRunQueue, queue);
+    boolean hasParentLock = queue.hasParentLock(procedure);
     schedLock.unlock();
 
-    // Zk lock is expensive...
-    boolean hasXLock = queue.tryZkExclusiveLock(lockManager, procedure.toString());
-    if (!hasXLock) {
-      schedLock.lock();
-      queue.releaseExclusiveLock();
-      queue.getNamespaceQueue().releaseSharedLock();
-      addToRunQueue(tableRunQueue, queue);
-      schedLock.unlock();
+    boolean hasXLock = true;
+    if (!hasParentLock) {
+      // Zk lock is expensive...
+      hasXLock = queue.tryZkExclusiveLock(lockManager, procedure.toString());
+      if (!hasXLock) {
+        schedLock.lock();
+        if (!hasParentLock) queue.releaseExclusiveLock();
+        queue.getNamespaceQueue().releaseSharedLock();
+        addToRunQueue(tableRunQueue, queue);
+        schedLock.unlock();
+      }
     }
     return hasXLock;
   }
@@ -1041,15 +1068,16 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
    * @param table the name of the table that has the exclusive lock
    */
   public void releaseTableExclusiveLock(final Procedure procedure, final TableName table) {
-    schedLock.lock();
-    TableQueue queue = getTableQueue(table);
-    schedLock.unlock();
+    final TableQueue queue = getTableQueueWithLock(table);
+    final boolean hasParentLock = queue.hasParentLock(procedure);
 
-    // Zk lock is expensive...
-    queue.releaseZkExclusiveLock(lockManager);
+    if (!hasParentLock) {
+      // Zk lock is expensive...
+      queue.releaseZkExclusiveLock(lockManager);
+    }
 
     schedLock.lock();
-    queue.releaseExclusiveLock();
+    if (!hasParentLock) queue.releaseExclusiveLock();
     queue.getNamespaceQueue().releaseSharedLock();
     addToRunQueue(tableRunQueue, queue);
     schedLock.unlock();
@@ -1116,17 +1144,19 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
    * If there are new operations pending (e.g. a new create),
    * the remove will not be performed.
    * @param table the name of the table that should be marked as deleted
+   * @param procedure the procedure that is removing the table
    * @return true if deletion succeeded, false otherwise meaning that there are
    *     other new operations pending for that table (e.g. a new create).
    */
-  protected boolean markTableAsDeleted(final TableName table) {
+  @VisibleForTesting
+  protected boolean markTableAsDeleted(final TableName table, final Procedure procedure) {
     final ReentrantLock l = schedLock;
     l.lock();
     try {
       TableQueue queue = getTableQueue(table);
       if (queue == null) return true;
 
-      if (queue.isEmpty() && queue.tryExclusiveLock(0)) {
+      if (queue.isEmpty() && queue.tryExclusiveLock(procedure)) {
         // remove the table from the run-queue and the map
         if (AvlIterableList.isLinked(queue)) {
           tableRunQueue.remove(queue);
@@ -1256,11 +1286,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
         wakeProcedure(nextProcs[i]);
       }
 
-      if (numProcs > 1) {
-        schedWaitCond.signalAll();
-      } else if (numProcs > 0) {
-        schedWaitCond.signal();
-      }
+      wakePollIfNeeded(numProcs);
 
       if (!procedure.hasParent()) {
         // release the table shared-lock.
@@ -1289,7 +1315,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
       if (!tableQueue.trySharedLock()) return false;
 
       NamespaceQueue nsQueue = getNamespaceQueue(nsName);
-      boolean hasLock = nsQueue.tryExclusiveLock(procedure.getProcId());
+      boolean hasLock = nsQueue.tryExclusiveLock(procedure);
       if (!hasLock) {
         tableQueue.releaseSharedLock();
       }
@@ -1333,7 +1359,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
     schedLock.lock();
     try {
       ServerQueue queue = getServerQueue(serverName);
-      if (queue.tryExclusiveLock(procedure.getProcId())) {
+      if (queue.tryExclusiveLock(procedure)) {
         removeFromRunQueue(serverRunQueue, queue);
         return true;
       }
@@ -1473,10 +1499,13 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
       return proc.hasParent() && isLockOwner(proc.getParentProcId());
     }
 
-    public synchronized boolean tryExclusiveLock(final long procIdOwner) {
-      assert procIdOwner != Long.MIN_VALUE;
-      if (isLocked() && !isLockOwner(procIdOwner)) return false;
-      exclusiveLockProcIdOwner = procIdOwner;
+    public synchronized boolean hasLockAccess(final Procedure proc) {
+      return isLockOwner(proc.getProcId()) || hasParentLock(proc);
+    }
+
+    public synchronized boolean tryExclusiveLock(final Procedure proc) {
+      if (isLocked()) return hasLockAccess(proc);
+      exclusiveLockProcIdOwner = proc.getProcId();
       return true;
     }
 
@@ -1564,6 +1593,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
     private Queue<T> currentQueue = null;
     private Queue<T> queueHead = null;
     private int currentQuantum = 0;
+    private int size = 0;
 
     public FairQueue() {
       this(1);
@@ -1573,9 +1603,14 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
       this.quantum = quantum;
     }
 
+    public boolean hasRunnables() {
+      return size > 0;
+    }
+
     public void add(Queue<T> queue) {
       queueHead = AvlIterableList.append(queueHead, queue);
       if (currentQueue == null) setNextQueue(queueHead);
+      size++;
     }
 
     public void remove(Queue<T> queue) {
@@ -1584,6 +1619,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
       if (currentQueue == queue) {
         setNextQueue(queueHead != null ? nextQueue : null);
       }
+      size--;
     }
 
     public Queue<T> poll() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/8da0500e/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java
index 7feeec5..58eaf3c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java
@@ -20,10 +20,7 @@ package org.apache.hadoop.hbase.master.procedure;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -37,8 +34,8 @@ import org.apache.hadoop.hbase.master.TableLockManager;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -51,7 +48,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-@Category({MasterTests.class, MediumTests.class})
+@Category({MasterTests.class, SmallTests.class})
 public class TestMasterProcedureScheduler {
   private static final Log LOG = LogFactory.getLog(TestMasterProcedureScheduler.class);
 
@@ -70,60 +67,6 @@ public class TestMasterProcedureScheduler {
     queue.clear();
   }
 
-  @Test
-  public void testConcurrentCreateDelete() throws Exception {
-    final MasterProcedureScheduler procQueue = queue;
-    final TableName table = TableName.valueOf("testtb");
-    final AtomicBoolean running = new AtomicBoolean(true);
-    final AtomicBoolean failure = new AtomicBoolean(false);
-    Thread createThread = new Thread() {
-      @Override
-      public void run() {
-        try {
-          TestTableProcedure proc = new TestTableProcedure(1, table,
-              TableProcedureInterface.TableOperationType.CREATE);
-          while (running.get() && !failure.get()) {
-            if (procQueue.tryAcquireTableExclusiveLock(proc, table)) {
-              procQueue.releaseTableExclusiveLock(proc, table);
-            }
-          }
-        } catch (Throwable e) {
-          LOG.error("create failed", e);
-          failure.set(true);
-        }
-      }
-    };
-
-    Thread deleteThread = new Thread() {
-      @Override
-      public void run() {
-        try {
-          TestTableProcedure proc = new TestTableProcedure(2, table,
-              TableProcedureInterface.TableOperationType.DELETE);
-          while (running.get() && !failure.get()) {
-            if (procQueue.tryAcquireTableExclusiveLock(proc, table)) {
-              procQueue.releaseTableExclusiveLock(proc, table);
-            }
-            procQueue.markTableAsDeleted(table);
-          }
-        } catch (Throwable e) {
-          LOG.error("delete failed", e);
-          failure.set(true);
-        }
-      }
-    };
-
-    createThread.start();
-    deleteThread.start();
-    for (int i = 0; i < 100 && running.get() && !failure.get(); ++i) {
-      Thread.sleep(100);
-    }
-    running.set(false);
-    createThread.join();
-    deleteThread.join();
-    assertEquals(false, failure.get());
-  }
-
   /**
    * Verify simple create/insert/fetch/delete of the table queue.
    */
@@ -159,9 +102,11 @@ public class TestMasterProcedureScheduler {
     assertEquals(0, queue.size());
 
     for (int i = 1; i <= NUM_TABLES; ++i) {
-      TableName tableName = TableName.valueOf(String.format("test-%04d", i));
+      final TableName tableName = TableName.valueOf(String.format("test-%04d", i));
+      final TestTableProcedure dummyProc = new TestTableProcedure(100, tableName,
+        TableProcedureInterface.TableOperationType.DELETE);
       // complete the table deletion
-      assertTrue(queue.markTableAsDeleted(tableName));
+      assertTrue(queue.markTableAsDeleted(tableName, dummyProc));
     }
   }
 
@@ -173,11 +118,14 @@ public class TestMasterProcedureScheduler {
   public void testCreateDeleteTableOperationsWithWriteLock() throws Exception {
     TableName tableName = TableName.valueOf("testtb");
 
+    final TestTableProcedure dummyProc = new TestTableProcedure(100, tableName,
+        TableProcedureInterface.TableOperationType.DELETE);
+
     queue.addBack(new TestTableProcedure(1, tableName,
           TableProcedureInterface.TableOperationType.EDIT));
 
     // table can't be deleted because one item is in the queue
-    assertFalse(queue.markTableAsDeleted(tableName));
+    assertFalse(queue.markTableAsDeleted(tableName, dummyProc));
 
     // fetch item and take a lock
     Procedure proc = queue.poll();
@@ -186,11 +134,11 @@ public class TestMasterProcedureScheduler {
     assertTrue(queue.tryAcquireTableExclusiveLock(proc, tableName));
     // table can't be deleted because we have the lock
     assertEquals(0, queue.size());
-    assertFalse(queue.markTableAsDeleted(tableName));
+    assertFalse(queue.markTableAsDeleted(tableName, dummyProc));
     // release the xlock
     queue.releaseTableExclusiveLock(proc, tableName);
     // complete the table deletion
-    assertTrue(queue.markTableAsDeleted(tableName));
+    assertTrue(queue.markTableAsDeleted(tableName, proc));
   }
 
   /**
@@ -202,13 +150,16 @@ public class TestMasterProcedureScheduler {
     final TableName tableName = TableName.valueOf("testtb");
     final int nitems = 2;
 
+    final TestTableProcedure dummyProc = new TestTableProcedure(100, tableName,
+        TableProcedureInterface.TableOperationType.DELETE);
+
     for (int i = 1; i <= nitems; ++i) {
       queue.addBack(new TestTableProcedure(i, tableName,
             TableProcedureInterface.TableOperationType.READ));
     }
 
     // table can't be deleted because one item is in the queue
-    assertFalse(queue.markTableAsDeleted(tableName));
+    assertFalse(queue.markTableAsDeleted(tableName, dummyProc));
 
     Procedure[] procs = new Procedure[nitems];
     for (int i = 0; i < nitems; ++i) {
@@ -218,12 +169,12 @@ public class TestMasterProcedureScheduler {
       // take the rlock
       assertTrue(queue.tryAcquireTableSharedLock(proc, tableName));
       // table can't be deleted because we have locks and/or items in the queue
-      assertFalse(queue.markTableAsDeleted(tableName));
+      assertFalse(queue.markTableAsDeleted(tableName, dummyProc));
     }
 
     for (int i = 0; i < nitems; ++i) {
       // table can't be deleted because we have locks
-      assertFalse(queue.markTableAsDeleted(tableName));
+      assertFalse(queue.markTableAsDeleted(tableName, dummyProc));
       // release the rlock
       queue.releaseTableSharedLock(procs[i], tableName);
     }
@@ -231,7 +182,7 @@ public class TestMasterProcedureScheduler {
     // there are no items and no lock in the queeu
     assertEquals(0, queue.size());
     // complete the table deletion
-    assertTrue(queue.markTableAsDeleted(tableName));
+    assertTrue(queue.markTableAsDeleted(tableName, dummyProc));
   }
 
   /**
@@ -299,7 +250,7 @@ public class TestMasterProcedureScheduler {
 
     // remove table queue
     assertEquals(0, queue.size());
-    assertTrue("queue should be deleted", queue.markTableAsDeleted(tableName));
+    assertTrue("queue should be deleted", queue.markTableAsDeleted(tableName, wrProc));
   }
 
   @Test
@@ -355,6 +306,32 @@ public class TestMasterProcedureScheduler {
   }
 
   @Test
+  public void testVerifyNamespaceXLock() throws Exception {
+    String nsName = "ns1";
+    TableName tableName = TableName.valueOf(nsName, "testtb");
+    queue.addBack(new TestNamespaceProcedure(1, nsName,
+          TableProcedureInterface.TableOperationType.CREATE));
+    queue.addBack(new TestTableProcedure(2, tableName,
+          TableProcedureInterface.TableOperationType.READ));
+
+    // Fetch the ns item and take the xlock
+    Procedure proc = queue.poll();
+    assertEquals(1, proc.getProcId());
+    assertEquals(true, queue.tryAcquireNamespaceExclusiveLock(proc, nsName));
+
+    // the table operation can't be executed because the ns is locked
+    assertEquals(null, queue.poll(0));
+
+    // release the ns lock
+    queue.releaseNamespaceExclusiveLock(proc, nsName);
+
+    proc = queue.poll();
+    assertEquals(2, proc.getProcId());
+    assertEquals(true, queue.tryAcquireTableExclusiveLock(proc, tableName));
+    queue.releaseTableExclusiveLock(proc, tableName);
+  }
+
+  @Test
   public void testSharedZkLock() throws Exception {
     final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
     final String dir = TEST_UTIL.getDataTestDir("TestSharedZkLock").toString();
@@ -625,154 +602,80 @@ public class TestMasterProcedureScheduler {
     assertEquals(null, queue.poll(0));
   }
 
-  /**
-   * Verify that "write" operations for a single table are serialized,
-   * but different tables can be executed in parallel.
-   */
-  @Test(timeout=90000)
-  public void testConcurrentWriteOps() throws Exception {
-    final TestTableProcSet procSet = new TestTableProcSet(queue);
+  @Test
+  public void testParentXLockAndChildrenSharedLock() throws Exception {
+    final TableName tableName = TableName.valueOf("testParentXLockAndChildrenSharedLock");
+    final HRegionInfo[] regions = new HRegionInfo[] {
+      new HRegionInfo(tableName, Bytes.toBytes("a"), Bytes.toBytes("b")),
+      new HRegionInfo(tableName, Bytes.toBytes("b"), Bytes.toBytes("c")),
+      new HRegionInfo(tableName, Bytes.toBytes("c"), Bytes.toBytes("d")),
+    };
 
-    final int NUM_ITEMS = 10;
-    final int NUM_TABLES = 4;
-    final AtomicInteger opsCount = new AtomicInteger(0);
-    for (int i = 0; i < NUM_TABLES; ++i) {
-      TableName tableName = TableName.valueOf(String.format("testtb-%04d", i));
-      for (int j = 1; j < NUM_ITEMS; ++j) {
-        procSet.addBack(new TestTableProcedure(i * 100 + j, tableName,
-          TableProcedureInterface.TableOperationType.EDIT));
-        opsCount.incrementAndGet();
-      }
-    }
-    assertEquals(opsCount.get(), queue.size());
-
-    final Thread[] threads = new Thread[NUM_TABLES * 2];
-    final HashSet<TableName> concurrentTables = new HashSet<TableName>();
-    final ArrayList<String> failures = new ArrayList<String>();
-    final AtomicInteger concurrentCount = new AtomicInteger(0);
-    for (int i = 0; i < threads.length; ++i) {
-      threads[i] = new Thread() {
-        @Override
-        public void run() {
-          while (opsCount.get() > 0) {
-            try {
-              Procedure proc = procSet.acquire();
-              if (proc == null) {
-                queue.signalAll();
-                if (opsCount.get() > 0) {
-                  continue;
-                }
-                break;
-              }
-
-              TableName tableId = procSet.getTableName(proc);
-              synchronized (concurrentTables) {
-                assertTrue("unexpected concurrency on " + tableId, concurrentTables.add(tableId));
-              }
-              assertTrue(opsCount.decrementAndGet() >= 0);
-              try {
-                long procId = proc.getProcId();
-                int concurrent = concurrentCount.incrementAndGet();
-                assertTrue("inc-concurrent="+ concurrent +" 1 <= concurrent <= "+ NUM_TABLES,
-                  concurrent >= 1 && concurrent <= NUM_TABLES);
-                LOG.debug("[S] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent);
-                Thread.sleep(2000);
-                concurrent = concurrentCount.decrementAndGet();
-                LOG.debug("[E] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent);
-                assertTrue("dec-concurrent=" + concurrent, concurrent < NUM_TABLES);
-              } finally {
-                synchronized (concurrentTables) {
-                  assertTrue(concurrentTables.remove(tableId));
-                }
-                procSet.release(proc);
-              }
-            } catch (Throwable e) {
-              LOG.error("Failed " + e.getMessage(), e);
-              synchronized (failures) {
-                failures.add(e.getMessage());
-              }
-            } finally {
-              queue.signalAll();
-            }
-          }
-        }
-      };
-      threads[i].start();
-    }
-    for (int i = 0; i < threads.length; ++i) {
-      threads[i].join();
-    }
-    assertTrue(failures.toString(), failures.isEmpty());
-    assertEquals(0, opsCount.get());
-    assertEquals(0, queue.size());
+    queue.addBack(new TestTableProcedure(1, tableName,
+        TableProcedureInterface.TableOperationType.CREATE));
 
-    for (int i = 1; i <= NUM_TABLES; ++i) {
-      TableName table = TableName.valueOf(String.format("testtb-%04d", i));
-      assertTrue("queue should be deleted, table=" + table, queue.markTableAsDeleted(table));
+    // fetch and acquire first xlock proc
+    Procedure parentProc = queue.poll();
+    assertEquals(1, parentProc.getProcId());
+    assertTrue(queue.tryAcquireTableExclusiveLock(parentProc, tableName));
+
+    // add child procedure
+    for (int i = 0; i < regions.length; ++i) {
+      queue.addFront(new TestRegionProcedure(1, 1 + i, tableName,
+          TableProcedureInterface.TableOperationType.ASSIGN, regions[i]));
     }
-  }
 
-  public static class TestTableProcSet {
-    private final MasterProcedureScheduler queue;
+    // add another xlock procedure (no parent)
+    queue.addBack(new TestTableProcedure(100, tableName,
+        TableProcedureInterface.TableOperationType.EDIT));
 
-    public TestTableProcSet(final MasterProcedureScheduler queue) {
-      this.queue = queue;
+    // fetch and execute child
+    for (int i = 0; i < regions.length; ++i) {
+      final int regionIdx = regions.length - i - 1;
+      Procedure childProc = queue.poll();
+      LOG.debug("fetch children " + childProc);
+      assertEquals(1 + regionIdx, childProc.getProcId());
+      assertEquals(false, queue.waitRegion(childProc, regions[regionIdx]));
+      queue.wakeRegion(childProc, regions[regionIdx]);
     }
 
-    public void addBack(Procedure proc) {
-      queue.addBack(proc);
-    }
+    // nothing available, until xlock release
+    assertEquals(null, queue.poll(0));
 
-    public void addFront(Procedure proc) {
-      queue.addFront(proc);
-    }
+    // release xlock
+    queue.releaseTableExclusiveLock(parentProc, tableName);
 
-    public Procedure acquire() {
-      Procedure proc = null;
-      boolean avail = false;
-      while (!avail) {
-        proc = queue.poll();
-        if (proc == null) break;
-        switch (getTableOperationType(proc)) {
-          case CREATE:
-          case DELETE:
-          case EDIT:
-            avail = queue.tryAcquireTableExclusiveLock(proc, getTableName(proc));
-            break;
-          case READ:
-            avail = queue.tryAcquireTableSharedLock(proc, getTableName(proc));
-            break;
-          default:
-            throw new UnsupportedOperationException();
-        }
-        if (!avail) {
-          addFront(proc);
-          LOG.debug("yield procId=" + proc);
-        }
-      }
-      return proc;
-    }
+    // fetch the other xlock proc
+    Procedure proc = queue.poll();
+    assertEquals(100, proc.getProcId());
+    assertTrue(queue.tryAcquireTableExclusiveLock(proc, tableName));
+    queue.releaseTableExclusiveLock(proc, tableName);
+  }
 
-    public void release(Procedure proc) {
-      switch (getTableOperationType(proc)) {
-        case CREATE:
-        case DELETE:
-        case EDIT:
-          queue.releaseTableExclusiveLock(proc, getTableName(proc));
-          break;
-        case READ:
-          queue.releaseTableSharedLock(proc, getTableName(proc));
-          break;
-      }
-    }
+  @Test
+  public void testParentXLockAndChildrenXLock() throws Exception {
+    final TableName tableName = TableName.valueOf("testParentXLockAndChildrenXLock");
 
-    public TableName getTableName(Procedure proc) {
-      return ((TableProcedureInterface)proc).getTableName();
-    }
+    queue.addBack(new TestTableProcedure(1, tableName,
+        TableProcedureInterface.TableOperationType.EDIT));
 
-    public TableProcedureInterface.TableOperationType getTableOperationType(Procedure proc) {
-      return ((TableProcedureInterface)proc).getTableOperationType();
-    }
+    // fetch and acquire first xlock proc
+    Procedure parentProc = queue.poll();
+    assertEquals(1, parentProc.getProcId());
+    assertTrue(queue.tryAcquireTableExclusiveLock(parentProc, tableName));
+
+    // add child procedure
+    queue.addFront(new TestTableProcedure(1, 2, tableName,
+      TableProcedureInterface.TableOperationType.EDIT));
+
+    // fetch the other xlock proc
+    Procedure proc = queue.poll();
+    assertEquals(2, proc.getProcId());
+    assertTrue(queue.tryAcquireTableExclusiveLock(proc, tableName));
+    queue.releaseTableExclusiveLock(proc, tableName);
+
+    // release xlock
+    queue.releaseTableExclusiveLock(parentProc, tableName);
   }
 
   public static class TestTableProcedure extends TestProcedure
@@ -813,6 +716,19 @@ public class TestMasterProcedureScheduler {
     }
   }
 
+  /** TestTableProcedure that owns one ProcedureEvent, created when the procedure is built. */
+  public static class TestTableProcedureWithEvent extends TestTableProcedure {
+    private final ProcedureEvent event;
+    public TestTableProcedureWithEvent(long procId, TableName tableName, TableOperationType opType) {
+      super(procId, tableName, opType);
+      this.event = new ProcedureEvent(tableName + " procId=" + procId);
+    }
+    /** @return the event created for this procedure in the constructor. */
+    public ProcedureEvent getEvent() {
+      return this.event;
+    }
+  }
+
   public static class TestRegionProcedure extends TestTableProcedure {
     private final HRegionInfo[] regionInfo;
 
@@ -839,7 +755,7 @@ public class TestMasterProcedureScheduler {
     public void toStringClassDetails(final StringBuilder sb) {
       sb.append(getClass().getSimpleName());
       sb.append(" (region=");
-      sb.append(getRegionInfo());
+      sb.append(Arrays.toString(getRegionInfo()));
       sb.append(")");
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8da0500e/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java
new file mode 100644
index 0000000..380067d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java
@@ -0,0 +1,363 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.master.TableLockManager;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent;
+import org.apache.hadoop.hbase.master.procedure.TestMasterProcedureScheduler.TestTableProcedure;
+import org.apache.hadoop.hbase.master.procedure.TestMasterProcedureScheduler.TestTableProcedureWithEvent;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@Category({MasterTests.class, MediumTests.class})
+public class TestMasterProcedureSchedulerConcurrency {
+  private static final Log LOG = LogFactory.getLog(TestMasterProcedureSchedulerConcurrency.class);
+
+  private MasterProcedureScheduler queue;
+  private Configuration conf;
+
+  @Before
+  public void setUp() throws IOException {
+    conf = HBaseConfiguration.create();
+    queue = new MasterProcedureScheduler(conf, new TableLockManager.NullTableLockManager());
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    assertEquals("proc-queue expected to be empty", 0, queue.size());
+    queue.clear();
+  }
+
+  @Test(timeout=60000)
+  public void testConcurrentCreateDelete() throws Exception {
+    final MasterProcedureScheduler procQueue = queue;
+    final TableName table = TableName.valueOf("testtb");
+    final AtomicBoolean running = new AtomicBoolean(true);
+    final AtomicBoolean failure = new AtomicBoolean(false);
+    Thread createThread = new Thread() {
+      @Override
+      public void run() {
+        try {
+          TestTableProcedure proc = new TestTableProcedure(1, table,
+              TableProcedureInterface.TableOperationType.CREATE);
+          while (running.get() && !failure.get()) {
+            if (procQueue.tryAcquireTableExclusiveLock(proc, table)) {
+              procQueue.releaseTableExclusiveLock(proc, table);
+            }
+          }
+        } catch (Throwable e) {
+          LOG.error("create failed", e);
+          failure.set(true);
+        }
+      }
+    };
+
+    Thread deleteThread = new Thread() {
+      @Override
+      public void run() {
+        try {
+          TestTableProcedure proc = new TestTableProcedure(2, table,
+              TableProcedureInterface.TableOperationType.DELETE);
+          while (running.get() && !failure.get()) {
+            if (procQueue.tryAcquireTableExclusiveLock(proc, table)) {
+              procQueue.releaseTableExclusiveLock(proc, table);
+            }
+            procQueue.markTableAsDeleted(table, proc);
+          }
+        } catch (Throwable e) {
+          LOG.error("delete failed", e);
+          failure.set(true);
+        }
+      }
+    };
+
+    createThread.start();
+    deleteThread.start();
+    for (int i = 0; i < 100 && running.get() && !failure.get(); ++i) {
+      Thread.sleep(100);
+    }
+    running.set(false);
+    createThread.join();
+    deleteThread.join();
+    assertEquals(false, failure.get());
+  }
+
+  /**
+   * Verify that "write" operations for a single table are serialized,
+   * but different tables can be executed in parallel.
+   */
+  @Test(timeout=60000)
+  public void testConcurrentWriteOps() throws Exception {
+    final TestTableProcSet procSet = new TestTableProcSet(queue);
+
+    final int NUM_ITEMS = 10;
+    final int NUM_TABLES = 4;
+    final AtomicInteger opsCount = new AtomicInteger(0);
+    for (int i = 0; i < NUM_TABLES; ++i) {
+      TableName tableName = TableName.valueOf(String.format("testtb-%04d", i));
+      for (int j = 1; j < NUM_ITEMS; ++j) {
+        procSet.addBack(new TestTableProcedure(i * 100 + j, tableName,
+          TableProcedureInterface.TableOperationType.EDIT));
+        opsCount.incrementAndGet();
+      }
+    }
+    assertEquals(opsCount.get(), queue.size());
+
+    final Thread[] threads = new Thread[NUM_TABLES * 2];
+    final HashSet<TableName> concurrentTables = new HashSet<TableName>();
+    final ArrayList<String> failures = new ArrayList<String>();
+    final AtomicInteger concurrentCount = new AtomicInteger(0);
+    for (int i = 0; i < threads.length; ++i) {
+      threads[i] = new Thread() {
+        @Override
+        public void run() {
+          while (opsCount.get() > 0) {
+            try {
+              Procedure proc = procSet.acquire();
+              if (proc == null) {
+                queue.signalAll();
+                if (opsCount.get() > 0) {
+                  continue;
+                }
+                break;
+              }
+
+              TableName tableId = procSet.getTableName(proc);
+              synchronized (concurrentTables) {
+                assertTrue("unexpected concurrency on " + tableId, concurrentTables.add(tableId));
+              }
+              assertTrue(opsCount.decrementAndGet() >= 0);
+              try {
+                long procId = proc.getProcId();
+                int concurrent = concurrentCount.incrementAndGet();
+                assertTrue("inc-concurrent="+ concurrent +" 1 <= concurrent <= "+ NUM_TABLES,
+                  concurrent >= 1 && concurrent <= NUM_TABLES);
+                LOG.debug("[S] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent);
+                Thread.sleep(2000);
+                concurrent = concurrentCount.decrementAndGet();
+                LOG.debug("[E] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent);
+                assertTrue("dec-concurrent=" + concurrent, concurrent < NUM_TABLES);
+              } finally {
+                synchronized (concurrentTables) {
+                  assertTrue(concurrentTables.remove(tableId));
+                }
+                procSet.release(proc);
+              }
+            } catch (Throwable e) {
+              LOG.error("Failed " + e.getMessage(), e);
+              synchronized (failures) {
+                failures.add(e.getMessage());
+              }
+            } finally {
+              queue.signalAll();
+            }
+          }
+        }
+      };
+      threads[i].start();
+    }
+    for (int i = 0; i < threads.length; ++i) {
+      threads[i].join();
+    }
+    assertTrue(failures.toString(), failures.isEmpty());
+    assertEquals(0, opsCount.get());
+    assertEquals(0, queue.size());
+    // tables were created with indices [0, NUM_TABLES): verify exactly those queues
+    for (int i = 0; i < NUM_TABLES; ++i) {
+      final TableName table = TableName.valueOf(String.format("testtb-%04d", i));
+      final TestTableProcedure dummyProc = new TestTableProcedure(100, table,
+        TableProcedureInterface.TableOperationType.DELETE);
+      assertTrue("queue should be deleted, table=" + table,
+        queue.markTableAsDeleted(table, dummyProc));
+    }
+  }
+
+  @Test(timeout=60000)
+  public void testConcurrentWaitWake() throws Exception {
+    testConcurrentWaitWake(false);
+  }
+
+  @Test(timeout=60000)
+  public void testConcurrentWaitWakeBatch() throws Exception {
+    testConcurrentWaitWake(true);
+  }
+
+  private void testConcurrentWaitWake(final boolean useWakeBatch) throws Exception {
+    final TableName tableName = TableName.valueOf("testtb");
+
+    final int NPROCS = 20;
+    final int NRUNS = 100;
+
+    for (long i = 0; i < NPROCS; ++i) {
+      queue.addBack(new TestTableProcedureWithEvent(i, tableName,
+          TableProcedureInterface.TableOperationType.READ));
+    }
+
+    final Thread[] threads = new Thread[4];
+    final AtomicInteger waitCount = new AtomicInteger(0);
+    final AtomicInteger wakeCount = new AtomicInteger(0);
+
+    final ConcurrentSkipListSet<TestTableProcedureWithEvent> waitQueue =
+      new ConcurrentSkipListSet<TestTableProcedureWithEvent>();
+    threads[0] = new Thread() {
+      @Override
+      public void run() {
+        while (true) {
+          if (useWakeBatch) {
+            ProcedureEvent[] ev = new ProcedureEvent[waitQueue.size()];
+            for (int i = 0; i < ev.length; ++i) {
+              ev[i] = waitQueue.pollFirst().getEvent();
+              LOG.debug("WAKE " + ev[i] + " total=" + wakeCount.get());
+            }
+            queue.wakeEvents(ev, ev.length);
+            wakeCount.addAndGet(ev.length);
+          } else {
+            int size = waitQueue.size();
+            while (size-- > 0) {
+              ProcedureEvent ev = waitQueue.pollFirst().getEvent();
+              queue.wakeEvent(ev);
+              LOG.debug("WAKE " + ev + " total=" + wakeCount.get());
+              wakeCount.incrementAndGet();
+            }
+          }
+          if (wakeCount.get() >= NRUNS) {
+            break;
+          }
+          Threads.sleepWithoutInterrupt(25);
+        }
+      }
+    };
+
+    for (int i = 1; i < threads.length; ++i) {
+      threads[i] = new Thread() {
+        @Override
+        public void run() {
+          while (true) {
+            TestTableProcedureWithEvent proc = (TestTableProcedureWithEvent)queue.poll();
+            if (proc == null) continue;
+            // park proc on its event before the waker can see it, or the wakeup may be lost
+            queue.suspendEvent(proc.getEvent());
+            queue.waitEvent(proc.getEvent(), proc);
+            waitQueue.add(proc);
+            LOG.debug("WAIT " + proc.getEvent());
+            if (waitCount.incrementAndGet() >= NRUNS) {
+              break;
+            }
+          }
+        }
+      };
+    }
+
+    for (int i = 0; i < threads.length; ++i) {
+      threads[i].start();
+    }
+    for (int i = 0; i < threads.length; ++i) {
+      threads[i].join();
+    }
+
+    queue.clear();
+  }
+
+  public static class TestTableProcSet {
+    private final MasterProcedureScheduler queue;
+
+    public TestTableProcSet(final MasterProcedureScheduler queue) {
+      this.queue = queue;
+    }
+
+    public void addBack(Procedure proc) {
+      queue.addBack(proc);
+    }
+
+    public void addFront(Procedure proc) {
+      queue.addFront(proc);
+    }
+
+    public Procedure acquire() {
+      Procedure proc = null;
+      boolean avail = false;
+      while (!avail) {
+        proc = queue.poll();
+        if (proc == null) break;
+        switch (getTableOperationType(proc)) {
+          case CREATE:
+          case DELETE:
+          case EDIT:
+            avail = queue.tryAcquireTableExclusiveLock(proc, getTableName(proc));
+            break;
+          case READ:
+            avail = queue.tryAcquireTableSharedLock(proc, getTableName(proc));
+            break;
+          default:
+            throw new UnsupportedOperationException();
+        }
+        if (!avail) {
+          addFront(proc);
+          LOG.debug("yield procId=" + proc);
+        }
+      }
+      return proc;
+    }
+
+    public void release(Procedure proc) {
+      switch (getTableOperationType(proc)) {
+        case CREATE:
+        case DELETE:
+        case EDIT:
+          queue.releaseTableExclusiveLock(proc, getTableName(proc));
+          break;
+        case READ:
+          queue.releaseTableSharedLock(proc, getTableName(proc));
+          break;
+      }
+    }
+
+    public TableName getTableName(Procedure proc) {
+      return ((TableProcedureInterface)proc).getTableName();
+    }
+
+    public TableProcedureInterface.TableOperationType getTableOperationType(Procedure proc) {
+      return ((TableProcedureInterface)proc).getTableOperationType();
+    }
+  }
+}