You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/07/25 06:42:41 UTC

[2/2] hbase git commit: HBASE-20846 Restore procedure locks when master restarts

HBASE-20846 Restore procedure locks when master restarts


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f3f17fa1
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f3f17fa1
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f3f17fa1

Branch: refs/heads/master
Commit: f3f17fa111f37233ddc42ddb9c38594e35d8d501
Parents: e44f506
Author: zhangduo <zh...@apache.org>
Authored: Sun Jul 22 15:10:06 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Wed Jul 25 14:37:26 2018 +0800

----------------------------------------------------------------------
 .../procedure2/AbstractProcedureScheduler.java  |   2 +-
 .../hbase/procedure2/DelayedProcedure.java      |   5 +-
 .../hadoop/hbase/procedure2/Procedure.java      | 384 +++++++++-------
 .../hbase/procedure2/ProcedureExecutor.java     | 439 ++++++++++---------
 .../hadoop/hbase/procedure2/ProcedureUtil.java  |   7 +
 .../hbase/procedure2/RootProcedureState.java    |  44 +-
 .../hbase/procedure2/TimeoutExecutorThread.java |  28 +-
 .../procedure2/TestProcedureReplayOrder.java    |   8 +-
 .../procedure2/TestProcedureSuspended.java      |   6 -
 .../src/main/protobuf/Procedure.proto           |   3 +
 .../hbase/master/ClusterSchemaServiceImpl.java  |   4 +-
 .../org/apache/hadoop/hbase/master/HMaster.java |   7 +-
 .../master/assignment/GCRegionProcedure.java    |   5 -
 .../assignment/MergeTableRegionsProcedure.java  |  12 +-
 .../assignment/RegionTransitionProcedure.java   |  33 +-
 .../hbase/master/locking/LockProcedure.java     |   9 -
 .../AbstractStateMachineNamespaceProcedure.java |   6 +-
 .../AbstractStateMachineRegionProcedure.java    |   9 -
 .../AbstractStateMachineTableProcedure.java     |   8 +-
 .../procedure/CreateNamespaceProcedure.java     |  35 +-
 .../master/procedure/CreateTableProcedure.java  |  12 +-
 .../master/procedure/InitMetaProcedure.java     |   7 +-
 .../procedure/MasterProcedureScheduler.java     |  37 +-
 .../master/procedure/MasterProcedureUtil.java   |   2 +-
 .../hbase/master/procedure/PeerQueue.java       |  14 -
 .../master/procedure/ProcedureSyncWait.java     |   4 +-
 .../hadoop/hbase/master/procedure/Queue.java    |  13 +-
 .../replication/AbstractPeerProcedure.java      |  14 +-
 .../hbase-webapps/master/procedures.jsp         |   2 +-
 .../hbase/client/TestGetProcedureResult.java    |   2 +-
 .../assignment/TestAssignmentManager.java       |   3 +-
 .../procedure/TestMasterProcedureEvents.java    |   2 +-
 .../master/procedure/TestProcedureAdmin.java    |   2 +-
 .../hbase/procedure/TestFailedProcCleanup.java  |   5 +-
 .../security/access/TestAccessController.java   |   2 +-
 35 files changed, 624 insertions(+), 551 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/f3f17fa1/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
index c036163..5645f89 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
@@ -163,8 +163,8 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
           return null;
         }
       }
-
       final Procedure pollResult = dequeue();
+
       pollCalls++;
       nullPollCalls += (pollResult == null) ? 1 : 0;
       return pollResult;

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3f17fa1/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/DelayedProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/DelayedProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/DelayedProcedure.java
index a9f3e7d..3fc9750 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/DelayedProcedure.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/DelayedProcedure.java
@@ -24,8 +24,9 @@ import org.apache.yetus.audience.InterfaceAudience;
  * Vessel that carries a Procedure and a timeout.
  */
 @InterfaceAudience.Private
