You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2017/11/27 20:05:55 UTC

hbase git commit: HBASE-19319 Fix bug in synchronizing over ProcedureEvent

Repository: hbase
Updated Branches:
  refs/heads/master 5a0881a98 -> f88671661


HBASE-19319 Fix bug in synchronizing over ProcedureEvent

Also moves event-related functions (wake/wait/suspend) from ProcedureScheduler to the ProcedureEvent class.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f8867166
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f8867166
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f8867166

Branch: refs/heads/master
Commit: f8867166178e76598393437baac3849f4943ff16
Parents: 5a0881a
Author: Apekshit Sharma <ap...@apache.org>
Authored: Mon Nov 20 19:16:23 2017 -0800
Committer: Apekshit Sharma <ap...@apache.org>
Committed: Mon Nov 27 11:51:17 2017 -0800

----------------------------------------------------------------------
 .../procedure2/AbstractProcedureScheduler.java  | 89 ++++++++------------
 .../hadoop/hbase/procedure2/ProcedureEvent.java | 79 +++++++++++++++--
 .../hbase/procedure2/ProcedureScheduler.java    | 40 ++-------
 .../hbase/procedure2/TestProcedureEvents.java   | 30 +++++--
 .../TestProcedureSchedulerConcurrency.java      |  8 +-
 .../master/assignment/AssignmentManager.java    | 21 +++--
 .../assignment/RegionTransitionProcedure.java   | 10 +--
 .../hbase/master/locking/LockProcedure.java     |  8 +-
 .../master/procedure/MasterProcedureEnv.java    | 10 +--
 .../procedure/TestMasterProcedureEvents.java    |  2 +-
 .../procedure/TestMasterProcedureScheduler.java |  4 +-
 .../TestRegionMergeTransactionOnCluster.java    | 17 ++--
 12 files changed, 177 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/f8867166/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
index 3bff8b8..3e47451 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hbase.procedure2;
 
+import java.util.Iterator;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.TimeUnit;
@@ -79,10 +80,31 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
    */
   protected abstract void enqueue(Procedure procedure, boolean addFront);
 
+  @Override
   public void addFront(final Procedure procedure) {
     push(procedure, true, true);
   }
 
+  @Override
+  public void addFront(Iterator<Procedure> procedureIterator) {
+    schedLock();
+    try {
+      int count = 0;
+      while (procedureIterator.hasNext()) {
+        Procedure procedure = procedureIterator.next();
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Wake " + procedure);
+        }
+        push(procedure, /* addFront= */ true, /* notify= */false);
+        count++;
+      }
+      wakePollIfNeeded(count);
+    } finally {
+      schedUnlock();
+    }
+  }
+
+  @Override
   public void addBack(final Procedure procedure) {
     push(procedure, false, true);
   }
@@ -206,61 +228,22 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
   // ==========================================================================
   //  Procedure Events
   // ==========================================================================
-  @Override
-  public boolean waitEvent(final ProcedureEvent event, final Procedure procedure) {
-    synchronized (event) {
-      if (event.isReady()) {
-        return false;
-      }
-      waitProcedure(event.getSuspendedProcedures(), procedure);
-      return true;
-    }
-  }
-
-  @Override
-  public void suspendEvent(final ProcedureEvent event) {
-    final boolean traceEnabled = LOG.isTraceEnabled();
-    synchronized (event) {
-      event.setReady(false);
-      if (traceEnabled) {
-        LOG.trace("Suspend " + event);
-      }
-    }
-  }
-
-  @Override
-  public void wakeEvent(final ProcedureEvent event) {
-    wakeEvents(1, event);
-  }
 