-class DelayedProcedure extends DelayedUtil.DelayedContainerWithTimestamp<Procedure<?>> {
-  public DelayedProcedure(Procedure<?> procedure) {
+class DelayedProcedure<TEnvironment>
+    extends DelayedUtil.DelayedContainerWithTimestamp<Procedure<TEnvironment>> {
+  public DelayedProcedure(Procedure<TEnvironment> procedure) {
     super(procedure, procedure.getTimeoutTimestamp());
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3f17fa1/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
index 545bedf..58757bb 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
@@ -22,76 +22,94 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
 import org.apache.hadoop.hbase.metrics.Counter;
 import org.apache.hadoop.hbase.metrics.Histogram;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
 import org.apache.hadoop.hbase.procedure2.util.StringUtils;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.NonceKey;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
+
 /**
- * Base Procedure class responsible for Procedure Metadata;
- * e.g. state, submittedTime, lastUpdate, stack-indexes, etc.
- *
- * <p>Procedures are run by a {@link ProcedureExecutor} instance. They are submitted and then
- * the ProcedureExecutor keeps calling {@link #execute(Object)} until the Procedure is done.
- * Execute may be called multiple times in the case of failure or a restart, so code must be
- * idempotent. The return from an execute call is either: null to indicate we are done;
- * ourself if there is more to do; or, a set of sub-procedures that need to
- * be run to completion before the framework resumes our execution.
- *
- * <p>The ProcedureExecutor keeps its
- * notion of Procedure State in the Procedure itself; e.g. it stamps the Procedure as INITIALIZING,
- * RUNNABLE, SUCCESS, etc. Here are some of the States defined in the ProcedureState enum from
- * protos:
- *<ul>
- * <li>{@link #isFailed()} A procedure has executed at least once and has failed. The procedure
- * may or may not have rolled back yet. Any procedure in FAILED state will be eventually moved
- * to ROLLEDBACK state.</li>
- *
+ * Base Procedure class responsible for Procedure Metadata; e.g. state, submittedTime, lastUpdate,
+ * stack-indexes, etc.
+ * <p/>
+ * Procedures are run by a {@link ProcedureExecutor} instance. They are submitted and then the
+ * ProcedureExecutor keeps calling {@link #execute(Object)} until the Procedure is done. Execute may
+ * be called multiple times in the case of failure or a restart, so code must be idempotent. The
+ * return from an execute call is either: null to indicate we are done; ourself if there is more to
+ * do; or, a set of sub-procedures that need to be run to completion before the framework resumes
+ * our execution.
+ * <p/>
+ * The ProcedureExecutor keeps its notion of Procedure State in the Procedure itself; e.g. it stamps
+ * the Procedure as INITIALIZING, RUNNABLE, SUCCESS, etc. Here are some of the States defined in the
+ * ProcedureState enum from protos:
+ * <ul>
+ * <li>{@link #isFailed()} A procedure has executed at least once and has failed. The procedure may
+ * or may not have rolled back yet. Any procedure in FAILED state will be eventually moved to
+ * ROLLEDBACK state.</li>
  * <li>{@link #isSuccess()} A procedure is completed successfully without exception.</li>
- *
  * <li>{@link #isFinished()} As a procedure in FAILED state will be tried forever for rollback, only
  * condition when scheduler/ executor will drop procedure from further processing is when procedure
  * state is ROLLEDBACK or isSuccess() returns true. This is a terminal state of the procedure.</li>
- *
  * <li>{@link #isWaiting()} - Procedure is in one of the two waiting states
  * ({@link ProcedureState#WAITING}, {@link ProcedureState#WAITING_TIMEOUT}).</li>
- *</ul>
- * NOTE: These states are of the ProcedureExecutor. Procedure implementations in turn can keep
- * their own state. This can lead to confusion. Try to keep the two distinct.
- *
- * <p>rollback() is called when the procedure or one of the sub-procedures
- * has failed. The rollback step is supposed to cleanup the resources created
- * during the execute() step. In case of failure and restart, rollback() may be
- * called multiple times, so again the code must be idempotent.
- *
- * <p>Procedure can be made respect a locking regime. It has acquire/release methods as
- * well as an {@link #hasLock(Object)}. The lock implementation is up to the implementor.
- * If an entity needs to be locked for the life of a procedure -- not just the calls to
- * execute -- then implementations should say so with the {@link #holdLock(Object)}
- * method.
- *
- * <p>Procedures can be suspended or put in wait state with a callback that gets executed on
+ * </ul>
+ * NOTE: These states are of the ProcedureExecutor. Procedure implementations in turn can keep their
+ * own state. This can lead to confusion. Try to keep the two distinct.
+ * <p/>
+ * rollback() is called when the procedure or one of the sub-procedures has failed. The rollback
+ * step is supposed to cleanup the resources created during the execute() step. In case of failure
+ * and restart, rollback() may be called multiple times, so again the code must be idempotent.
+ * <p/>
+ * Procedure can be made respect a locking regime. It has acquire/release methods as well as an
+ * {@link #hasLock()}. The lock implementation is up to the implementor. If an entity needs to be
+ * locked for the life of a procedure -- not just the calls to execute -- then implementations
+ * should say so with the {@link #holdLock(Object)} method.
+ * <p/>
+ * And since we need to restore the lock when restarting to keep the logic correct(HBASE-20846), the
+ * implementation is a bit tricky so we add some comments here about it.
+ * <ul>
+ * <li>Make {@link #hasLock()} method final, and add a {@link #locked} field in Procedure to record
+ * whether we have the lock. We will set it to {@code true} in
+ * {@link #doAcquireLock(Object, ProcedureStore)} and to {@code false} in
+ * {@link #doReleaseLock(Object, ProcedureStore)}. The sub classes do not need to manage it any
+ * more.</li>
+ * <li>Also added a locked field in the proto message. When storing, the field will be set according
+ * to the return value of {@link #hasLock()}. And when loading, there is a new field in Procedure
+ * called {@link #lockedWhenLoading}. We will set it to {@code true} if the locked field in proto
+ * message is {@code true}.</li>
+ * <li>The reason why we can not set the {@link #locked} field directly to {@code true} by calling
+ * {@link #doAcquireLock(Object, ProcedureStore)} is that, during initialization, most procedures
+ * need to wait until master is initialized. So the solution here is that, we introduced a new
+ * method called {@link #waitInitialized(Object)} in Procedure, and move the wait master initialized
+ * related code from {@link #acquireLock(Object)} to this method. And we added a restoreLock method
+ * to Procedure, if {@link #lockedWhenLoading} is {@code true}, we will call the
+ * {@link #acquireLock(Object)} to get the lock, but do not set {@link #locked} to true. And later
+ * when we call {@link #doAcquireLock(Object, ProcedureStore)} and pass the
+ * {@link #waitInitialized(Object)} check, we will test {@link #lockedWhenLoading}, if it is
+ * {@code true}, then we just set the {@link #locked} field to true and return, without actually
+ * calling the {@link #acquireLock(Object)} method since we have already called it once.</li>
+ * </ul>
+ * <p/>
+ * Procedures can be suspended or put in wait state with a callback that gets executed on
  * Procedure-specified timeout. See {@link #setTimeout(int)}}, and
- * {@link #setTimeoutFailure(Object)}. See TestProcedureEvents and the
- * TestTimeoutEventProcedure class for an example usage.</p>
- *
- * <p>There are hooks for collecting metrics on submit of the procedure and on finish.
- * See {@link #updateMetricsOnSubmit(Object)} and
- * {@link #updateMetricsOnFinish(Object, long, boolean)}.
+ * {@link #setTimeoutFailure(Object)}. See TestProcedureEvents and the TestTimeoutEventProcedure
+ * class for an example usage.
+ * </p>
+ * <p/>
+ * There are hooks for collecting metrics on submit of the procedure and on finish. See
+ * {@link #updateMetricsOnSubmit(Object)} and {@link #updateMetricsOnFinish(Object, long, boolean)}.
  */
 @InterfaceAudience.Private
-@InterfaceStability.Evolving
 public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TEnvironment>> {
   private static final Logger LOG = LoggerFactory.getLogger(Procedure.class);
   public static final long NO_PROC_ID = -1;
@@ -122,6 +140,10 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
 
   private volatile byte[] result = null;
 
+  private volatile boolean locked = false;
+
+  private boolean lockedWhenLoading = false;
+
   /**
    * The main code of the procedure. It must be idempotent since execute()
    * may be called multiple times in case of machine failure in the middle
@@ -170,7 +192,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
    * be able to resume on failure.
    * @param serializer stores the serializable state
    */
-  protected abstract void serializeStateData(final ProcedureStateSerializer serializer)
+  protected abstract void serializeStateData(ProcedureStateSerializer serializer)
     throws IOException;
 
   /**
@@ -178,52 +200,65 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
    * state.
    * @param serializer contains the serialized state
    */
-  protected abstract void deserializeStateData(final ProcedureStateSerializer serializer)
+  protected abstract void deserializeStateData(ProcedureStateSerializer serializer)
     throws IOException;
 
   /**
-   * The user should override this method if they need a lock on an Entity.
-   * A lock can be anything, and it is up to the implementor. The Procedure
-   * Framework will call this method just before it invokes {@link #execute(Object)}.
-   * It calls {@link #releaseLock(Object)} after the call to execute.
-   *
-   * <p>If you need to hold the lock for the life of the Procedure -- i.e. you do not
-   * want any other Procedure interfering while this Procedure is running, see
-   * {@link #holdLock(Object)}.
-   *
-   * <p>Example: in our Master we can execute request in parallel for different tables.
-   * We can create t1 and create t2 and these creates can be executed at the same time.
-   * Anything else on t1/t2 is queued waiting that specific table create to happen.
-   *
-   * <p>There are 3 LockState:
-   * <ul><li>LOCK_ACQUIRED should be returned when the proc has the lock and the proc is
-   * ready to execute.</li>
-   * <li>LOCK_YIELD_WAIT should be returned when the proc has not the lock and the framework
-   * should take care of readding the procedure back to the runnable set for retry</li>
-   * <li>LOCK_EVENT_WAIT should be returned when the proc has not the lock and someone will
-   * take care of readding the procedure back to the runnable set when the lock is available.
-   * </li></ul>
+   * The {@link #doAcquireLock(Object, ProcedureStore)} will be split into two steps, first, it will
+   * call us to determine whether we need to wait for initialization, second, it will call
+   * {@link #acquireLock(Object)} to actually handle the lock for this procedure.
+   * <p/>
+   * This is because when the master restarts, we need to restore the lock state for all the
+   * procedures so as not to break the semantics when {@link #holdLock(Object)} is true. But the
+   * {@link ProcedureExecutor} is started before the master finishes initialization (it is part of
+   * the initialization!), so we split the code into two steps, and when restoring we only restore
+   * the lock part and skip the waitInitialized part. Otherwise there would be a deadlock.
+   * @return true means we need to wait until the environment has been initialized, otherwise false.
+   */
+  protected boolean waitInitialized(TEnvironment env) {
+    return false;
+  }
+
+  /**
+   * The user should override this method if they need a lock on an Entity. A lock can be anything,
+   * and it is up to the implementor. The Procedure Framework will call this method just before it
+   * invokes {@link #execute(Object)}. It calls {@link #releaseLock(Object)} after the call to
+   * execute.
+   * <p/>
+   * If you need to hold the lock for the life of the Procedure -- i.e. you do not want any other
+   * Procedure interfering while this Procedure is running, see {@link #holdLock(Object)}.
+   * <p/>
+   * Example: in our Master we can execute request in parallel for different tables. We can create
+   * t1 and create t2 and these creates can be executed at the same time. Anything else on t1/t2 is
+   * queued waiting that specific table create to happen.
+   * <p/>
+   * There are 3 LockState:
+   * <ul>
+   * <li>LOCK_ACQUIRED should be returned when the proc has the lock and the proc is ready to
+   * execute.</li>
+   * <li>LOCK_YIELD_WAIT should be returned when the proc has not the lock and the framework should
+   * take care of readding the procedure back to the runnable set for retry</li>
+   * <li>LOCK_EVENT_WAIT should be returned when the proc has not the lock and someone will take
+   * care of readding the procedure back to the runnable set when the lock is available.</li>
+   * </ul>
    * @return the lock state as described above.
    */
-  protected LockState acquireLock(final TEnvironment env) {
+  protected LockState acquireLock(TEnvironment env) {
     return LockState.LOCK_ACQUIRED;
   }
 
   /**
    * The user should override this method, and release lock if necessary.
    */
-  protected void releaseLock(final TEnvironment env) {
+  protected void releaseLock(TEnvironment env) {
     // no-op
   }
 
   /**
    * Used to keep the procedure lock even when the procedure is yielding or suspended.
-   * Must implement {@link #hasLock(Object)} if you want to hold the lock for life
-   * of the Procedure.
-   * @see #hasLock(Object)
    * @return true if the procedure should hold on the lock until completionCleanup()
    */
-  protected boolean holdLock(final TEnvironment env) {
+  protected boolean holdLock(TEnvironment env) {
     return false;
   }
 
@@ -235,8 +270,8 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
    * @see #holdLock(Object)
    * @return true if the procedure has the lock, false otherwise.
    */
-  protected boolean hasLock(final TEnvironment env) {
-    return false;
+  protected final boolean hasLock() {
+    return locked;
   }
 
   /**
@@ -245,7 +280,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
    * operation before replay.
    * e.g. failing the procedure if the state on replay may be unknown.
    */
-  protected void beforeReplay(final TEnvironment env) {
+  protected void beforeReplay(TEnvironment env) {
     // no-op
   }
 
@@ -253,7 +288,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
    * Called when the procedure is ready to be added to the queue after
    * the loading/replay operation.
    */
-  protected void afterReplay(final TEnvironment env) {
+  protected void afterReplay(TEnvironment env) {
     // no-op
   }
 
@@ -263,7 +298,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
    * This operation will not be retried on failure. If a procedure took a lock,
    * it will have been released when this method runs.
    */
-  protected void completionCleanup(final TEnvironment env) {
+  protected void completionCleanup(TEnvironment env) {
     // no-op
   }
 
@@ -275,7 +310,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
    * @return Return true if the executor should yield on completion of an execution step.
    *         Defaults to return false.
    */
-  protected boolean isYieldAfterExecutionStep(final TEnvironment env) {
+  protected boolean isYieldAfterExecutionStep(TEnvironment env) {
     return false;
   }
 
@@ -288,7 +323,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
    * @return true if the executor should wait the client ack for the result.
    *         Defaults to return true.
    */
-  protected boolean shouldWaitClientAck(final TEnvironment env) {
+  protected boolean shouldWaitClientAck(TEnvironment env) {
     return true;
   }
 
@@ -298,7 +333,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
    * @param env The environment passed to the procedure executor
    * @return Container object for procedure related metric
    */
-  protected ProcedureMetrics getProcedureMetrics(final TEnvironment env) {
+  protected ProcedureMetrics getProcedureMetrics(TEnvironment env) {
     return null;
   }
 
@@ -308,7 +343,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
    * updates submitted counter if {@link #getProcedureMetrics(Object)} returns non-null
    * {@link ProcedureMetrics}.
    */
-  protected void updateMetricsOnSubmit(final TEnvironment env) {
+  protected void updateMetricsOnSubmit(TEnvironment env) {
     ProcedureMetrics metrics = getProcedureMetrics(env);
     if (metrics == null) {
       return;
@@ -322,21 +357,19 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
 
   /**
    * This function will be called just after procedure execution is finished. Override this method
-   * to update metrics at the end of the procedure. If {@link #getProcedureMetrics(Object)}
-   * returns non-null {@link ProcedureMetrics}, the default implementation adds runtime of a
-   * procedure to a time histogram for successfully completed procedures. Increments failed
-   * counter for failed procedures.
-   *
-   * TODO: As any of the sub-procedures on failure rolls back all procedures in the stack,
-   * including successfully finished siblings, this function may get called twice in certain
-   * cases for certain procedures. Explore further if this can be called once.
-   *
+   * to update metrics at the end of the procedure. If {@link #getProcedureMetrics(Object)} returns
+   * non-null {@link ProcedureMetrics}, the default implementation adds runtime of a procedure to a
+   * time histogram for successfully completed procedures. Increments failed counter for failed
+   * procedures.
+   * <p/>
+   * TODO: As any of the sub-procedures on failure rolls back all procedures in the stack, including
+   * successfully finished siblings, this function may get called twice in certain cases for certain
+   * procedures. Explore further if this can be called once.
    * @param env The environment passed to the procedure executor
    * @param runtime Runtime of the procedure in milliseconds
    * @param success true if procedure is completed successfully
    */
-  protected void updateMetricsOnFinish(final TEnvironment env, final long runtime,
-                                       boolean success) {
+  protected void updateMetricsOnFinish(TEnvironment env, long runtime, boolean success) {
     ProcedureMetrics metrics = getProcedureMetrics(env);
     if (metrics == null) {
       return;
@@ -362,8 +395,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
   }
 
   /**
-   * Build the StringBuilder for the simple form of
-   * procedure string.
+   * Build the StringBuilder for the simple form of procedure string.
    * @return the StringBuilder
    */
   protected StringBuilder toStringSimpleSB() {
@@ -389,6 +421,8 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
     sb.append(", state="); // pState for Procedure State as opposed to any other kind.
     toStringState(sb);
 
+    sb.append(", hasLock=").append(locked);
+
     if (hasException()) {
       sb.append(", exception=" + getException());
     }
@@ -400,8 +434,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
   }
 
   /**
-   * Extend the toString() information with more procedure
-   * details
+   * Extend the toString() information with more procedure details
    */
   public String toStringDetails() {
     final StringBuilder sb = toStringSimpleSB();
@@ -429,8 +462,8 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
   }
 
   /**
-   * Called from {@link #toString()} when interpolating {@link Procedure} State.
-   * Allows decorating generic Procedure State with Procedure particulars.
+   * Called from {@link #toString()} when interpolating {@link Procedure} State. Allows decorating
+   * generic Procedure State with Procedure particulars.
    * @param builder Append current {@link ProcedureState}
    */
   protected void toStringState(StringBuilder builder) {
@@ -493,8 +526,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
    * Called by the ProcedureExecutor to assign the ID to the newly created procedure.
    */
   @VisibleForTesting
-  @InterfaceAudience.Private
-  protected void setProcId(final long procId) {
+  protected void setProcId(long procId) {
     this.procId = procId;
     this.submittedTime = EnvironmentEdgeManager.currentTime();
     setState(ProcedureState.RUNNABLE);
@@ -503,13 +535,11 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
   /**
    * Called by the ProcedureExecutor to assign the parent to the newly created procedure.
    */
-  @InterfaceAudience.Private
-  protected void setParentProcId(final long parentProcId) {
+  protected void setParentProcId(long parentProcId) {
     this.parentProcId = parentProcId;
   }
 
-  @InterfaceAudience.Private
-  protected void setRootProcId(final long rootProcId) {
+  protected void setRootProcId(long rootProcId) {
     this.rootProcId = rootProcId;
   }
 
@@ -517,18 +547,16 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
    * Called by the ProcedureExecutor to set the value to the newly created procedure.
    */
   @VisibleForTesting
-  @InterfaceAudience.Private
-  protected void setNonceKey(final NonceKey nonceKey) {
+  protected void setNonceKey(NonceKey nonceKey) {
     this.nonceKey = nonceKey;
   }
 
   @VisibleForTesting
-  @InterfaceAudience.Private
-  public void setOwner(final String owner) {
+  public void setOwner(String owner) {
     this.owner = StringUtils.isEmpty(owner) ? null : owner;
   }
 
-  public void setOwner(final User owner) {
+  public void setOwner(User owner) {
     assert owner != null : "expected owner to be not null";
     setOwner(owner.getShortName());
   }
@@ -537,8 +565,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
    * Called on store load to initialize the Procedure internals after
    * the creation/deserialization.
    */
-  @InterfaceAudience.Private
-  protected void setSubmittedTime(final long submittedTime) {
+  protected void setSubmittedTime(long submittedTime) {
     this.submittedTime = submittedTime;
   }
 
@@ -548,7 +575,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
   /**
    * @param timeout timeout interval in msec
    */
-  protected void setTimeout(final int timeout) {
+  protected void setTimeout(int timeout) {
     this.timeout = timeout;
   }
 
@@ -567,15 +594,13 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
    * Called on store load to initialize the Procedure internals after
    * the creation/deserialization.
    */
-  @InterfaceAudience.Private
-  protected void setLastUpdate(final long lastUpdate) {
+  protected void setLastUpdate(long lastUpdate) {
     this.lastUpdate = lastUpdate;
   }
 
   /**
    * Called by ProcedureExecutor after each time a procedure step is executed.
    */
-  @InterfaceAudience.Private
   protected void updateTimestamp() {
     this.lastUpdate = EnvironmentEdgeManager.currentTime();
   }
@@ -590,7 +615,6 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
    * the procedure is in the waiting queue.
    * @return the timestamp of the next timeout.
    */
-  @InterfaceAudience.Private
   protected long getTimeoutTimestamp() {
     return getLastUpdate() + getTimeout();
   }
@@ -616,10 +640,19 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
    * The procedure may leave a "result" on completion.
    * @param result the serialized result that will be passed to the client
    */
-  protected void setResult(final byte[] result) {
+  protected void setResult(byte[] result) {
     this.result = result;
   }
 
+  /**
+   * Will only be called when loading procedures from procedure store, where we need to record
+   * whether the procedure has already held a lock. Later we will call
+   * {@link #doAcquireLock(Object, ProcedureStore)} to actually acquire the lock.
+   */
+  final void lockedWhenLoading() {
+    this.lockedWhenLoading = true;
+  }
+
   // ==============================================================================================
   //  Runtime state, updated every operation by the ProcedureExecutor
   //
@@ -677,13 +710,11 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
   }
 
   @VisibleForTesting
-  @InterfaceAudience.Private
   protected synchronized void setState(final ProcedureState state) {
     this.state = state;
     updateTimestamp();
   }
 
-  @InterfaceAudience.Private
   public synchronized ProcedureState getState() {
     return state;
   }
@@ -705,10 +736,10 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
 
   /**
    * Called by the ProcedureExecutor when the timeout set by setTimeout() is expired.
-   * @return true to let the framework handle the timeout as abort,
-   *         false in case the procedure handled the timeout itself.
+   * @return true to let the framework handle the timeout as abort, false in case the procedure
+   *         handled the timeout itself.
    */
-  protected synchronized boolean setTimeoutFailure(final TEnvironment env) {
+  protected synchronized boolean setTimeoutFailure(TEnvironment env) {
     if (state == ProcedureState.WAITING_TIMEOUT) {
       long timeDiff = EnvironmentEdgeManager.currentTime() - lastUpdate;
       setFailure("ProcedureExecutor", new TimeoutIOException(
@@ -729,8 +760,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
   /**
    * Called by the ProcedureExecutor on procedure-load to restore the latch state
    */
-  @InterfaceAudience.Private
-  protected synchronized void setChildrenLatch(final int numChildren) {
+  protected synchronized void setChildrenLatch(int numChildren) {
     this.childrenLatch = numChildren;
     if (LOG.isTraceEnabled()) {
       LOG.trace("CHILD LATCH INCREMENT SET " +
@@ -741,7 +771,6 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
   /**
    * Called by the ProcedureExecutor on procedure-load to restore the latch state
    */
-  @InterfaceAudience.Private
   protected synchronized void incChildrenLatch() {
     // TODO: can this be inferred from the stack? I think so...
     this.childrenLatch++;
@@ -753,7 +782,6 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
   /**
    * Called by the ProcedureExecutor to notify that one of the sub-procedures has completed.
    */
-  @InterfaceAudience.Private
   private synchronized boolean childrenCountDown() {
     assert childrenLatch > 0: this;
     boolean b = --childrenLatch == 0;
@@ -770,17 +798,18 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
    */
   synchronized boolean tryRunnable() {
     // Don't use isWaiting in the below; it returns true for WAITING and WAITING_TIMEOUT
-    boolean b = getState() == ProcedureState.WAITING && childrenCountDown();
-    if (b) setState(ProcedureState.RUNNABLE);
-    return b;
+    if (getState() == ProcedureState.WAITING && childrenCountDown()) {
+      setState(ProcedureState.RUNNABLE);
+      return true;
+    } else {
+      return false;
+    }
   }
 
-  @InterfaceAudience.Private
   protected synchronized boolean hasChildren() {
     return childrenLatch > 0;
   }
 
-  @InterfaceAudience.Private
   protected synchronized int getChildrenLatch() {
     return childrenLatch;
   }
@@ -789,7 +818,6 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
    * Called by the RootProcedureState on procedure execution.
    * Each procedure store its stack-index positions.
    */
-  @InterfaceAudience.Private
   protected synchronized void addStackIndex(final int index) {
     if (stackIndexes == null) {
       stackIndexes = new int[] { index };
@@ -800,7 +828,6 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
     }
   }
 
-  @InterfaceAudience.Private
   protected synchronized boolean removeStackIndex() {
     if (stackIndexes != null && stackIndexes.length > 1) {
       stackIndexes = Arrays.copyOf(stackIndexes, stackIndexes.length - 1);
@@ -815,7 +842,6 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
    * Called on store load to initialize the Procedure internals after
    * the creation/deserialization.
    */
-  @InterfaceAudience.Private
   protected synchronized void setStackIndexes(final List<Integer> stackIndexes) {
     this.stackIndexes = new int[stackIndexes.size()];
     for (int i = 0; i < this.stackIndexes.length; ++i) {
@@ -823,12 +849,10 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
     }
   }
 
-  @InterfaceAudience.Private
   protected synchronized boolean wasExecuted() {
     return stackIndexes != null;
   }
 
-  @InterfaceAudience.Private
   protected synchronized int[] getStackIndexes() {
     return stackIndexes;
   }
@@ -840,10 +864,9 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
   /**
    * Internal method called by the ProcedureExecutor that starts the user-level code execute().
    * @throws ProcedureSuspendedException This is used when procedure wants to halt processing and
-   * skip out without changing states or releasing any locks held.
+   *           skip out without changing states or releasing any locks held.
    */
-  @InterfaceAudience.Private
-  protected Procedure<TEnvironment>[] doExecute(final TEnvironment env)
+  protected Procedure<TEnvironment>[] doExecute(TEnvironment env)
       throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
     try {
       updateTimestamp();
@@ -856,8 +879,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
   /**
    * Internal method called by the ProcedureExecutor that starts the user-level code rollback().
    */
-  @InterfaceAudience.Private
-  protected void doRollback(final TEnvironment env)
+  protected void doRollback(TEnvironment env)
       throws IOException, InterruptedException {
     try {
       updateTimestamp();
@@ -867,19 +889,60 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
     }
   }
 
+  final void restoreLock(TEnvironment env) {
+    if (!lockedWhenLoading) {
+      LOG.debug("{} didn't hold the lock before restarting, skip acquiring lock.", this);
+      return;
+    }
+
+    LOG.debug("{} held the lock before restarting, call acquireLock to restore it.", this);
+    LockState state = acquireLock(env);
+    assert state == LockState.LOCK_ACQUIRED;
+  }
+
   /**
    * Internal method called by the ProcedureExecutor that starts the user-level code acquireLock().
    */
-  @InterfaceAudience.Private
-  protected LockState doAcquireLock(final TEnvironment env) {
-    return acquireLock(env);
+  final LockState doAcquireLock(TEnvironment env, ProcedureStore store) {
+    if (waitInitialized(env)) {
+      return LockState.LOCK_EVENT_WAIT;
+    }
+    if (lockedWhenLoading) {
+      // reset it so we will not consider it anymore
+      lockedWhenLoading = false;
+      locked = true;
+      // Here we return without persisting the locked state: lockedWhenLoading being true means
+      // that the locked field of the procedure stored in the procedure store is already true, so
+      // we do not need to store it again.
+      return LockState.LOCK_ACQUIRED;
+    }
+    LockState state = acquireLock(env);
+    if (state == LockState.LOCK_ACQUIRED) {
+      locked = true;
+      // persist that we have held the lock. This must be done before we actually execute the
+      // procedure, otherwise when restarting, we may consider the procedure does not have a lock,
+      // but it may have already done some changes as we have already executed it, and if another
+      // procedure gets the lock, then the semantic will be broken if the holdLock is true, as we do
+      // not expect that another procedure can be executed in the middle.
+      store.update(this);
+    }
+    return state;
   }
 
   /**
    * Internal method called by the ProcedureExecutor that starts the user-level code releaseLock().
    */
-  @InterfaceAudience.Private
-  protected void doReleaseLock(final TEnvironment env) {
+  final void doReleaseLock(TEnvironment env, ProcedureStore store) {
+    locked = false;
+    // persist that we have released the lock. This must be done before we actually release the
+    // lock. Another procedure may take this lock immediately after we release the lock, and if we
+    // crash before persisting the information that we have already released the lock, then when
+    // restarting there will be two procedures which both have the lock and cause problems.
+    if (getState() != ProcedureState.ROLLEDBACK) {
+      // If the state is ROLLEDBACK, it means that we have already deleted the procedure from
+      // procedure store, so do not need to log the release operation any more.
+      store.update(this);
+    }
     releaseLock(env);
   }
 
@@ -896,7 +959,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
    * Get an hashcode for the specified Procedure ID
    * @return the hashcode for the specified procId
    */
-  public static long getProcIdHashCode(final long procId) {
+  public static long getProcIdHashCode(long procId) {
     long h = procId;
     h ^= h >> 16;
     h *= 0x85ebca6b;
@@ -906,15 +969,16 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
     return h;
   }
 
-  /*
+  /**
    * Helper to lookup the root Procedure ID given a specified procedure.
    */
-  @InterfaceAudience.Private
-  protected static Long getRootProcedureId(final Map<Long, Procedure> procedures,
-      Procedure<?> proc) {
+  protected static <T> Long getRootProcedureId(Map<Long, Procedure<T>> procedures,
+      Procedure<T> proc) {
     while (proc.hasParent()) {
       proc = procedures.get(proc.getParentProcId());
-      if (proc == null) return null;
+      if (proc == null) {
+        return null;
+      }
     }
     return proc.getProcId();
   }
@@ -924,7 +988,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
    * @param b the second procedure to be compared.
    * @return true if the two procedures have the same parent
    */
-  public static boolean haveSameParent(final Procedure<?> a, final Procedure<?> b) {
+  public static boolean haveSameParent(Procedure<?> a, Procedure<?> b) {
     return a.hasParent() && b.hasParent() && (a.getParentProcId() == b.getParentProcId());
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3f17fa1/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index db7c118..f1bec72 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -19,8 +19,10 @@
 package org.apache.hadoop.hbase.procedure2;
 
 import java.io.IOException;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Deque;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -48,7 +50,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.NonceKey;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,7 +72,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu
  * and get the result via getResult(procId)
  */
 @InterfaceAudience.Private
-@InterfaceStability.Evolving
 public class ProcedureExecutor<TEnvironment> {
   private static final Logger LOG = LoggerFactory.getLogger(ProcedureExecutor.class);
 
@@ -108,16 +108,16 @@ public class ProcedureExecutor<TEnvironment> {
     void procedureFinished(long procId);
   }
 
-  private static class CompletedProcedureRetainer {
-    private final Procedure<?> procedure;
+  private static final class CompletedProcedureRetainer<TEnvironment> {
+    private final Procedure<TEnvironment> procedure;
     private long clientAckTime;
 
-    public CompletedProcedureRetainer(Procedure<?> procedure) {
+    public CompletedProcedureRetainer(Procedure<TEnvironment> procedure) {
       this.procedure = procedure;
       clientAckTime = -1;
     }
 
-    public Procedure<?> getProcedure() {
+    public Procedure<TEnvironment> getProcedure() {
       return procedure;
     }
 
@@ -172,13 +172,13 @@ public class ProcedureExecutor<TEnvironment> {
     private static final String BATCH_SIZE_CONF_KEY = "hbase.procedure.cleaner.evict.batch.size";
     private static final int DEFAULT_BATCH_SIZE = 32;
 
-    private final Map<Long, CompletedProcedureRetainer> completed;
+    private final Map<Long, CompletedProcedureRetainer<TEnvironment>> completed;
     private final Map<NonceKey, Long> nonceKeysToProcIdsMap;
     private final ProcedureStore store;
     private Configuration conf;
 
-    public CompletedProcedureCleaner(final Configuration conf, final ProcedureStore store,
-        final Map<Long, CompletedProcedureRetainer> completedMap,
+    public CompletedProcedureCleaner(Configuration conf, final ProcedureStore store,
+        final Map<Long, CompletedProcedureRetainer<TEnvironment>> completedMap,
         final Map<NonceKey, Long> nonceKeysToProcIdsMap) {
       // set the timeout interval that triggers the periodic-procedure
       super(conf.getInt(CLEANER_INTERVAL_CONF_KEY, DEFAULT_CLEANER_INTERVAL));
@@ -205,10 +205,11 @@ public class ProcedureExecutor<TEnvironment> {
       int batchCount = 0;
 
       final long now = EnvironmentEdgeManager.currentTime();
-      final Iterator<Map.Entry<Long, CompletedProcedureRetainer>> it = completed.entrySet().iterator();
+      final Iterator<Map.Entry<Long, CompletedProcedureRetainer<TEnvironment>>> it =
+        completed.entrySet().iterator();
       while (it.hasNext() && store.isRunning()) {
-        final Map.Entry<Long, CompletedProcedureRetainer> entry = it.next();
-        final CompletedProcedureRetainer retainer = entry.getValue();
+        final Map.Entry<Long, CompletedProcedureRetainer<TEnvironment>> entry = it.next();
+        final CompletedProcedureRetainer<TEnvironment> retainer = entry.getValue();
         final Procedure<?> proc = retainer.getProcedure();
 
         // TODO: Select TTL based on Procedure type
@@ -240,28 +241,32 @@ public class ProcedureExecutor<TEnvironment> {
    * Once a Root-Procedure completes (success or failure), the result will be added to this map.
    * The user of ProcedureExecutor should call getResult(procId) to get the result.
    */
-  private final ConcurrentHashMap<Long, CompletedProcedureRetainer> completed = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<Long, CompletedProcedureRetainer<TEnvironment>> completed =
+    new ConcurrentHashMap<>();
 
   /**
    * Map the the procId returned by submitProcedure(), the Root-ProcID, to the RootProcedureState.
    * The RootProcedureState contains the execution stack of the Root-Procedure,
    * It is added to the map by submitProcedure() and removed on procedure completion.
    */
-  private final ConcurrentHashMap<Long, RootProcedureState> rollbackStack = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<Long, RootProcedureState<TEnvironment>> rollbackStack =
+    new ConcurrentHashMap<>();
 
   /**
    * Helper map to lookup the live procedures by ID.
    * This map contains every procedure. root-procedures and subprocedures.
    */
-  private final ConcurrentHashMap<Long, Procedure> procedures = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<Long, Procedure<TEnvironment>> procedures =
+    new ConcurrentHashMap<>();
 
   /**
-   * Helper map to lookup whether the procedure already issued from the same client.
-   * This map contains every root procedure.
+   * Helper map to lookup whether the procedure already issued from the same client. This map
+   * contains every root procedure.
    */
   private final ConcurrentHashMap<NonceKey, Long> nonceKeysToProcIdsMap = new ConcurrentHashMap<>();
 
-  private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners = new CopyOnWriteArrayList<>();
+  private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners =
+    new CopyOnWriteArrayList<>();
 
   private Configuration conf;
 
@@ -287,7 +292,7 @@ public class ProcedureExecutor<TEnvironment> {
    * Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery
    * (Should be ok).
    */
-  private TimeoutExecutorThread timeoutExecutor;
+  private TimeoutExecutorThread<TEnvironment> timeoutExecutor;
 
   private int corePoolSize;
   private int maxPoolSize;
@@ -357,27 +362,68 @@ public class ProcedureExecutor<TEnvironment> {
     });
   }
 
-  private void loadProcedures(final ProcedureIterator procIter,
-      final boolean abortOnCorruption) throws IOException {
-    final boolean debugEnabled = LOG.isDebugEnabled();
+  private void restoreLock(Procedure<TEnvironment> proc, Set<Long> restored) {
+    proc.restoreLock(getEnvironment());
+    restored.add(proc.getProcId());
+  }
+
+  private void restoreLocks(Deque<Procedure<TEnvironment>> stack, Set<Long> restored) {
+    while (!stack.isEmpty()) {
+      restoreLock(stack.pop(), restored);
+    }
+  }
+
+  // Restore the locks for all the procedures.
+  // Notice that we need to restore the locks starting from the root proc, otherwise there will be
+  // problem that a sub procedure may hold the exclusive lock first and then we are stuck when
+  // calling the acquireLock method for the parent procedure.
+  // The algorithm is straight-forward:
+  // 1. Use a set to record the procedures whose locks have already been restored.
+  // 2. Use a stack to store the hierarchy of the procedures
+  // 3. For each procedure, we will first try to find its parent and push it into the stack,
+  // unless
+  // a. We have no parent, i.e, we are the root procedure
+  // b. The lock has already been restored(by checking the set introduced in #1)
+  // then we start to pop the stack and call acquireLock for each procedure.
+  // Notice that this should be done for all procedures, not only the ones in runnableList.
+  private void restoreLocks() {
+    Set<Long> restored = new HashSet<>();
+    Deque<Procedure<TEnvironment>> stack = new ArrayDeque<>();
+    procedures.values().forEach(proc -> {
+      for (;;) {
+        if (restored.contains(proc.getProcId())) {
+          restoreLocks(stack, restored);
+          return;
+        }
+        if (!proc.hasParent()) {
+          restoreLock(proc, restored);
+          restoreLocks(stack, restored);
+          return;
+        }
+        stack.push(proc);
+        proc = procedures.get(proc.getParentProcId());
+      }
+    });
+  }
 
+  private void loadProcedures(ProcedureIterator procIter, boolean abortOnCorruption)
+      throws IOException {
     // 1. Build the rollback stack
     int runnablesCount = 0;
+    int failedCount = 0;
     while (procIter.hasNext()) {
       boolean finished = procIter.isNextFinished();
-      Procedure proc = procIter.next();
+      Procedure<TEnvironment> proc = procIter.next();
       NonceKey nonceKey = proc.getNonceKey();
       long procId = proc.getProcId();
 
       if (finished) {
-        completed.put(proc.getProcId(), new CompletedProcedureRetainer(proc));
-        if (debugEnabled) {
-          LOG.debug("Completed " + proc);
-        }
+        completed.put(proc.getProcId(), new CompletedProcedureRetainer<>(proc));
+        LOG.debug("Completed {}", proc);
       } else {
         if (!proc.hasParent()) {
           assert !proc.isFinished() : "unexpected finished procedure";
-          rollbackStack.put(proc.getProcId(), new RootProcedureState());
+          rollbackStack.put(proc.getProcId(), new RootProcedureState<>());
         }
 
         // add the procedure to the map
@@ -386,6 +432,8 @@ public class ProcedureExecutor<TEnvironment> {
 
         if (proc.getState() == ProcedureState.RUNNABLE) {
           runnablesCount++;
+        } else if (proc.getState() == ProcedureState.FAILED) {
+          failedCount++;
         }
       }
 
@@ -396,8 +444,19 @@ public class ProcedureExecutor<TEnvironment> {
     }
 
     // 2. Initialize the stacks
-    final ArrayList<Procedure> runnableList = new ArrayList(runnablesCount);
-    HashSet<Procedure> waitingSet = null;
+    // In the old implementation, for procedures in FAILED state, we will push it into the
+    // ProcedureScheduler directly to execute the rollback. But this does not work after we
+    // introduce the restore lock stage.
+    // For now, when we acquire a xlock, we will remove the queue from runQueue in scheduler, and
+    // then when a procedure which has lock access, for example, a sub procedure of the procedure
+    // which has the xlock, is pushed into the scheduler, we will add the queue back to let the
+    // workers poll from it. The assumption here is that, the procedure which has the xlock should
+    // have been polled out already, so when loading we can not add the procedure to scheduler first
+    // and then call acquireLock: the procedure would still be in the queue, and since we remove
+    // the queue from runQueue, no one could poll it out, resulting in a deadlock.
+    List<Procedure<TEnvironment>> runnableList = new ArrayList<>(runnablesCount);
+    List<Procedure<TEnvironment>> failedList = new ArrayList<>(failedCount);
+    Set<Procedure<TEnvironment>> waitingSet = null;
     procIter.reset();
     while (procIter.hasNext()) {
       if (procIter.isNextFinished()) {
@@ -405,12 +464,10 @@ public class ProcedureExecutor<TEnvironment> {
         continue;
       }
 
-      Procedure proc = procIter.next();
+      Procedure<TEnvironment> proc = procIter.next();
       assert !(proc.isFinished() && !proc.hasParent()) : "unexpected completed proc=" + proc;
 
-      if (debugEnabled) {
-        LOG.debug(String.format("Loading %s", proc));
-      }
+      LOG.debug("Loading {}", proc);
 
       Long rootProcId = getRootProcedureId(proc);
       if (rootProcId == null) {
@@ -420,14 +477,14 @@ public class ProcedureExecutor<TEnvironment> {
       }
 
       if (proc.hasParent()) {
-        Procedure parent = procedures.get(proc.getParentProcId());
+        Procedure<TEnvironment> parent = procedures.get(proc.getParentProcId());
         // corrupted procedures are handled later at step 3
         if (parent != null && !proc.isFinished()) {
           parent.incChildrenLatch();
         }
       }
 
-      RootProcedureState procStack = rollbackStack.get(rootProcId);
+      RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId);
       procStack.loadStack(proc);
 
       proc.setRootProcId(rootProcId);
@@ -447,8 +504,7 @@ public class ProcedureExecutor<TEnvironment> {
           waitingSet.add(proc);
           break;
         case FAILED:
-          // add the proc to the scheduler to perform the rollback
-          scheduler.addBack(proc);
+          failedList.add(proc);
           break;
         case ROLLEDBACK:
         case INITIALIZING:
@@ -462,13 +518,14 @@ public class ProcedureExecutor<TEnvironment> {
 
     // 3. Validate the stacks
     int corruptedCount = 0;
-    Iterator<Map.Entry<Long, RootProcedureState>> itStack = rollbackStack.entrySet().iterator();
+    Iterator<Map.Entry<Long, RootProcedureState<TEnvironment>>> itStack =
+      rollbackStack.entrySet().iterator();
     while (itStack.hasNext()) {
-      Map.Entry<Long, RootProcedureState> entry = itStack.next();
-      RootProcedureState procStack = entry.getValue();
+      Map.Entry<Long, RootProcedureState<TEnvironment>> entry = itStack.next();
+      RootProcedureState<TEnvironment> procStack = entry.getValue();
       if (procStack.isValid()) continue;
 
-      for (Procedure proc: procStack.getSubproceduresStack()) {
+      for (Procedure<TEnvironment> proc : procStack.getSubproceduresStack()) {
         LOG.error("Corrupted " + proc);
         procedures.remove(proc.getProcId());
         runnableList.remove(proc);
@@ -484,30 +541,22 @@ public class ProcedureExecutor<TEnvironment> {
 
     // 4. Push the procedures to the timeout executor
     if (waitingSet != null && !waitingSet.isEmpty()) {
-      for (Procedure proc: waitingSet) {
+      for (Procedure<TEnvironment> proc: waitingSet) {
         proc.afterReplay(getEnvironment());
         timeoutExecutor.add(proc);
       }
     }
-
-    // 5. Push the procedure to the scheduler
-    if (!runnableList.isEmpty()) {
-      // TODO: See ProcedureWALFormatReader#hasFastStartSupport
-      // some procedure may be started way before this stuff.
-      for (int i = runnableList.size() - 1; i >= 0; --i) {
-        Procedure proc = runnableList.get(i);
-        proc.afterReplay(getEnvironment());
-        if (!proc.hasParent()) {
-          sendProcedureLoadedNotification(proc.getProcId());
-        }
-        if (proc.wasExecuted()) {
-          scheduler.addFront(proc);
-        } else {
-          // if it was not in execution, it can wait.
-          scheduler.addBack(proc);
-        }
+    // 5. restore locks
+    restoreLocks();
+    // 6. Push the procedure to the scheduler
+    failedList.forEach(scheduler::addBack);
+    runnableList.forEach(p -> {
+      p.afterReplay(getEnvironment());
+      if (!p.hasParent()) {
+        sendProcedureLoadedNotification(p.getProcId());
       }
-    }
+      scheduler.addBack(p);
+    });
   }
 
   /**
@@ -529,7 +578,7 @@ public class ProcedureExecutor<TEnvironment> {
         corePoolSize, maxPoolSize);
 
     this.threadGroup = new ThreadGroup("PEWorkerGroup");
-    this.timeoutExecutor = new TimeoutExecutorThread(this, threadGroup);
+    this.timeoutExecutor = new TimeoutExecutorThread<>(this, threadGroup);
 
     // Create the workers
     workerId.set(0);
@@ -581,7 +630,7 @@ public class ProcedureExecutor<TEnvironment> {
     timeoutExecutor.add(new WorkerMonitor());
 
     // Add completed cleaner chore
-    addChore(new CompletedProcedureCleaner(conf, store, completed, nonceKeysToProcIdsMap));
+    addChore(new CompletedProcedureCleaner<>(conf, store, completed, nonceKeysToProcIdsMap));
   }
 
   public void stop() {
@@ -686,7 +735,7 @@ public class ProcedureExecutor<TEnvironment> {
    * Add a chore procedure to the executor
    * @param chore the chore to add
    */
-  public void addChore(final ProcedureInMemoryChore chore) {
+  public void addChore(ProcedureInMemoryChore<TEnvironment> chore) {
     chore.setState(ProcedureState.WAITING_TIMEOUT);
     timeoutExecutor.add(chore);
   }
@@ -696,7 +745,7 @@ public class ProcedureExecutor<TEnvironment> {
    * @param chore the chore to remove
    * @return whether the chore is removed, or it will be removed later
    */
-  public boolean removeChore(final ProcedureInMemoryChore chore) {
+  public boolean removeChore(ProcedureInMemoryChore<TEnvironment> chore) {
     chore.setState(ProcedureState.SUCCESS);
     return timeoutExecutor.remove(chore);
   }
@@ -830,17 +879,21 @@ public class ProcedureExecutor<TEnvironment> {
    * @param procOwner name of the owner of the procedure, used to inform the user
    * @param exception the failure to report to the user
    */
-  public void setFailureResultForNonce(final NonceKey nonceKey, final String procName,
-      final User procOwner, final IOException exception) {
-    if (nonceKey == null) return;
+  public void setFailureResultForNonce(NonceKey nonceKey, String procName, User procOwner,
+      IOException exception) {
+    if (nonceKey == null) {
+      return;
+    }
 
-    final Long procId = nonceKeysToProcIdsMap.get(nonceKey);
-    if (procId == null || completed.containsKey(procId)) return;
+    Long procId = nonceKeysToProcIdsMap.get(nonceKey);
+    if (procId == null || completed.containsKey(procId)) {
+      return;
+    }
 
-    Procedure<?> proc = new FailedProcedure(procId.longValue(),
-        procName, procOwner, nonceKey, exception);
+    Procedure<TEnvironment> proc =
+      new FailedProcedure<>(procId.longValue(), procName, procOwner, nonceKey, exception);
 
-    completed.putIfAbsent(procId, new CompletedProcedureRetainer(proc));
+    completed.putIfAbsent(procId, new CompletedProcedureRetainer<>(proc));
   }
 
   // ==========================================================================
@@ -851,7 +904,7 @@ public class ProcedureExecutor<TEnvironment> {
    * @param proc the new procedure to execute.
    * @return the procedure id, that can be used to monitor the operation
    */
-  public long submitProcedure(final Procedure proc) {
+  public long submitProcedure(Procedure<TEnvironment> proc) {
     return submitProcedure(proc, null);
   }
 
@@ -863,7 +916,7 @@ public class ProcedureExecutor<TEnvironment> {
    */
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
       justification = "FindBugs is blind to the check-for-null")
-  public long submitProcedure(final Procedure proc, final NonceKey nonceKey) {
+  public long submitProcedure(Procedure<TEnvironment> proc, NonceKey nonceKey) {
     Preconditions.checkArgument(lastProcId.get() >= 0);
 
     prepareProcedure(proc);
@@ -883,9 +936,7 @@ public class ProcedureExecutor<TEnvironment> {
 
     // Commit the transaction
     store.insert(proc, null);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Stored " + proc);
-    }
+    LOG.debug("Stored {}", proc);
 
     // Add the procedure to the executor
     return pushProcedure(proc);
@@ -896,7 +947,7 @@ public class ProcedureExecutor<TEnvironment> {
    * @param procs the new procedures to execute.
    */
   // TODO: Do we need to take nonces here?
-  public void submitProcedures(final Procedure[] procs) {
+  public void submitProcedures(Procedure<TEnvironment>[] procs) {
     Preconditions.checkArgument(lastProcId.get() >= 0);
     if (procs == null || procs.length <= 0) {
       return;
@@ -919,7 +970,7 @@ public class ProcedureExecutor<TEnvironment> {
     }
   }
 
-  private Procedure prepareProcedure(final Procedure proc) {
+  private Procedure<TEnvironment> prepareProcedure(Procedure<TEnvironment> proc) {
     Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING);
     Preconditions.checkArgument(!proc.hasParent(), "unexpected parent", proc);
     if (this.checkOwnerSet) {
@@ -928,14 +979,14 @@ public class ProcedureExecutor<TEnvironment> {
     return proc;
   }
 
-  private long pushProcedure(final Procedure proc) {
+  private long pushProcedure(Procedure<TEnvironment> proc) {
     final long currentProcId = proc.getProcId();
 
     // Update metrics on start of a procedure
     proc.updateMetricsOnSubmit(getEnvironment());
 
     // Create the rollback stack for the procedure
-    RootProcedureState stack = new RootProcedureState();
+    RootProcedureState<TEnvironment> stack = new RootProcedureState<>();
     rollbackStack.put(currentProcId, stack);
 
     // Submit the new subprocedures
@@ -952,7 +1003,7 @@ public class ProcedureExecutor<TEnvironment> {
    * @param procId the procedure to abort
    * @return true if the procedure exists and has received the abort, otherwise false.
    */
-  public boolean abort(final long procId) {
+  public boolean abort(long procId) {
     return abort(procId, true);
   }
 
@@ -963,8 +1014,8 @@ public class ProcedureExecutor<TEnvironment> {
    * @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted?
    * @return true if the procedure exists and has received the abort, otherwise false.
    */
-  public boolean abort(final long procId, final boolean mayInterruptIfRunning) {
-    final Procedure proc = procedures.get(procId);
+  public boolean abort(long procId, boolean mayInterruptIfRunning) {
+    Procedure<TEnvironment> proc = procedures.get(procId);
     if (proc != null) {
       if (!mayInterruptIfRunning && proc.wasExecuted()) {
         return false;
@@ -977,20 +1028,20 @@ public class ProcedureExecutor<TEnvironment> {
   // ==========================================================================
   //  Executor query helpers
   // ==========================================================================
-  public Procedure getProcedure(final long procId) {
+  public Procedure<TEnvironment> getProcedure(final long procId) {
     return procedures.get(procId);
   }
 
-  public <T extends Procedure> T getProcedure(final Class<T> clazz, final long procId) {
-    final Procedure proc = getProcedure(procId);
+  public <T extends Procedure<TEnvironment>> T getProcedure(Class<T> clazz, long procId) {
+    Procedure<TEnvironment> proc = getProcedure(procId);
     if (clazz.isInstance(proc)) {
-      return (T)proc;
+      return clazz.cast(proc);
     }
     return null;
   }
 
-  public Procedure getResult(final long procId) {
-    CompletedProcedureRetainer retainer = completed.get(procId);
+  public Procedure<TEnvironment> getResult(long procId) {
+    CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
     if (retainer == null) {
       return null;
     } else {
@@ -1014,8 +1065,8 @@ public class ProcedureExecutor<TEnvironment> {
    * @param procId the ID of the procedure to check
    * @return true if the procedure execution is started, otherwise false.
    */
-  public boolean isStarted(final long procId) {
-    final Procedure proc = procedures.get(procId);
+  public boolean isStarted(long procId) {
+    Procedure<?> proc = procedures.get(procId);
     if (proc == null) {
       return completed.get(procId) != null;
     }
@@ -1026,13 +1077,11 @@ public class ProcedureExecutor<TEnvironment> {
    * Mark the specified completed procedure, as ready to remove.
    * @param procId the ID of the procedure to remove
    */
-  public void removeResult(final long procId) {
-    CompletedProcedureRetainer retainer = completed.get(procId);
+  public void removeResult(long procId) {
+    CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
     if (retainer == null) {
       assert !procedures.containsKey(procId) : "pid=" + procId + " is still running";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("pid=" + procId + " already removed by the cleaner.");
-      }
+      LOG.debug("pid={} already removed by the cleaner.", procId);
       return;
     }
 
@@ -1040,8 +1089,8 @@ public class ProcedureExecutor<TEnvironment> {
     retainer.setClientAckTime(EnvironmentEdgeManager.currentTime());
   }
 
-  public Procedure getResultOrProcedure(final long procId) {
-    CompletedProcedureRetainer retainer = completed.get(procId);
+  public Procedure<TEnvironment> getResultOrProcedure(long procId) {
+    CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
     if (retainer == null) {
       return procedures.get(procId);
     } else {
@@ -1056,15 +1105,16 @@ public class ProcedureExecutor<TEnvironment> {
    * @return true if the user is the owner of the procedure,
    *   false otherwise or the owner is unknown.
    */
-  public boolean isProcedureOwner(final long procId, final User user) {
-    if (user == null) return false;
-
-    final Procedure runningProc = procedures.get(procId);
+  public boolean isProcedureOwner(long procId, User user) {
+    if (user == null) {
+      return false;
+    }
+    final Procedure<TEnvironment> runningProc = procedures.get(procId);
     if (runningProc != null) {
       return runningProc.getOwner().equals(user.getShortName());
     }
 
-    final CompletedProcedureRetainer retainer = completed.get(procId);
+    final CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
     if (retainer != null) {
       return retainer.getProcedure().getOwner().equals(user.getShortName());
     }
@@ -1078,19 +1128,17 @@ public class ProcedureExecutor<TEnvironment> {
    * Get procedures.
    * @return the procedures in a list
    */
-  public List<Procedure<?>> getProcedures() {
-    final List<Procedure<?>> procedureLists = new ArrayList<>(procedures.size() + completed.size());
-    for (Procedure<?> procedure : procedures.values()) {
-      procedureLists.add(procedure);
-    }
+  public List<Procedure<TEnvironment>> getProcedures() {
+    List<Procedure<TEnvironment>> procedureList =
+      new ArrayList<>(procedures.size() + completed.size());
+    procedureList.addAll(procedures.values());
     // Note: The procedure could show up twice in the list with different state, as
     // it could complete after we walk through procedures list and insert into
     // procedureList - it is ok, as we will use the information in the Procedure
     // to figure it out; to prevent this would increase the complexity of the logic.
-    for (CompletedProcedureRetainer retainer: completed.values()) {
-      procedureLists.add(retainer.getProcedure());
-    }
-    return procedureLists;
+    completed.values().stream().map(CompletedProcedureRetainer::getProcedure)
+      .forEach(procedureList::add);
+    return procedureList;
   }
 
   // ==========================================================================
@@ -1169,14 +1217,14 @@ public class ProcedureExecutor<TEnvironment> {
     return procedures.keySet();
   }
 
-  Long getRootProcedureId(Procedure proc) {
+  Long getRootProcedureId(Procedure<TEnvironment> proc) {
     return Procedure.getRootProcedureId(procedures, proc);
   }
 
   // ==========================================================================
   //  Executions
   // ==========================================================================
-  private void executeProcedure(final Procedure proc) {
+  private void executeProcedure(Procedure<TEnvironment> proc) {
     final Long rootProcId = getRootProcedureId(proc);
     if (rootProcId == null) {
       // The 'proc' was ready to run but the root procedure was rolledback
@@ -1185,7 +1233,7 @@ public class ProcedureExecutor<TEnvironment> {
       return;
     }
 
-    final RootProcedureState procStack = rollbackStack.get(rootProcId);
+    RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId);
     if (procStack == null) {
       LOG.warn("RootProcedureState is null for " + proc.getProcId());
       return;
@@ -1197,7 +1245,7 @@ public class ProcedureExecutor<TEnvironment> {
           // we have the 'rollback-lock' we can start rollingback
           switch (executeRollback(rootProcId, procStack)) {
             case LOCK_ACQUIRED:
-                break;
+              break;
             case LOCK_YIELD_WAIT:
               procStack.unsetRollback();
               scheduler.yield(proc);
@@ -1239,7 +1287,6 @@ public class ProcedureExecutor<TEnvironment> {
       switch (lockState) {
         case LOCK_ACQUIRED:
           execProcedure(procStack, proc);
-          releaseLock(proc, false);
           break;
         case LOCK_YIELD_WAIT:
           LOG.info(lockState + " " + proc);
@@ -1254,12 +1301,6 @@ public class ProcedureExecutor<TEnvironment> {
       }
       procStack.release(proc);
 
-      // allows to kill the executor before something is stored to the wal.
-      // useful to test the procedure recovery.
-      if (testing != null && !isRunning()) {
-        break;
-      }
-
       if (proc.isSuccess()) {
         // update metrics on finishing the procedure
         proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), true);
@@ -1275,33 +1316,31 @@ public class ProcedureExecutor<TEnvironment> {
     } while (procStack.isFailed());
   }
 
-  private LockState acquireLock(final Procedure proc) {
-    final TEnvironment env = getEnvironment();
-    // hasLock() is used in conjunction with holdLock().
-    // This allows us to not rewrite or carry around the hasLock() flag
-    // for every procedure. the hasLock() have meaning only if holdLock() is true.
-    if (proc.holdLock(env) && proc.hasLock(env)) {
+  private LockState acquireLock(Procedure<TEnvironment> proc) {
+    TEnvironment env = getEnvironment();
+    // if holdLock is true, then maybe we already have the lock, so just return LOCK_ACQUIRED if
+    // hasLock is true.
+    if (proc.hasLock()) {
       return LockState.LOCK_ACQUIRED;
     }
-    return proc.doAcquireLock(env);
+    return proc.doAcquireLock(env, store);
   }
 
-  private void releaseLock(final Procedure proc, final boolean force) {
-    final TEnvironment env = getEnvironment();
+  private void releaseLock(Procedure<TEnvironment> proc, boolean force) {
+    TEnvironment env = getEnvironment();
     // For how the framework works, we know that we will always have the lock
     // when we call releaseLock(), so we can avoid calling proc.hasLock()
-    if (force || !proc.holdLock(env)) {
-      proc.doReleaseLock(env);
+    if (force || !proc.holdLock(env) || proc.isFinished()) {
+      proc.doReleaseLock(env, store);
     }
   }
 
   /**
-   * Execute the rollback of the full procedure stack.
-   * Once the procedure is rolledback, the root-procedure will be visible as
-   * finished to user, and the result will be the fatal exception.
+   * Execute the rollback of the full procedure stack. Once the procedure is rolledback, the
+   * root-procedure will be visible as finished to user, and the result will be the fatal exception.
    */
-  private LockState executeRollback(final long rootProcId, final RootProcedureState procStack) {
-    final Procedure rootProc = procedures.get(rootProcId);
+  private LockState executeRollback(long rootProcId, RootProcedureState<TEnvironment> procStack) {
+    Procedure<TEnvironment> rootProc = procedures.get(rootProcId);
     RemoteProcedureException exception = rootProc.getException();
     // TODO: This needs doc. The root proc doesn't have an exception. Maybe we are
     // rolling back because the subprocedure does. Clarify.
@@ -1311,13 +1350,13 @@ public class ProcedureExecutor<TEnvironment> {
       store.update(rootProc);
     }
 
-    final List<Procedure> subprocStack = procStack.getSubproceduresStack();
+    List<Procedure<TEnvironment>> subprocStack = procStack.getSubproceduresStack();
     assert subprocStack != null : "Called rollback with no steps executed rootProc=" + rootProc;
 
     int stackTail = subprocStack.size();
     boolean reuseLock = false;
     while (stackTail --> 0) {
-      final Procedure proc = subprocStack.get(stackTail);
+      Procedure<TEnvironment> proc = subprocStack.get(stackTail);
 
       LockState lockState;
       if (!reuseLock && (lockState = acquireLock(proc)) != LockState.LOCK_ACQUIRED) {
@@ -1334,7 +1373,7 @@ public class ProcedureExecutor<TEnvironment> {
       // (e.g. StateMachineProcedure reuse the same instance)
       // we can avoid to lock/unlock each step
       reuseLock = stackTail > 0 && (subprocStack.get(stackTail - 1) == proc) && !abortRollback;
-      if (!reuseLock) {
+      if (!reuseLock && proc.hasLock()) {
         releaseLock(proc, false);
       }
 
@@ -1368,13 +1407,11 @@ public class ProcedureExecutor<TEnvironment> {
    * It updates the store with the new state (stack index)
    * or will remove completly the procedure in case it is a child.
    */
-  private LockState executeRollback(final Procedure proc) {
+  private LockState executeRollback(Procedure<TEnvironment> proc) {
     try {
       proc.doRollback(getEnvironment());
     } catch (IOException e) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Roll back attempt failed for " + proc, e);
-      }
+      LOG.debug("Roll back attempt failed for {}", proc, e);
       return LockState.LOCK_YIELD_WAIT;
     } catch (InterruptedException e) {
       handleInterruptedException(proc, e);
@@ -1387,9 +1424,10 @@ public class ProcedureExecutor<TEnvironment> {
     // allows to kill the executor before something is stored to the wal.
     // useful to test the procedure recovery.
     if (testing != null && testing.shouldKillBeforeStoreUpdate()) {
-      LOG.debug("TESTING: Kill before store update");
+      String msg = "TESTING: Kill before store update";
+      LOG.debug(msg);
       stop();
-      return LockState.LOCK_YIELD_WAIT;
+      throw new RuntimeException(msg);
     }
 
     if (proc.removeStackIndex()) {
@@ -1416,6 +1454,11 @@ public class ProcedureExecutor<TEnvironment> {
     return LockState.LOCK_ACQUIRED;
   }
 
+  private void yieldProcedure(Procedure<TEnvironment> proc) {
+    releaseLock(proc, false);
+    scheduler.yield(proc);
+  }
+
   /**
    * Executes <code>procedure</code>
    * <ul>
@@ -1445,10 +1488,10 @@ public class ProcedureExecutor<TEnvironment> {
    *  </li>
    *  </ul>
    */
-  private void execProcedure(final RootProcedureState procStack,
-      final Procedure<TEnvironment> procedure) {
+  private void execProcedure(RootProcedureState<TEnvironment> procStack,
+      Procedure<TEnvironment> procedure) {
     Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE,
-        procedure.toString());
+      procedure.toString());
 
     // Procedures can suspend themselves. They skip out by throwing a ProcedureSuspendedException.
     // The exception is caught below and then we hurry to the exit without disturbing state. The
@@ -1475,22 +1518,16 @@ public class ProcedureExecutor<TEnvironment> {
           subprocs = null;
         }
       } catch (ProcedureSuspendedException e) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Suspend " + procedure);
-        }
+        LOG.trace("Suspend {}", procedure);
         suspended = true;
       } catch (ProcedureYieldException e) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Yield " + procedure + ": " + e.getMessage(), e);
-        }
-        scheduler.yield(procedure);
+        LOG.trace("Yield {}", procedure, e);
+        yieldProcedure(procedure);
         return;
       } catch (InterruptedException e) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Yield interrupt " + procedure + ": " + e.getMessage(), e);
-        }
+        LOG.trace("Yield interrupt {}", procedure, e);
         handleInterruptedException(procedure, e);
-        scheduler.yield(procedure);
+        yieldProcedure(procedure);
         return;
       } catch (Throwable e) {
         // Catch NullPointerExceptions or similar errors...
@@ -1506,9 +1543,7 @@ public class ProcedureExecutor<TEnvironment> {
             // i.e. we go around this loop again rather than go back out on the scheduler queue.
             subprocs = null;
             reExecute = true;
-            if (LOG.isTraceEnabled()) {
-              LOG.trace("Short-circuit to next step on pid=" + procedure.getProcId());
-            }
+            LOG.trace("Short-circuit to next step on pid={}", procedure.getProcId());
           } else {
             // Yield the current procedure, and make the subprocedure runnable
             // subprocs may come back 'null'.
@@ -1519,9 +1554,7 @@ public class ProcedureExecutor<TEnvironment> {
                 collect(Collectors.toList()).toString()));
           }
         } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("Added to timeoutExecutor " + procedure);
-          }
+          LOG.trace("Added to timeoutExecutor {}", procedure);
           timeoutExecutor.add(procedure);
         } else if (!suspended) {
           // No subtask, so we are done
@@ -1535,9 +1568,10 @@ public class ProcedureExecutor<TEnvironment> {
       // allows to kill the executor before something is stored to the wal.
       // useful to test the procedure recovery.
       if (testing != null && testing.shouldKillBeforeStoreUpdate(suspended)) {
-        LOG.debug("TESTING: Kill before store update: " + procedure);
+        String msg = "TESTING: Kill before store update: " + procedure;
+        LOG.debug(msg);
         stop();
-        return;
+        throw new RuntimeException(msg);
       }
 
       // TODO: The code here doesn't check if store is running before persisting to the store as
@@ -1551,11 +1585,13 @@ public class ProcedureExecutor<TEnvironment> {
       updateStoreOnExec(procStack, procedure, subprocs);
 
       // if the store is not running we are aborting
-      if (!store.isRunning()) return;
+      if (!store.isRunning()) {
+        return;
+      }
       // if the procedure is kind enough to pass the slot to someone else, yield
       if (procedure.isRunnable() && !suspended &&
           procedure.isYieldAfterExecutionStep(getEnvironment())) {
-        scheduler.yield(procedure);
+        yieldProcedure(procedure);
         return;
       }
 
@@ -1566,6 +1602,11 @@ public class ProcedureExecutor<TEnvironment> {
       submitChildrenProcedures(subprocs);
     }
 
+    // we need to log the release lock operation before waking up the parent procedure, as there
+    // could be race that the parent procedure may call updateStoreOnExec ahead of us and remove all
+    // the sub procedures from store and cause problems...
+    releaseLock(procedure, false);
+
     // if the procedure is complete and has a parent, count down the children latch.
     // If 'suspended', do nothing to change state -- let other threads handle unsuspend event.
     if (!suspended && procedure.isFinished() && procedure.hasParent()) {
@@ -1573,12 +1614,12 @@ public class ProcedureExecutor<TEnvironment> {
     }
   }
 
-  private Procedure[] initializeChildren(final RootProcedureState procStack,
-      final Procedure procedure, final Procedure[] subprocs) {
+  private Procedure<TEnvironment>[] initializeChildren(RootProcedureState<TEnvironment> procStack,
+      Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) {
     assert subprocs != null : "expected subprocedures";
     final long rootProcId = getRootProcedureId(procedure);
     for (int i = 0; i < subprocs.length; ++i) {
-      final Procedure subproc = subprocs[i];
+      Procedure<TEnvironment> subproc = subprocs[i];
       if (subproc == null) {
         String msg = "subproc[" + i + "] is null, aborting the procedure";
         procedure.setFailure(new RemoteProcedureException(msg,
@@ -1609,9 +1650,9 @@ public class ProcedureExecutor<TEnvironment> {
     return subprocs;
   }
 
-  private void submitChildrenProcedures(final Procedure[] subprocs) {
+  private void submitChildrenProcedures(Procedure<TEnvironment>[] subprocs) {
     for (int i = 0; i < subprocs.length; ++i) {
-      final Procedure subproc = subprocs[i];
+      Procedure<TEnvironment> subproc = subprocs[i];
       subproc.updateMetricsOnSubmit(getEnvironment());
       assert !procedures.containsKey(subproc.getProcId());
       procedures.put(subproc.getProcId(), subproc);
@@ -1619,8 +1660,9 @@ public class ProcedureExecutor<TEnvironment> {
     }
   }
 
-  private void countDownChildren(final RootProcedureState procStack, final Procedure procedure) {
-    final Procedure parent = procedures.get(procedure.getParentProcId());
+  private void countDownChildren(RootProcedureState<TEnvironment> procStack,
+      Procedure<TEnvironment> procedure) {
+    Procedure<TEnvironment> parent = procedures.get(procedure.getParentProcId());
     if (parent == null) {
       assert procStack.isRollingback();
       return;
@@ -1637,17 +1679,15 @@ public class ProcedureExecutor<TEnvironment> {
     }
   }
 
-  private void updateStoreOnExec(final RootProcedureState procStack,
-      final Procedure procedure, final Procedure[] subprocs) {
+  private void updateStoreOnExec(RootProcedureState<TEnvironment> procStack,
+      Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) {
     if (subprocs != null && !procedure.isFailed()) {
       if (LOG.isTraceEnabled()) {
         LOG.trace("Stored " + procedure + ", children " + Arrays.toString(subprocs));
       }
       store.insert(procedure, subprocs);
     } else {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Store update " + procedure);
-      }
+      LOG.trace("Store update {}", procedure);
       if (procedure.isFinished() && !procedure.hasParent()) {
         // remove child procedures
         final long[] childProcIds = procStack.getSubprocedureIds();
@@ -1665,11 +1705,8 @@ public class ProcedureExecutor<TEnvironment> {
     }
   }
 
-  private void handleInterruptedException(final Procedure proc, final InterruptedException e) {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Interrupt during " + proc + ". suspend and retry it later.", e);
-    }
-
+  private void handleInterruptedException(Procedure<TEnvironment> proc, InterruptedException e) {
+    LOG.trace("Interrupt during {}. suspend and retry it later.", proc, e);
     // NOTE: We don't call Thread.currentThread().interrupt()
     // because otherwise all the subsequent calls e.g. Thread.sleep() will throw
     // the InterruptedException. If the master is going down, we will be notified
@@ -1677,9 +1714,13 @@ public class ProcedureExecutor<TEnvironment> {
     // (The interrupted procedure will be retried on the next run)
   }
 
-  private void execCompletionCleanup(final Procedure proc) {
+  private void execCompletionCleanup(Procedure<TEnvironment> proc) {
     final TEnvironment env = getEnvironment();
-    if (proc.holdLock(env) && proc.hasLock(env)) {
+    if (proc.hasLock()) {
+      LOG.warn("Usually this should not happen, we will release the lock before if the procedure" +
+        " is finished, even if the holdLock is true, arrive here means we have some holes where" +
+        " we do not release the lock. And the releaseLock below may fail since the procedure may" +
+        " have already been deleted from the procedure store.");
       releaseLock(proc, true);
     }
     try {
@@ -1690,11 +1731,11 @@ public class ProcedureExecutor<TEnvironment> {
     }
   }
 
-  private void procedureFinished(final Procedure proc) {
+  private void procedureFinished(Procedure<TEnvironment> proc) {
     // call the procedure completion cleanup handler
     execCompletionCleanup(proc);
 
-    CompletedProcedureRetainer retainer = new CompletedProcedureRetainer(proc);
+    CompletedProcedureRetainer<TEnvironment> retainer = new CompletedProcedureRetainer<>(proc);
 
     // update the executor internal state maps
     if (!proc.shouldWaitClientAck(getEnvironment())) {
@@ -1710,14 +1751,14 @@ public class ProcedureExecutor<TEnvironment> {
       scheduler.completionCleanup(proc);
     } catch (Throwable e) {
       // Catch NullPointerExceptions or similar errors...
-      LOG.error("CODE-BUG: uncatched runtime exception for completion cleanup: " + proc, e);
+      LOG.error("CODE-BUG: uncatched runtime exception for completion cleanup: {}", proc, e);
     }
 
     // Notify the listeners
     sendProcedureFinishedNotification(proc.getProcId());
   }
 
-  RootProcedureState getProcStack(long rootProcId) {
+  RootProcedureState<TEnvironment> getProcStack(long rootProcId) {
     return rollbackStack.get(rootProcId);
   }
 
@@ -1726,7 +1767,7 @@ public class ProcedureExecutor<TEnvironment> {
   // ==========================================================================
   private class WorkerThread extends StoppableThread {
     private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE);
-    private volatile Procedure<?> activeProcedure;
+    private volatile Procedure<TEnvironment> activeProcedure;
 
     public WorkerThread(ThreadGroup group) {
       this(group, "PEWorker-");
@@ -1747,7 +1788,7 @@ public class ProcedureExecutor<TEnvironment> {
       long lastUpdate = EnvironmentEdgeManager.currentTime();
       try {
         while (isRunning() && keepAlive(lastUpdate)) {
-          Procedure<?> proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
+          Procedure<TEnvironment> proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
           if (proc == null) {
             continue;
           }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f3f17fa1/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java
index c42dfc4..1215008 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java
@@ -202,6 +202,9 @@ public final class ProcedureUtil {
       builder.setNonce(proc.getNonceKey().getNonce());
     }
 
+    if (proc.hasLock()) {
+      builder.setLocked(true);
+    }
     return builder.build();
   }
 
@@ -255,6 +258,10 @@ public final class ProcedureUtil {
       proc.setNonceKey(new NonceKey(proto.getNonceGroup(), proto.getNonce()));
     }
 
+    if (proto.getLocked()) {
+      proc.lockedWhenLoading();
+    }
+
     ProcedureStateSerializer serializer = null;
 
     if (proto.getStateMessageCount() > 0) {