-  @Override
-  public void wakeEvents(final int count, final ProcedureEvent... events) {
-    final boolean traceEnabled = LOG.isTraceEnabled();
+  /**
+   * Wake up all of the given events.
+   * Note that we first take scheduler lock and then wakeInternal() synchronizes on the event.
+   * Access should remain package-private. Use ProcedureEvent class to wake/suspend events.
+   * @param events the list of events to wake
+   */
+  void wakeEvents(ProcedureEvent[] events) {
     schedLock();
     try {
-      int waitingCount = 0;
-      for (int i = 0; i < count; ++i) {
-        final ProcedureEvent event = events[i];
-        synchronized (event) {
-          if (!event.isReady()) {
-            // Only set ready if we were not ready; i.e. suspended. Otherwise, we double-wake
-            // on this event and down in wakeWaitingProcedures, we double decrement this
-            // finish which messes up child procedure accounting.
-            event.setReady(true);
-            if (traceEnabled) {
-              LOG.trace("Unsuspend " + event);
-            }
-            waitingCount += wakeWaitingProcedures(event.getSuspendedProcedures());
-          } else {
-            ProcedureDeque q = event.getSuspendedProcedures();
-            if (q != null && !q.isEmpty()) {
-              LOG.warn("Q is not empty! size=" + q.size() + "; PROCESSING...");
-              waitingCount += wakeWaitingProcedures(event.getSuspendedProcedures());
-            }
-          }
+      for (ProcedureEvent event : events) {
+        if (event == null) {
+          continue;
         }
+        event.wakeInternal(this);
       }
-      wakePollIfNeeded(waitingCount);
     } finally {
       schedUnlock();
     }
@@ -275,9 +258,8 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
     // wakeProcedure adds to the front of queue, so we start from last in the
     // waitQueue' queue, so that the procedure which was added first goes in the front for
     // the scheduler queue.
-    while (!waitQueue.isEmpty()) {
-      wakeProcedure(waitQueue.removeLast());
-    }
+    addFront(waitQueue.descendingIterator());
+    waitQueue.clear();
     return count;
   }
 
@@ -290,6 +272,7 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
     push(procedure, /* addFront= */ true, /* notify= */false);
   }
 
+
   // ==========================================================================
   //  Internal helpers
   // ==========================================================================

http://git-wip-us.apache.org/repos/asf/hbase/blob/f8867166/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java
index d3e076c..20803f4 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,15 +18,20 @@
 
 package org.apache.hadoop.hbase.procedure2;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+
 /**
  * Basic ProcedureEvent that contains an "object", which can be a description or a reference to the
  * resource to wait on, and a queue for suspended procedures.
- * Access to suspended procedures queue is 'synchronized' on the event itself.
  */
 @InterfaceAudience.Private
 public class ProcedureEvent<T> {
+  private static final Log LOG = LogFactory.getLog(ProcedureEvent.class);
+
   private final T object;
   private boolean ready = false;
   private ProcedureDeque suspendedProcedures = new ProcedureDeque();
@@ -39,10 +44,74 @@ public class ProcedureEvent<T> {
     return ready;
   }
 
-  synchronized void setReady(final boolean isReady) {
-    this.ready = isReady;
+  /**
+   * @return true if the event was not ready and proc was added to the suspended queue, else false.
+   */
+  public synchronized boolean suspendIfNotReady(Procedure proc) {
+    if (!ready) {
+      suspendedProcedures.addLast(proc);
+    }
+    return !ready;
+  }
+
+  /** Mark the event as not ready. */
+  public synchronized void suspend() {
+    ready = false;
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Suspend " + toString());
+    }
+  }
+
+  /**
+   * Wakes up the suspended procedures by pushing them back into scheduler queues and sets the
+   * event as ready.
+   * See {@link #wakeInternal(AbstractProcedureScheduler)} for why this is not synchronized.
+   */
+  public void wake(AbstractProcedureScheduler procedureScheduler) {
+    procedureScheduler.wakeEvents(new ProcedureEvent[]{this});
+  }
+
+  /**
+   * Wakes up all the given events and puts the procedures waiting on them back into
+   * ProcedureScheduler queues.
+   */
+  public static void wakeEvents(AbstractProcedureScheduler scheduler, ProcedureEvent ... events) {
+    scheduler.wakeEvents(events);
+  }
+
+  /**
+   * Only to be used by ProcedureScheduler implementations.
+   * Reason: To wake up multiple events, locking sequence is
+   * schedLock --> synchronized (event)
+   * To wake up an event, both schedLock() and synchronized(event) are required.
+   * The order is schedLock() --> synchronized(event) because when waking up multiple events
+   * simultaneously, we keep the scheduler locked until all procedures suspended on these events
+   * have been added back to the queue (Maybe it's not required? Evaluate!)
+   * To avoid deadlocks, we want to keep the locking order same even when waking up single event.
+   * That's why, {@link #wake(AbstractProcedureScheduler)} above uses the same code path as used
+   * when waking up multiple events.
+   * Access should remain package-private.
+   */
+  synchronized void wakeInternal(AbstractProcedureScheduler procedureScheduler) {
+    if (ready && !suspendedProcedures.isEmpty()) {
+      LOG.warn("Found procedures suspended in a ready event! Size=" + suspendedProcedures.size());
+    }
+    ready = true;
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Unsuspend " + toString());
+    }
+    // addFront() adds to the front of the scheduler queue, so we iterate the suspended
+    // procedures from last to first; that way the procedure which was suspended first ends up
+    // at the front of the scheduler queue.
+    procedureScheduler.addFront(suspendedProcedures.descendingIterator());
+    suspendedProcedures.clear();
   }
 
+  /**
+   * Access to suspendedProcedures is 'synchronized' on this object, but it's fine to return it
+   * here for tests.
+   */
+  @VisibleForTesting
   public ProcedureDeque getSuspendedProcedures() {
     return suspendedProcedures;
   }
@@ -50,6 +119,6 @@ public class ProcedureEvent<T> {
   @Override
   public String toString() {
     return getClass().getSimpleName() + " for " + object + ", ready=" + isReady() +
-        ", " + getSuspendedProcedures();
+        ", " + suspendedProcedures;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f8867166/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
index d4314d5..79367f3 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
@@ -18,11 +18,10 @@
 
 package org.apache.hadoop.hbase.procedure2;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
-
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -53,6 +52,11 @@ public interface ProcedureScheduler {
   void addFront(Procedure proc);
 
   /**
+   * Inserts all elements in the iterator at the front of this queue.
+   */
+  void addFront(Iterator<Procedure> procedureIterator);
+
+  /**
    * Inserts the specified element at the end of this queue.
    * @param proc the Procedure to add
    */
@@ -92,36 +96,6 @@ public interface ProcedureScheduler {
   Procedure poll(long timeout, TimeUnit unit);
 
   /**
-   * Mark the event as not ready.
-   * Procedures calling waitEvent() will be suspended.
-   * @param event the event to mark as suspended/not ready
-   */
-  void suspendEvent(ProcedureEvent event);
-
-  /**
-   * Wake every procedure waiting for the specified event
-   * (By design each event has only one "wake" caller)
-   * @param event the event to wait
-   */
-  void wakeEvent(ProcedureEvent event);
-
-  /**
-   * Wake every procedure waiting for the specified events.
-   * (By design each event has only one "wake" caller)
-   * @param count the number of events in the array to wake
-   * @param events the list of events to wake
-   */
-  void wakeEvents(int count, ProcedureEvent... events);
-
-  /**
-   * Suspend the procedure if the event is not ready yet.
-   * @param event the event to wait on
-   * @param procedure the procedure waiting on the event
-   * @return true if the procedure has to wait for the event to be ready, false otherwise.
-   */
-  boolean waitEvent(ProcedureEvent event, Procedure procedure);
-
-  /**
    * List lock queues.
    * @return the locks
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/f8867166/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java
index bd310fd..d2b2b7d 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java
@@ -26,7 +26,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
-import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.Int32Value;
@@ -107,6 +106,25 @@ public class TestProcedureEvents {
     ProcedureTestingUtility.assertIsAbortException(procExecutor.getResult(proc.getProcId()));
   }
 
+  /**
+   * This Event+Procedure exhibits following behavior:
+   * <ul>
+   *   <li>On procedure execute()
+   *     <ul>
+   *       <li>If there have been enough timeouts, abort the procedure. Otherwise:</li>
+   *       <li>Suspend the event and add self to its suspend queue</li>
+   *       <li>Go into waiting state</li>
+   *     </ul>
+   *   </li>
+   *   <li>
+   *     On waiting timeout
+   *     <ul>
+   *       <li>Wake the event (which adds this procedure back into the scheduler queue), and set
+   *       its own state to RUNNABLE (so it can be executed again).</li>
+   *     </ul>
+   *   </li>
+   * </ul>
+   */
   public static class TestTimeoutEventProcedure extends NoopProcedure<TestProcEnv> {
     private final ProcedureEvent event = new ProcedureEvent("timeout-event");
 
@@ -132,8 +150,8 @@ public class TestProcedureEvents {
         return null;
       }
 
-      env.getProcedureScheduler().suspendEvent(event);
-      if (env.getProcedureScheduler().waitEvent(event, this)) {
+      event.suspend();
+      if (event.suspendIfNotReady(this)) {
         setState(ProcedureState.WAITING_TIMEOUT);
         throw new ProcedureSuspendedException();
       }
@@ -146,15 +164,15 @@ public class TestProcedureEvents {
       int n = ntimeouts.incrementAndGet();
       LOG.info("HANDLE TIMEOUT " + this + " ntimeouts=" + n);
       setState(ProcedureState.RUNNABLE);
-      env.getProcedureScheduler().wakeEvent(event);
+      event.wake((AbstractProcedureScheduler) env.getProcedureScheduler());
       return false;
     }
 
     @Override
     protected void afterReplay(final TestProcEnv env) {
       if (getState() == ProcedureState.WAITING_TIMEOUT) {
-        env.getProcedureScheduler().suspendEvent(event);
-        env.getProcedureScheduler().waitEvent(event, this);
+        event.suspend();
+        event.suspendIfNotReady(this);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/f8867166/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSchedulerConcurrency.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSchedulerConcurrency.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSchedulerConcurrency.java
index 4217693..1c8f1eb 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSchedulerConcurrency.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSchedulerConcurrency.java
@@ -92,13 +92,13 @@ public class TestProcedureSchedulerConcurrency {
               ev[i] = waitQueue.pollFirst().getEvent();
               LOG.debug("WAKE BATCH " + ev[i] + " total=" + wakeCount.get());
             }
-            sched.wakeEvents(ev.length, ev);
+            ProcedureEvent.wakeEvents((AbstractProcedureScheduler) sched, ev);
             wakeCount.addAndGet(ev.length);
           } else {
             int size = waitQueue.size();
             while (size-- > 0) {
               ProcedureEvent ev = waitQueue.pollFirst().getEvent();
-              sched.wakeEvent(ev);
+              ev.wake(procSched);
               LOG.debug("WAKE " + ev + " total=" + wakeCount.get());
               wakeCount.incrementAndGet();
             }
@@ -122,9 +122,9 @@ public class TestProcedureSchedulerConcurrency {
             TestProcedureWithEvent proc = (TestProcedureWithEvent)sched.poll();
             if (proc == null) continue;
 
-            sched.suspendEvent(proc.getEvent());
+            proc.getEvent().suspend();
             waitQueue.add(proc);
-            sched.waitEvent(proc.getEvent(), proc);
+            proc.getEvent().suspendIfNotReady(proc);
             LOG.debug("WAIT " + proc.getEvent());
             if (waitCount.incrementAndGet() >= NRUNS) {
               break;

http://git-wip-us.apache.org/repos/asf/hbase/blob/f8867166/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
index 1c193f9..6f481e5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
@@ -252,7 +252,7 @@ public class AssignmentManager implements ServerListener {
 
     // Update meta events (for testing)
     if (hasProcExecutor) {
-      getProcedureScheduler().suspendEvent(metaLoadEvent);
+      metaLoadEvent.suspend();
       setFailoverCleanupDone(false);
       for (RegionInfo hri: getMetaRegionSet()) {
         setMetaInitialized(hri, false);
@@ -413,17 +413,16 @@ public class AssignmentManager implements ServerListener {
   }
 
   public boolean waitMetaInitialized(final Procedure proc, final RegionInfo regionInfo) {
-    return getProcedureScheduler().waitEvent(
-      getMetaInitializedEvent(getMetaForRegion(regionInfo)), proc);
+    return getMetaInitializedEvent(getMetaForRegion(regionInfo)).suspendIfNotReady(proc);
   }
 
   private void setMetaInitialized(final RegionInfo metaRegionInfo, final boolean isInitialized) {
     assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
     final ProcedureEvent metaInitEvent = getMetaInitializedEvent(metaRegionInfo);
     if (isInitialized) {
-      getProcedureScheduler().wakeEvent(metaInitEvent);
+      metaInitEvent.wake(getProcedureScheduler());
     } else {
-      getProcedureScheduler().suspendEvent(metaInitEvent);
+      metaInitEvent.suspend();
     }
   }
 
@@ -434,11 +433,11 @@ public class AssignmentManager implements ServerListener {
   }
 
   public boolean waitMetaLoaded(final Procedure proc) {
-    return getProcedureScheduler().waitEvent(metaLoadEvent, proc);
+    return metaLoadEvent.suspendIfNotReady(proc);
   }
 
   protected void wakeMetaLoadedEvent() {
-    getProcedureScheduler().wakeEvent(metaLoadEvent);
+    metaLoadEvent.wake(getProcedureScheduler());
     assert isMetaLoaded() : "expected meta to be loaded";
   }
 
@@ -1011,11 +1010,11 @@ public class AssignmentManager implements ServerListener {
 
   protected boolean waitServerReportEvent(final ServerName serverName, final Procedure proc) {
     final ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
-    return getProcedureScheduler().waitEvent(serverNode.getReportEvent(), proc);
+    return serverNode.getReportEvent().suspendIfNotReady(proc);
   }
 
   protected void wakeServerReportEvent(final ServerStateNode serverNode) {
-    getProcedureScheduler().wakeEvent(serverNode.getReportEvent());
+    serverNode.getReportEvent().wake(getProcedureScheduler());
   }
 
   // ============================================================================================
@@ -1588,7 +1587,7 @@ public class AssignmentManager implements ServerListener {
    * and each region will be assigned by a server using the balancer.
    */
   protected void queueAssign(final RegionStateNode regionNode) {
-    getProcedureScheduler().suspendEvent(regionNode.getProcedureEvent());
+    regionNode.getProcedureEvent().suspend();
 
     // TODO: quick-start for meta and the other sys-tables?
     assignQueueLock.lock();
@@ -1787,7 +1786,7 @@ public class AssignmentManager implements ServerListener {
         events[evcount++] = regionNode.getProcedureEvent();
       }
     }
-    getProcedureScheduler().wakeEvents(evcount, events);
+    ProcedureEvent.wakeEvents(getProcedureScheduler(), events);
 
     final long et = System.currentTimeMillis();
     if (LOG.isTraceEnabled()) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/f8867166/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
index 6f54dcf..05b8104 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
@@ -190,7 +190,7 @@ public abstract class RegionTransitionProcedure
       // NOTE: This call to wakeEvent puts this Procedure back on the scheduler.
       // Thereafter, another Worker can be in here so DO NOT MESS WITH STATE beyond
       // this method. Just get out of this current processing quickly.
-      env.getProcedureScheduler().wakeEvent(regionNode.getProcedureEvent());
+      regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
     }
     // else leave the procedure in suspended state; it is waiting on another call to this callback
   }
@@ -214,7 +214,7 @@ public abstract class RegionTransitionProcedure
 
     // Put this procedure into suspended mode to wait on report of state change
     // from remote regionserver. Means Procedure associated ProcedureEvent is marked not 'ready'.
-    env.getProcedureScheduler().suspendEvent(getRegionState(env).getProcedureEvent());
+    getRegionState(env).getProcedureEvent().suspend();
 
     // Tricky because the below call to addOperationToNode can fail. If it fails, we need to
     // backtrack on stuff like the 'suspend' done above -- tricky as the 'wake' requeues us -- and
@@ -253,7 +253,7 @@ public abstract class RegionTransitionProcedure
     // processing to the next stage. At an extreme, the other worker may run in
     // parallel so DO  NOT CHANGE any state hereafter! This should be last thing
     // done in this processing step.
-    env.getProcedureScheduler().wakeEvent(regionNode.getProcedureEvent());
+    regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
   }
 
   protected boolean isServerOnline(final MasterProcedureEnv env, final RegionStateNode regionNode) {
@@ -298,7 +298,7 @@ public abstract class RegionTransitionProcedure
               return null;
             }
             transitionState = RegionTransitionState.REGION_TRANSITION_DISPATCH;
-            if (env.getProcedureScheduler().waitEvent(regionNode.getProcedureEvent(), this)) {
+            if (regionNode.getProcedureEvent().suspendIfNotReady(this)) {
               // Why this suspend? Because we want to ensure Store happens before proceed?
               throw new ProcedureSuspendedException();
             }
@@ -315,7 +315,7 @@ public abstract class RegionTransitionProcedure
               retry = true;
               break;
             }
-            if (env.getProcedureScheduler().waitEvent(regionNode.getProcedureEvent(), this)) {
+            if (regionNode.getProcedureEvent().suspendIfNotReady(this)) {
               throw new ProcedureSuspendedException();
             }
             break;

http://git-wip-us.apache.org/repos/asf/hbase/blob/f8867166/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java
index c9b8ef9..61843d8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java
@@ -209,7 +209,7 @@ public final class LockProcedure extends Procedure<MasterProcedureEnv>
       if (!event.isReady()) {  // Maybe unlock() awakened the event.
         setState(ProcedureProtos.ProcedureState.RUNNABLE);
         if (LOG.isDebugEnabled()) LOG.debug("Calling wake on " + this.event);
-        env.getProcedureScheduler().wakeEvent(event);
+        event.wake(env.getProcedureScheduler());
       }
     }
     return false;  // false: do not mark the procedure as failed.
@@ -224,7 +224,7 @@ public final class LockProcedure extends Procedure<MasterProcedureEnv>
     synchronized (event) {
       if (!event.isReady()) {
         setState(ProcedureProtos.ProcedureState.RUNNABLE);
-        env.getProcedureScheduler().wakeEvent(event);
+        event.wake(env.getProcedureScheduler());
       }
     }
   }
@@ -244,8 +244,8 @@ public final class LockProcedure extends Procedure<MasterProcedureEnv>
       return null;
     }
     synchronized (event) {
-      env.getProcedureScheduler().suspendEvent(event);
-      env.getProcedureScheduler().waitEvent(event, this);
+      event.suspend();
+      event.suspendIfNotReady(this);
       setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
     }
     throw new ProcedureSuspendedException();

http://git-wip-us.apache.org/repos/asf/hbase/blob/f8867166/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
index f294f57..c9c3ac9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
@@ -147,25 +147,25 @@ public class MasterProcedureEnv implements ConfigurationObserver {
   }
 
   public boolean waitInitialized(Procedure proc) {
-    return procSched.waitEvent(master.getInitializedEvent(), proc);
+    return master.getInitializedEvent().suspendIfNotReady(proc);
   }
 
   public boolean waitServerCrashProcessingEnabled(Procedure proc) {
     if (master instanceof HMaster) {
-      return procSched.waitEvent(((HMaster)master).getServerCrashProcessingEnabledEvent(), proc);
+      return ((HMaster)master).getServerCrashProcessingEnabledEvent().suspendIfNotReady(proc);
     }
     return false;
   }
 
   public boolean waitFailoverCleanup(Procedure proc) {
-    return procSched.waitEvent(master.getAssignmentManager().getFailoverCleanupEvent(), proc);
+    return master.getAssignmentManager().getFailoverCleanupEvent().suspendIfNotReady(proc);
   }
 
   public void setEventReady(ProcedureEvent event, boolean isReady) {
     if (isReady) {
-      procSched.wakeEvent(event);
+      event.wake(procSched);
     } else {
-      procSched.suspendEvent(event);
+      event.suspend();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/f8867166/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java
index b7bc28f..b0a598e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java
@@ -168,7 +168,7 @@ public class TestMasterProcedureEvents {
 
     // wake the event
     LOG.debug("wake " + event);
-    procSched.wakeEvent(event);
+    event.wake(procSched);
     assertEquals(true, event.isReady());
 
     // wait until proc completes

http://git-wip-us.apache.org/repos/asf/hbase/blob/f8867166/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java
index e2d6b0c..d971b5f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java
@@ -572,14 +572,14 @@ public class TestMasterProcedureScheduler {
 
     // suspend
     ProcedureEvent event = new ProcedureEvent("testSuspendedProcedureEvent");
-    assertEquals(true, queue.waitEvent(event, proc));
+    assertEquals(true, event.suspendIfNotReady(proc));
 
     proc = queue.poll();
     assertEquals(2, proc.getProcId());
     assertEquals(null, queue.poll(0));
 
     // resume
-    queue.wakeEvent(event);
+    event.wake(queue);
 
     proc = queue.poll();
     assertEquals(1, proc.getProcId());

http://git-wip-us.apache.org/repos/asf/hbase/blob/f8867166/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
index d046a13..035fb9e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
@@ -467,21 +467,14 @@ public class TestRegionMergeTransactionOnCluster {
     verifyRowCount(table, ROWSIZE);
     LOG.info("Verified " + table.getName());
 
-    // Sleep here is an ugly hack to allow region transitions to finish
-    long timeout = System.currentTimeMillis() + waitTime;
     List<Pair<RegionInfo, ServerName>> tableRegions;
-    while (System.currentTimeMillis() < timeout) {
-      tableRegions = MetaTableAccessor.getTableRegionsAndLocations(
-          TEST_UTIL.getConnection(), tablename);
-      LOG.info("Found " + tableRegions.size() + ", expecting " + numRegions * replication);
-      if (tableRegions.size() == numRegions * replication)
-        break;
-      Thread.sleep(250);
-    }
-    LOG.info("Getting regions of " + table.getName());
+    TEST_UTIL.waitUntilAllRegionsAssigned(tablename);
+    LOG.info("All regions assigned for table - " + table.getName());
     tableRegions = MetaTableAccessor.getTableRegionsAndLocations(
         TEST_UTIL.getConnection(), tablename);
-    LOG.info("Regions after load: " + Joiner.on(',').join(tableRegions));
+    assertEquals("Wrong number of regions in table " + tablename,
+        numRegions * replication, tableRegions.size());
+    LOG.info(tableRegions.size() + "Regions after load: " + Joiner.on(',').join(tableRegions));
     assertEquals(numRegions * replication, tableRegions.size());
     return table;
   }