You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2017/05/27 00:13:01 UTC

[29/30] hbase git commit: HBASE-14614 Procedure v2 - Core Assignment Manager (Matteo Bertozzi) Move to a new AssignmentManager, one that describes Assignment using a State Machine built on top of ProcedureV2 facility.

http://git-wip-us.apache.org/repos/asf/hbase/blob/9cd5f2d5/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index 1bb6118..b648cf2 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -32,6 +32,8 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.DelayQueue;
@@ -113,9 +115,11 @@ public class ProcedureExecutor<TEnvironment> {
    * Internal cleaner that removes the completed procedure results after a TTL.
    * NOTE: This is a special case handled in timeoutLoop().
    *
-   * Since the client code looks more or less like:
+   * <p>Since the client code looks more or less like:
+   * <pre>
    *   procId = master.doOperation()
    *   while (master.getProcResult(procId) == ProcInProgress);
+   * </pre>
    * The master should not throw away the proc result as soon as the procedure is done
    * but should wait a result request from the client (see executor.removeResult(procId))
    * The client will call something like master.isProcDone() or master.getProcResult()
@@ -480,10 +484,10 @@ public class ProcedureExecutor<TEnvironment> {
     // We have numThreads executor + one timer thread used for timing out
     // procedures and triggering periodic procedures.
     this.corePoolSize = numThreads;
-    LOG.info("Starting executor worker threads=" + corePoolSize);
+    LOG.info("Starting ProcedureExecutor Worker threads (ProcExecWrkr)=" + corePoolSize);
 
     // Create the Thread Group for the executors
-    threadGroup = new ThreadGroup("ProcedureExecutor");
+    threadGroup = new ThreadGroup("ProcExecThrdGrp");
 
     // Create the timeout executor
     timeoutExecutor = new TimeoutExecutorThread(threadGroup);
@@ -1077,13 +1081,16 @@ public class ProcedureExecutor<TEnvironment> {
     final Long rootProcId = getRootProcedureId(proc);
     if (rootProcId == null) {
       // The 'proc' was ready to run but the root procedure was rolledback
+      LOG.warn("Rollback because parent is done/rolledback proc=" + proc);
       executeRollback(proc);
       return;
     }
 
     final RootProcedureState procStack = rollbackStack.get(rootProcId);
-    if (procStack == null) return;
-
+    if (procStack == null) {
+      LOG.warn("RootProcedureState is null for " + proc.getProcId());
+      return;
+    }
     do {
       // Try to acquire the execution
       if (!procStack.acquire(proc)) {
@@ -1097,6 +1104,7 @@ public class ProcedureExecutor<TEnvironment> {
               scheduler.yield(proc);
               break;
             case LOCK_EVENT_WAIT:
+              LOG.info("LOCK_EVENT_WAIT rollback..." + proc);
               procStack.unsetRollback();
               break;
             default:
@@ -1114,6 +1122,7 @@ public class ProcedureExecutor<TEnvironment> {
                 scheduler.yield(proc);
                 break;
               case LOCK_EVENT_WAIT:
+                LOG.info("LOCK_EVENT_WAIT can't rollback child running?..." + proc);
                 break;
               default:
                 throw new UnsupportedOperationException();
@@ -1125,16 +1134,21 @@ public class ProcedureExecutor<TEnvironment> {
 
       // Execute the procedure
       assert proc.getState() == ProcedureState.RUNNABLE : proc;
-      switch (acquireLock(proc)) {
+      // Note that lock is NOT about concurrency but rather about ensuring
+      // ownership of a procedure of an entity such as a region or table
+      LockState lockState = acquireLock(proc);
+      switch (lockState) {
         case LOCK_ACQUIRED:
           execProcedure(procStack, proc);
           releaseLock(proc, false);
           break;
         case LOCK_YIELD_WAIT:
+          LOG.info(lockState + " " + proc);
           scheduler.yield(proc);
           break;
         case LOCK_EVENT_WAIT:
-          // someone will wake us up when the lock is available
+          // Someone will wake us up when the lock is available
+          LOG.debug(lockState + " " + proc);
           break;
         default:
           throw new UnsupportedOperationException();
@@ -1150,10 +1164,7 @@ public class ProcedureExecutor<TEnvironment> {
       if (proc.isSuccess()) {
         // update metrics on finishing the procedure
         proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), true);
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Finished " + proc + " in " + StringUtils.humanTimeDiff(proc.elapsedTime()));
-        }
+        LOG.info("Finish " + proc + " in " + StringUtils.humanTimeDiff(proc.elapsedTime()));
         // Finalize the procedure state
         if (proc.getProcId() == rootProcId) {
           procedureFinished(proc);
@@ -1178,7 +1189,7 @@ public class ProcedureExecutor<TEnvironment> {
 
   private void releaseLock(final Procedure proc, final boolean force) {
     final TEnvironment env = getEnvironment();
-    // for how the framework works, we know that we will always have the lock
+    // For how the framework works, we know that we will always have the lock
     // when we call releaseLock(), so we can avoid calling proc.hasLock()
     if (force || !proc.holdLock(env)) {
       proc.doReleaseLock(env);
@@ -1193,6 +1204,8 @@ public class ProcedureExecutor<TEnvironment> {
   private LockState executeRollback(final long rootProcId, final RootProcedureState procStack) {
     final Procedure rootProc = procedures.get(rootProcId);
     RemoteProcedureException exception = rootProc.getException();
+    // TODO: This needs doc. The root proc doesn't have an exception. Maybe we are
+    // rolling back because the subprocedure does. Clarify.
     if (exception == null) {
       exception = procStack.getException();
       rootProc.setFailure(exception);
@@ -1269,7 +1282,7 @@ public class ProcedureExecutor<TEnvironment> {
       return LockState.LOCK_YIELD_WAIT;
     } catch (Throwable e) {
       // Catch NullPointerExceptions or similar errors...
-      LOG.fatal("CODE-BUG: Uncatched runtime exception for procedure: " + proc, e);
+      LOG.fatal("CODE-BUG: Uncaught runtime exception fo " + proc, e);
     }
 
     // allows to kill the executor before something is stored to the wal.
@@ -1305,29 +1318,55 @@ public class ProcedureExecutor<TEnvironment> {
   }
 
   /**
-   * Executes the specified procedure
-   *  - calls the doExecute() of the procedure
-   *  - if the procedure execution didn't fail (e.g. invalid user input)
-   *     - ...and returned subprocedures
-   *        - the subprocedures are initialized.
-   *        - the subprocedures are added to the store
-   *        - the subprocedures are added to the runnable queue
-   *        - the procedure is now in a WAITING state, waiting for the subprocedures to complete
-   *     - ...if there are no subprocedure
-   *        - the procedure completed successfully
-   *        - if there is a parent (WAITING)
-   *            - the parent state will be set to RUNNABLE
-   *  - in case of failure
-   *    - the store is updated with the new state
-   *    - the executor (caller of this method) will start the rollback of the procedure
+   * Executes <code>procedure</code>
+   * <ul>
+   *  <li>Calls the doExecute() of the procedure
+   *  <li>If the procedure execution didn't fail (i.e. valid user input)
+   *  <ul>
+   *    <li>...and returned subprocedures
+   *    <ul><li>The subprocedures are initialized.
+   *      <li>The subprocedures are added to the store
+   *      <li>The subprocedures are added to the runnable queue
+   *      <li>The procedure is now in a WAITING state, waiting for the subprocedures to complete
+   *    </ul>
+   *    </li>
+   *   <li>...if there are no subprocedure
+   *    <ul><li>the procedure completed successfully
+   *      <li>if there is a parent (WAITING)
+   *      <li>the parent state will be set to RUNNABLE
+   *    </ul>
+   *   </li>
+   *  </ul>
+   *  </li>
+   *  <li>In case of failure
+   *  <ul>
+   *    <li>The store is updated with the new state</li>
+   *    <li>The executor (caller of this method) will start the rollback of the procedure</li>
+   *  </ul>
+   *  </li>
+   *  </ul>
    */
-  private void execProcedure(final RootProcedureState procStack, final Procedure procedure) {
+  private void execProcedure(final RootProcedureState procStack,
+      final Procedure<TEnvironment> procedure) {
     Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE);
 
-    // Execute the procedure
+    // Procedures can suspend themselves. They skip out by throwing a ProcedureSuspendedException.
+    // The exception is caught below and then we hurry to the exit without disturbing state. The
+    // idea is that the processing of this procedure will be unsuspended later by an external event
+    // such as the report of a region open. TODO: Currently, it's possible for two worker threads
+    // to be working on the same procedure concurrently (locking in procedures is NOT about
+    // concurrency but about tying an entity to a procedure; i.e. a region to a particular
+    // procedure instance). This can make for issues if both threads are changing state.
+    // See env.getProcedureScheduler().wakeEvent(regionNode.getProcedureEvent());
+    // in RegionTransitionProcedure#reportTransition for example of Procedure putting
+    // itself back on the scheduler making it possible for two threads running against
+    // the one Procedure. Might be ok if they are both doing different, idempotent sections.
     boolean suspended = false;
+
+    // Whether to re-execute; i.e. run through the loop again.
     boolean reExecute = false;
-    Procedure[] subprocs = null;
+
+    Procedure<TEnvironment>[] subprocs = null;
     do {
       reExecute = false;
       try {
@@ -1336,14 +1375,20 @@ public class ProcedureExecutor<TEnvironment> {
           subprocs = null;
         }
       } catch (ProcedureSuspendedException e) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Suspend " + procedure);
+        }
         suspended = true;
       } catch (ProcedureYieldException e) {
         if (LOG.isTraceEnabled()) {
-          LOG.trace("Yield " + procedure + ": " + e.getMessage());
+          LOG.trace("Yield " + procedure + ": " + e.getMessage(), e);
         }
         scheduler.yield(procedure);
         return;
       } catch (InterruptedException e) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Yield interrupt " + procedure + ": " + e.getMessage(), e);
+        }
         handleInterruptedException(procedure, e);
         scheduler.yield(procedure);
         return;
@@ -1357,14 +1402,26 @@ public class ProcedureExecutor<TEnvironment> {
       if (!procedure.isFailed()) {
         if (subprocs != null) {
           if (subprocs.length == 1 && subprocs[0] == procedure) {
-            // quick-shortcut for a state machine like procedure
+            // Procedure returned itself. Quick-shortcut for a state machine-like procedure;
+            // i.e. we go around this loop again rather than go back out on the scheduler queue.
             subprocs = null;
             reExecute = true;
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("Short-circuit to next step on pid=" + procedure.getProcId());
+            }
           } else {
-            // yield the current procedure, and make the subprocedure runnable
+            // Yield the current procedure, and make the subprocedure runnable
+            // subprocs may come back 'null'.
             subprocs = initializeChildren(procStack, procedure, subprocs);
+            LOG.info("Initialized subprocedures=" +
+              (subprocs == null? null:
+                Stream.of(subprocs).map(e -> "{" + e.toString() + "}").
+                collect(Collectors.toList()).toString()));
           }
         } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Added to timeoutExecutor " + procedure);
+          }
           timeoutExecutor.add(procedure);
         } else if (!suspended) {
           // No subtask, so we are done
@@ -1388,12 +1445,13 @@ public class ProcedureExecutor<TEnvironment> {
       // executor thread to stop. The statement following the method call below seems to check if
       // store is not running, to prevent scheduling children procedures, re-execution or yield
       // of this procedure. This may need more scrutiny and subsequent cleanup in future
-      // Commit the transaction
+      //
+      // Commit the transaction even if a suspend (state may have changed). Note this append
+      // can take a bunch of time to complete.
       updateStoreOnExec(procStack, procedure, subprocs);
 
       // if the store is not running we are aborting
       if (!store.isRunning()) return;
-
       // if the procedure is kind enough to pass the slot to someone else, yield
       if (procedure.isRunnable() && !suspended &&
           procedure.isYieldAfterExecutionStep(getEnvironment())) {
@@ -1403,14 +1461,14 @@ public class ProcedureExecutor<TEnvironment> {
 
       assert (reExecute && subprocs == null) || !reExecute;
     } while (reExecute);
-
     // Submit the new subprocedures
     if (subprocs != null && !procedure.isFailed()) {
       submitChildrenProcedures(subprocs);
     }
 
-    // if the procedure is complete and has a parent, count down the children latch
-    if (procedure.isFinished() && procedure.hasParent()) {
+    // if the procedure is complete and has a parent, count down the children latch.
+    // If 'suspended', do nothing to change state -- let other threads handle unsuspend event.
+    if (!suspended && procedure.isFinished() && procedure.hasParent()) {
       countDownChildren(procStack, procedure);
     }
   }
@@ -1469,18 +1527,13 @@ public class ProcedureExecutor<TEnvironment> {
     }
 
     // If this procedure is the last child awake the parent procedure
-    final boolean traceEnabled = LOG.isTraceEnabled();
-    if (traceEnabled) {
-      LOG.trace(parent + " child is done: " + procedure);
-    }
-
-    if (parent.childrenCountDown() && parent.getState() == ProcedureState.WAITING) {
-      parent.setState(ProcedureState.RUNNABLE);
+    LOG.info("Finish suprocedure " + procedure);
+    if (parent.tryRunnable()) {
+      // If we succeeded in making the parent runnable -- i.e. all of its
+      // children have completed, move parent to front of the queue.
       store.update(parent);
       scheduler.addFront(parent);
-      if (traceEnabled) {
-        LOG.trace(parent + " all the children finished their work, resume.");
-      }
+      LOG.info("Finished subprocedure(s) of " + parent + "; resume parent processing.");
       return;
     }
   }
@@ -1569,9 +1622,10 @@ public class ProcedureExecutor<TEnvironment> {
   // ==========================================================================
   private final class WorkerThread extends StoppableThread {
     private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE);
+    private Procedure activeProcedure;
 
     public WorkerThread(final ThreadGroup group) {
-      super(group, "ProcExecWorker-" + workerId.incrementAndGet());
+      super(group, "ProcExecWrkr-" + workerId.incrementAndGet());
     }
 
     @Override
@@ -1581,29 +1635,49 @@ public class ProcedureExecutor<TEnvironment> {
 
     @Override
     public void run() {
-      final boolean traceEnabled = LOG.isTraceEnabled();
       long lastUpdate = EnvironmentEdgeManager.currentTime();
-      while (isRunning() && keepAlive(lastUpdate)) {
-        final Procedure procedure = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
-        if (procedure == null) continue;
-
-        store.setRunningProcedureCount(activeExecutorCount.incrementAndGet());
-        executionStartTime.set(EnvironmentEdgeManager.currentTime());
-        try {
-          if (traceEnabled) {
-            LOG.trace("Trying to start the execution of " + procedure);
+      try {
+        while (isRunning() && keepAlive(lastUpdate)) {
+          this.activeProcedure = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
+          if (this.activeProcedure == null) continue;
+          int activeCount = activeExecutorCount.incrementAndGet();
+          int runningCount = store.setRunningProcedureCount(activeCount);
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Execute pid=" + this.activeProcedure.getProcId() +
+                " runningCount=" + runningCount + ", activeCount=" + activeCount);
+          }
+          executionStartTime.set(EnvironmentEdgeManager.currentTime());
+          try {
+            executeProcedure(this.activeProcedure);
+          } catch (AssertionError e) {
+            LOG.info("ASSERT pid=" + this.activeProcedure.getProcId(), e);
+            throw e;
+          } finally {
+            activeCount = activeExecutorCount.decrementAndGet();
+            runningCount = store.setRunningProcedureCount(activeCount);
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("Halt pid=" + this.activeProcedure.getProcId() +
+                  " runningCount=" + runningCount + ", activeCount=" + activeCount);
+            }
+            this.activeProcedure = null;
+            lastUpdate = EnvironmentEdgeManager.currentTime();
+            executionStartTime.set(Long.MAX_VALUE);
           }
-          executeProcedure(procedure);
-        } finally {
-          store.setRunningProcedureCount(activeExecutorCount.decrementAndGet());
-          lastUpdate = EnvironmentEdgeManager.currentTime();
-          executionStartTime.set(Long.MAX_VALUE);
         }
+      } catch (Throwable t) {
+        LOG.warn("Worker terminating UNNATURALLY " + this.activeProcedure, t);
+      } finally {
+        LOG.debug("Worker terminated.");
       }
-      LOG.debug("Worker thread terminated " + this);
       workerThreads.remove(this);
     }
 
+    @Override
+    public String toString() {
+      Procedure<?> p = this.activeProcedure; // read the field once; run() reassigns it
+      return getName() + "(pid=" + (p == null? Procedure.NO_PROC_ID: p.getProcId()) + ")";
+    }
+
     /**
      * @return the time since the current procedure is running
      */
@@ -1617,14 +1691,15 @@ public class ProcedureExecutor<TEnvironment> {
     }
   }
 
-  // ==========================================================================
-  //  Timeout Thread
-  // ==========================================================================
+  /**
+   * Runs task on a period such as check for stuck workers.
+   * @see InlineChore
+   */
   private final class TimeoutExecutorThread extends StoppableThread {
     private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<>();
 
     public TimeoutExecutorThread(final ThreadGroup group) {
-      super(group, "ProcedureTimeoutExecutor");
+      super(group, "ProcExecTimeout");
     }
 
     @Override
@@ -1634,7 +1709,7 @@ public class ProcedureExecutor<TEnvironment> {
 
     @Override
     public void run() {
-      final boolean isTraceEnabled = LOG.isTraceEnabled();
+      final boolean traceEnabled = LOG.isTraceEnabled();
       while (isRunning()) {
         final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue);
         if (task == null || task == DelayedUtil.DELAYED_POISON) {
@@ -1643,8 +1718,8 @@ public class ProcedureExecutor<TEnvironment> {
           continue;
         }
 
-        if (isTraceEnabled) {
-          LOG.trace("Trying to start the execution of " + task);
+        if (traceEnabled) {
+          LOG.trace("Executing " + task);
         }
 
         // execute the task
@@ -1665,6 +1740,8 @@ public class ProcedureExecutor<TEnvironment> {
 
     public void add(final Procedure procedure) {
       assert procedure.getState() == ProcedureState.WAITING_TIMEOUT;
+      LOG.info("ADDED " + procedure + "; timeout=" + procedure.getTimeout() +
+          ", timestamp=" + procedure.getTimeoutTimestamp());
       queue.add(new DelayedProcedure(procedure));
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/9cd5f2d5/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java
index bdced10..b148dae 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 
 /**
  * Special procedure used as a chore.
- * instead of bringing the Chore class in (dependencies reason),
+ * Instead of bringing the Chore class in (dependencies reason),
  * we reuse the executor timeout thread for this special case.
  *
  * The assumption is that procedure is used as hook to dispatch other procedures
@@ -43,7 +43,7 @@ public abstract class ProcedureInMemoryChore<TEnvironment> extends Procedure<TEn
   protected abstract void periodicExecute(final TEnvironment env);
 
   @Override
-  protected Procedure[] execute(final TEnvironment env) {
+  protected Procedure<TEnvironment>[] execute(final TEnvironment env) {
     throw new UnsupportedOperationException();
   }
 
@@ -66,4 +66,4 @@ public abstract class ProcedureInMemoryChore<TEnvironment> extends Procedure<TEn
   public void deserializeStateData(final InputStream stream) {
     throw new UnsupportedOperationException();
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/9cd5f2d5/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
index 93d0d5d..233ef57 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
@@ -93,7 +93,7 @@ public interface ProcedureScheduler {
 
   /**
    * Mark the event as not ready.
-   * procedures calling waitEvent() will be suspended.
+   * Procedures calling waitEvent() will be suspended.
    * @param event the event to mark as suspended/not ready
    */
   void suspendEvent(ProcedureEvent event);
@@ -125,6 +125,7 @@ public interface ProcedureScheduler {
    * List lock queues.
    * @return the locks
    */
+  // TODO: This seems to be the wrong place to hang this method.
   List<LockInfo> listLocks();
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/9cd5f2d5/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
new file mode 100644
index 0000000..8d5ff3c
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.procedure2.util.DelayedUtil;
+import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedContainerWithTimestamp;
+import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout;
+import org.apache.hadoop.hbase.procedure2.util.StringUtils;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Threads;
+
+import com.google.common.collect.ArrayListMultimap;
+
+/**
+ * A procedure dispatcher that aggregates and sends after elapsed time or after we hit
+ * count threshold. Creates its own threadpool to run RPCs with timeout.
+ * <ul>
+ * <li>Each server queue has a dispatch buffer</li>
+ * <li>Once the dispatch buffer reaches a threshold-size/time we send</li>
+ * </ul>
+ * <p>Call {@link #start()} and then {@link #submitTask(Callable)}. When done,
+ * call {@link #stop()}.
+ */
+@InterfaceAudience.Private
+public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable<TRemote>> {
+  private static final Log LOG = LogFactory.getLog(RemoteProcedureDispatcher.class);
+
+  public static final String THREAD_POOL_SIZE_CONF_KEY =
+      "hbase.procedure.remote.dispatcher.threadpool.size";
+  private static final int DEFAULT_THREAD_POOL_SIZE = 128;
+
+  public static final String DISPATCH_DELAY_CONF_KEY =
+      "hbase.procedure.remote.dispatcher.delay.msec";
+  private static final int DEFAULT_DISPATCH_DELAY = 150;
+
+  public static final String DISPATCH_MAX_QUEUE_SIZE_CONF_KEY =
+      "hbase.procedure.remote.dispatcher.max.queue.size";
+  private static final int DEFAULT_MAX_QUEUE_SIZE = 32;
+
+  private final AtomicBoolean running = new AtomicBoolean(false);
+  private final ConcurrentHashMap<TRemote, BufferNode> nodeMap =
+      new ConcurrentHashMap<TRemote, BufferNode>();
+
+  private final int operationDelay;
+  private final int queueMaxSize;
+  private final int corePoolSize;
+
+  private TimeoutExecutorThread timeoutExecutor;
+  private ThreadPoolExecutor threadPool;
+
+  protected RemoteProcedureDispatcher(Configuration conf) {
+    this.corePoolSize = conf.getInt(THREAD_POOL_SIZE_CONF_KEY, DEFAULT_THREAD_POOL_SIZE);
+    this.operationDelay = conf.getInt(DISPATCH_DELAY_CONF_KEY, DEFAULT_DISPATCH_DELAY);
+    this.queueMaxSize = conf.getInt(DISPATCH_MAX_QUEUE_SIZE_CONF_KEY, DEFAULT_MAX_QUEUE_SIZE);
+  }
+
+  public boolean start() {
+    if (running.getAndSet(true)) {
+      LOG.warn("Already running");
+      return false;
+    }
+
+    LOG.info("Starting procedure remote dispatcher; threads=" + this.corePoolSize +
+      ", queueMaxSize=" + this.queueMaxSize + ", operationDelay=" + this.operationDelay);
+
+    // Create the timeout executor
+    timeoutExecutor = new TimeoutExecutorThread();
+    timeoutExecutor.start();
+
+    // Create the thread pool that will execute RPCs
+    threadPool = Threads.getBoundedCachedThreadPool(corePoolSize, 60L, TimeUnit.SECONDS,
+      Threads.newDaemonThreadFactory(this.getClass().getSimpleName(),
+          getUncaughtExceptionHandler()));
+    return true;
+  }
+
+  public boolean stop() {
+    if (!running.getAndSet(false)) {
+      return false;
+    }
+
+    LOG.info("Stopping procedure remote dispatcher");
+
+    // send stop signals
+    timeoutExecutor.sendStopSignal();
+    threadPool.shutdownNow();
+    return true;
+  }
+
+  public void join() {
+    assert !running.get() : "expected not running";
+
+    // wait the timeout executor
+    timeoutExecutor.awaitTermination();
+    timeoutExecutor = null;
+
+    // wait for the thread pool to terminate
+    threadPool.shutdownNow();
+    try {
+      while (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
+        LOG.warn("Waiting for thread-pool to terminate");
+      }
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted while waiting for thread-pool termination", e);
+    }
+  }
+
+  protected UncaughtExceptionHandler getUncaughtExceptionHandler() {
+    return new UncaughtExceptionHandler() {
+      @Override
+      public void uncaughtException(Thread t, Throwable e) {
+        LOG.warn("Failed to execute remote procedures " + t.getName(), e);
+      }
+    };
+  }
+
+  // ============================================================================================
+  //  Node Helpers
+  // ============================================================================================
+  /**
+   * Add a node that will be able to execute remote procedures
+   * @param key the node identifier
+   */
+  public void addNode(final TRemote key) {
+    assert key != null: "Tried to add a node with a null key";
+    final BufferNode newNode = new BufferNode(key);
+    nodeMap.putIfAbsent(key, newNode);
+  }
+
+  /**
+   * Add a remote rpc. Be sure to check result for successful add.
+   * @param key the node identifier
+   * @return True if we successfully added the operation.
+   */
+  public boolean addOperationToNode(final TRemote key, RemoteProcedure rp) {
+    assert key != null : "found null key for node";
+    BufferNode node = nodeMap.get(key);
+    if (node == null) {
+      return false; // no node is registered under this key
+    }
+    node.add(rp);
+    // Check our node still in the map; could have been removed by #removeNode.
+    return nodeMap.containsValue(node); // same check as legacy contains(Object), but unambiguous
+  }
+
+  /**
+   * Remove a remote node
+   * @param key the node identifier
+   */
+  public boolean removeNode(final TRemote key) {
+    final BufferNode node = nodeMap.remove(key);
+    if (node == null) return false;
+    node.abortOperationsInQueue();
+    return true;
+  }
+
+  // ============================================================================================
+  //  Task Helpers
+  // ============================================================================================
+  protected Future<Void> submitTask(Callable<Void> task) {
+    return threadPool.submit(task);
+  }
+
+  protected Future<Void> submitTask(Callable<Void> task, long delay, TimeUnit unit) {
+    final FutureTask<Void> futureTask = new FutureTask<>(task); // diamond: no raw-type warning
+    timeoutExecutor.add(new DelayedTask(futureTask, delay, unit)); // queued on the timeout thread
+    return futureTask;
+  }
+
+  /**
+   * Called by BufferNode#dispatch() with the operations buffered for a node.
+   * @param key the node identifier
+   * @param operations the operations that were buffered for the node
+   */
+  protected abstract void remoteDispatch(TRemote key, Set<RemoteProcedure> operations);
+  /**
+   * Called by BufferNode#abortOperationsInQueue() with the operations still buffered for a node.
+   * @param key the node identifier
+   * @param operations the operations that were buffered and not dispatched
+   */
+  protected abstract void abortPendingOperations(TRemote key, Set<RemoteProcedure> operations);
+
+  /**
+   * Data structure with reference to remote operation.
+   * Keeps the RemoteProcedure handed to the constructor so that it can be
+   * retrieved later through getRemoteProcedure().
+   */
+  public static abstract class RemoteOperation {
+    // The procedure this operation refers to; fixed at construction time.
+    private final RemoteProcedure remoteProcedure;
+
+    /**
+     * @param remoteProcedure the procedure this operation refers to
+     */
+    protected RemoteOperation(final RemoteProcedure remoteProcedure) {
+      this.remoteProcedure = remoteProcedure;
+    }
+
+    /**
+     * @return the procedure passed at construction time
+     */
+    public RemoteProcedure getRemoteProcedure() {
+      return remoteProcedure;
+    }
+  }
+
+  /**
+   * Remote procedure reference.
+   * @param <TEnv> type of the environment handed to every callback
+   * @param <TRemote> type of the remote handed to every callback
+   */
+  public interface RemoteProcedure<TEnv, TRemote> {
+    /**
+     * Build the operation for the given remote.
+     * buildAndGroupRequestByType() calls this and groups the results by their class.
+     */
+    RemoteOperation remoteCallBuild(TEnv env, TRemote remote);
+    /** Presumably invoked when the remote call completed; callers are not visible here -- confirm. */
+    void remoteCallCompleted(TEnv env, TRemote remote, RemoteOperation response);
+    /** Presumably invoked when the remote call failed with the given exception -- confirm. */
+    void remoteCallFailed(TEnv env, TRemote remote, IOException exception);
+  }
+
+  /**
+   * Account of what procedures are running on remote node.
+   * @param <TEnv> environment type of the procedures accepted by the node
+   * @param <TRemote> type of the key identifying the node
+   */
+  public interface RemoteNode<TEnv, TRemote> {
+    /** @return the key identifying this node */
+    TRemote getKey();
+    /** Add an operation to this node. */
+    void add(RemoteProcedure<TEnv, TRemote> operation);
+    /** Dispatch the operations added to this node. */
+    void dispatch();
+  }
+
+  /**
+   * Build the remote operation of every procedure and bucket the results by
+   * the concrete class of the built operation.
+   * @param env the environment handed to remoteCallBuild()
+   * @param remote the remote handed to remoteCallBuild()
+   * @param operations the procedures to build operations for
+   * @return a multimap from operation class to the operations of that class
+   */
+  protected ArrayListMultimap<Class<?>, RemoteOperation> buildAndGroupRequestByType(final TEnv env,
+      final TRemote remote, final Set<RemoteProcedure> operations) {
+    final ArrayListMultimap<Class<?>, RemoteOperation> grouped = ArrayListMultimap.create();
+    for (final RemoteProcedure remoteProcedure : operations) {
+      final RemoteOperation built = remoteProcedure.remoteCallBuild(env, remote);
+      grouped.put(built.getClass(), built);
+    }
+    return grouped;
+  }
+
+  /**
+   * Remove and return all the operations grouped under the given type.
+   * @param requestByType operations grouped by class, as built by buildAndGroupRequestByType()
+   * @param type the operation class to extract
+   * @return the operations that were removed from the multimap for {@code type}
+   */
+  @SuppressWarnings("unchecked")
+  protected <T extends RemoteOperation> List<T> fetchType(
+      final ArrayListMultimap<Class<?>, RemoteOperation> requestByType, final Class<T> type) {
+    // Unchecked, but safe for maps built by buildAndGroupRequestByType(): each value is
+    // keyed by its own getClass(), so everything stored under 'type' is an instance of T.
+    return (List<T>)requestByType.removeAll(type);
+  }
+
+  // ============================================================================================
+  //  Timeout Helpers
+  // ============================================================================================
+  /**
+   * Thread that drains a DelayQueue of delayed entries.
+   * A DelayedTask entry is handed to the thread pool; any other entry is treated
+   * as a BufferNode and dispatched. The loop runs while 'running' is set.
+   */
+  private final class TimeoutExecutorThread extends Thread {
+    private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<DelayedWithTimeout>();
+
+    public TimeoutExecutorThread() {
+      super("ProcedureDispatcherTimeoutThread");
+    }
+
+    @Override
+    public void run() {
+      while (running.get()) {
+        final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue);
+        if (task == null || task == DelayedUtil.DELAYED_POISON) {
+          // the executor may be shutting down, and the task is just the shutdown request
+          // (the loop condition re-checks 'running' before the next take)
+          continue;
+        }
+        if (task instanceof DelayedTask) {
+          // Delayed task is due: run the wrapped FutureTask on the thread pool.
+          threadPool.execute(((DelayedTask)task).getObject());
+        } else {
+          // Anything that is not a DelayedTask is assumed to be a BufferNode.
+          ((BufferNode)task).dispatch();
+        }
+      }
+    }
+
+    /** Queue a delayed entry. */
+    public void add(final DelayedWithTimeout delayed) {
+      queue.add(delayed);
+    }
+
+    /** Remove a previously queued entry. */
+    public void remove(final DelayedWithTimeout delayed) {
+      queue.remove(delayed);
+    }
+
+    /** Queue the poison entry; run() skips it and re-checks 'running'. */
+    public void sendStopSignal() {
+      queue.add(DelayedUtil.DELAYED_POISON);
+    }
+
+    /**
+     * Wait for this thread to die, re-sending the stop signal and joining for up to
+     * 250 ms per iteration. A warning with the elapsed time is logged every 8th iteration.
+     * NOTE(review): on InterruptedException the wait is abandoned and the interrupt
+     * flag is not restored -- confirm that this is intended.
+     */
+    public void awaitTermination() {
+      try {
+        final long startTime = EnvironmentEdgeManager.currentTime();
+        for (int i = 0; isAlive(); ++i) {
+          sendStopSignal();
+          join(250);
+          if (i > 0 && (i % 8) == 0) {
+            LOG.warn("Waiting termination of thread " + getName() + ", " +
+              StringUtils.humanTimeDiff(EnvironmentEdgeManager.currentTime() - startTime));
+          }
+        }
+      } catch (InterruptedException e) {
+        LOG.warn(getName() + " join wait got interrupted", e);
+      }
+    }
+  }
+
+  // ============================================================================================
+  //  Internals Helpers
+  // ============================================================================================
+
+  /**
+   * Node that contains a set of RemoteProcedures.
+   * Operations are buffered in the node until dispatch() or abortOperationsInQueue()
+   * hands them over; add() dispatches on its own once more than queueMaxSize
+   * operations are buffered.
+   */
+  protected final class BufferNode extends DelayedContainerWithTimestamp<TRemote>
+      implements RemoteNode<TEnv, TRemote> {
+    // Buffered operations; null while nothing is buffered.
+    private Set<RemoteProcedure> operations;
+
+    protected BufferNode(final TRemote key) {
+      super(key, 0);
+    }
+
+    @Override
+    public TRemote getKey() {
+      return getObject();
+    }
+
+    /**
+     * Buffer an operation. The first operation of a batch sets the node timeout to
+     * now + operationDelay and queues the node on the timeout executor.
+     * @param operation the operation to buffer
+     */
+    @Override
+    public synchronized void add(final RemoteProcedure operation) {
+      if (this.operations == null) {
+        this.operations = new HashSet<>();
+        setTimeout(EnvironmentEdgeManager.currentTime() + operationDelay);
+        timeoutExecutor.add(this);
+      }
+      this.operations.add(operation);
+      if (this.operations.size() > queueMaxSize) {
+        // Buffer is over the limit: do not wait for the timeout, dispatch right away.
+        timeoutExecutor.remove(this);
+        dispatch();
+      }
+    }
+
+    /** Hand the buffered operations to remoteDispatch() and clear the buffer. */
+    @Override
+    public synchronized void dispatch() {
+      if (operations != null) {
+        remoteDispatch(getKey(), operations);
+        this.operations = null;
+      }
+    }
+
+    /** Hand the buffered operations to abortPendingOperations() and clear the buffer. */
+    public synchronized void abortOperationsInQueue() {
+      if (operations != null) {
+        abortPendingOperations(getKey(), operations);
+        this.operations = null;
+      }
+    }
+
+    @Override
+    public String toString() {
+      return super.toString() + ", operations=" + this.operations;
+    }
+  }
+
+  /**
+   * Delayed object that holds a FutureTask.
+   * Used to submit something later to the thread-pool.
+   */
+  private static final class DelayedTask extends DelayedContainerWithTimestamp<FutureTask<Void>> {
+    /**
+     * @param task the task to hold
+     * @param delay how long to wait, counted from the current time
+     * @param unit the time unit of {@code delay}
+     */
+    public DelayedTask(final FutureTask<Void> task, final long delay, final TimeUnit unit) {
+      // The timeout handed to the parent is the current time plus the delay in milliseconds.
+      super(task, EnvironmentEdgeManager.currentTime() + unit.toMillis(delay));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/9cd5f2d5/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java
index 1a84070..64bb278 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java
@@ -27,12 +27,13 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.SequentialProcedureData;
 
 /**
- * A SequentialProcedure describes one step in a procedure chain.
+ * A SequentialProcedure describes one step in a procedure chain:
+ * <pre>
  *   -&gt; Step 1 -&gt; Step 2 -&gt; Step 3
- *
+ * </pre>
  * The main difference from a base Procedure is that the execute() of a
- * SequentialProcedure will be called only once, there will be no second
- * execute() call once the child are finished. which means once the child
+ * SequentialProcedure will be called only once; there will be no second
+ * execute() call once the children are finished, which means once the children
  * of a SequentialProcedure are completed the SequentialProcedure is completed too.
  */
 @InterfaceAudience.Private

http://git-wip-us.apache.org/repos/asf/hbase/blob/9cd5f2d5/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
index 0590a93..becd9b7 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
@@ -21,9 +21,10 @@ package org.apache.hadoop.hbase.procedure2;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,7 +35,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.StateMa
 /**
  * Procedure described by a series of steps.
  *
- * The procedure implementor must have an enum of 'states', describing
+ * <p>The procedure implementor must have an enum of 'states', describing
  * the various step of the procedure.
  * Once the procedure is running, the procedure-framework will call executeFromState()
  * using the 'state' provided by the user. The first call to executeFromState()
@@ -56,7 +57,7 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
   private int stateCount = 0;
   private int[] states = null;
 
-  private ArrayList<Procedure> subProcList = null;
+  private List<Procedure<TEnvironment>> subProcList = null;
 
   protected enum Flow {
     HAS_MORE_STATE,
@@ -70,7 +71,7 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
    *         Flow.HAS_MORE_STATE if there is another step.
    */
   protected abstract Flow executeFromState(TEnvironment env, TState state)
-    throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException;
+  throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException;
 
   /**
    * called to perform the rollback of the specified state
@@ -125,12 +126,15 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
    * Add a child procedure to execute
    * @param subProcedure the child procedure
    */
-  protected void addChildProcedure(Procedure... subProcedure) {
+  protected void addChildProcedure(Procedure<TEnvironment>... subProcedure) {
+    if (subProcedure == null) return;
+    final int len = subProcedure.length;
+    if (len == 0) return;
     if (subProcList == null) {
-      subProcList = new ArrayList<>(subProcedure.length);
+      subProcList = new ArrayList<>(len);
     }
-    for (int i = 0; i < subProcedure.length; ++i) {
-      Procedure proc = subProcedure[i];
+    for (int i = 0; i < len; ++i) {
+      Procedure<TEnvironment> proc = subProcedure[i];
       if (!proc.hasOwner()) proc.setOwner(getOwner());
       subProcList.add(proc);
     }
@@ -138,27 +142,23 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
 
   @Override
   protected Procedure[] execute(final TEnvironment env)
-      throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
+  throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
     updateTimestamp();
     try {
       failIfAborted();
 
       if (!hasMoreState() || isFailed()) return null;
-
       TState state = getCurrentState();
       if (stateCount == 0) {
         setNextState(getStateId(state));
       }
-
       stateFlow = executeFromState(env, state);
       if (!hasMoreState()) setNextState(EOF_STATE);
-
-      if (subProcList != null && subProcList.size() != 0) {
+      if (subProcList != null && !subProcList.isEmpty()) {
         Procedure[] subProcedures = subProcList.toArray(new Procedure[subProcList.size()]);
         subProcList = null;
         return subProcedures;
       }
-
       return (isWaiting() || isFailed() || !hasMoreState()) ? null : new Procedure[] {this};
     } finally {
       updateTimestamp();

http://git-wip-us.apache.org/repos/asf/hbase/blob/9cd5f2d5/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java
index c03e326..9e53f42 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java
@@ -52,8 +52,8 @@ public class NoopProcedureStore extends ProcedureStoreBase {
   }
 
   @Override
-  public void setRunningProcedureCount(final int count) {
-    // no-op
+  public int setRunningProcedureCount(final int count) {
+    return count;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/9cd5f2d5/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
index 385cedb..a690c81 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
@@ -153,8 +153,9 @@ public interface ProcedureStore {
   /**
    * Set the number of procedure running.
    * This can be used, for example, by the store to know how long to wait before a sync.
+   * @return how many procedures are running (may not be same as <code>count</code>).
    */
-  void setRunningProcedureCount(int count);
+  int setRunningProcedureCount(int count);
 
   /**
    * Acquire the lease for the procedure store.

http://git-wip-us.apache.org/repos/asf/hbase/blob/9cd5f2d5/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
index 012ddeb..95a1ef6 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
@@ -155,9 +155,23 @@ public class ProcedureWALFile implements Comparable<ProcedureWALFile> {
     this.logSize += size;
   }
 
-  public void removeFile() throws IOException {
+  public void removeFile(final Path walArchiveDir) throws IOException {
     close();
-    fs.delete(logFile, false);
+    boolean archived = false;
+    if (walArchiveDir != null) {
+      Path archivedFile = new Path(walArchiveDir, logFile.getName());
+      if (fs.rename(logFile, archivedFile)) {
+        // Report the archive only once the rename has actually succeeded.
+        LOG.info("ARCHIVED (TODO: FILES ARE NOT PURGED FROM ARCHIVE!) " + logFile + " to " + archivedFile);
+        archived = true;
+      } else {
+        LOG.warn("Failed archive of " + logFile + ", deleting");
+      }
+    }
+    // No archive dir, or the archive failed: fall back to deleting the file.
+    if (!archived && !fs.delete(logFile, false)) {
+      LOG.warn("Failed delete of " + logFile);
+    }
   }
 
   public void setProcIds(long minId, long maxId) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/9cd5f2d5/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
index c672045..0a05e6e 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
@@ -83,11 +83,11 @@ public class ProcedureWALFormatReader {
   //
   //  Fast Start: INIT/INSERT record and StackIDs
   // ---------------------------------------------
-  // We have two special record, INIT and INSERT that tracks the first time
-  // the procedure was added to the WAL. We can use that information to be able
-  // to start procedures before reaching the end of the WAL, or before reading all the WALs.
-  // but in some cases the WAL with that record can be already gone.
-  // In alternative we can use the stackIds on each procedure,
+  // We have two special records, INIT and INSERT, that track the first time
+  // the procedure was added to the WAL. We can use this information to be able
+  // to start procedures before reaching the end of the WAL, or before reading all WALs.
+  // But in some cases, the WAL with that record can be already gone.
+  // As an alternative, we can use the stackIds on each procedure,
   // to identify when a procedure is ready to start.
   // If there are gaps in the sum of the stackIds we need to read more WALs.
   //
@@ -107,16 +107,16 @@ public class ProcedureWALFormatReader {
    * Global tracker that will be used by the WALProcedureStore after load.
    * If the last WAL was closed cleanly we already have a full tracker ready to be used.
    * If the last WAL was truncated (e.g. master killed) the tracker will be empty
-   * and the 'partial' flag will be set. In this case on WAL replay we are going
+   * and the 'partial' flag will be set. In this case, on WAL replay we are going
    * to rebuild the tracker.
    */
   private final ProcedureStoreTracker tracker;
-  // private final boolean hasFastStartSupport;
+  // TODO: private final boolean hasFastStartSupport;
 
   /**
    * If tracker for a log file is partial (see {@link ProcedureStoreTracker#partial}), we
    * re-build the list of procedures updated in that WAL because we need it for log cleaning
-   * purpose. If all procedures updated in a WAL are found to be obsolete, it can be safely deleted.
+   * purposes. If all procedures updated in a WAL are found to be obsolete, it can be safely deleted.
    * (see {@link WALProcedureStore#removeInactiveLogs()}).
    * However, we don't need deleted part of a WAL's tracker for this purpose, so we don't bother
    * re-building it.
@@ -137,7 +137,7 @@ public class ProcedureWALFormatReader {
   public void read(final ProcedureWALFile log) throws IOException {
     localTracker = log.getTracker().isPartial() ? log.getTracker() : null;
     if (localTracker != null) {
-      LOG.info("Rebuilding tracker for log - " + log);
+      LOG.info("Rebuilding tracker for " + log);
     }
 
     FSDataInputStream stream = log.getStream();
@@ -146,7 +146,7 @@ public class ProcedureWALFormatReader {
       while (hasMore) {
         ProcedureWALEntry entry = ProcedureWALFormat.readEntry(stream);
         if (entry == null) {
-          LOG.warn("nothing left to decode. exiting with missing EOF");
+          LOG.warn("Nothing left to decode. Exiting with missing EOF, log=" + log);
           break;
         }
         switch (entry.getType()) {
@@ -171,7 +171,7 @@ public class ProcedureWALFormatReader {
         }
       }
     } catch (InvalidProtocolBufferException e) {
-      LOG.error("got an exception while reading the procedure WAL: " + log, e);
+      LOG.error("While reading procedure from " + log, e);
       loader.markCorruptedWAL(log, e);
     }
 
@@ -211,7 +211,7 @@ public class ProcedureWALFormatReader {
     maxProcId = Math.max(maxProcId, proc.getProcId());
     if (isRequired(proc.getProcId())) {
       if (LOG.isTraceEnabled()) {
-        LOG.trace("read " + entry.getType() + " entry " + proc.getProcId());
+        LOG.trace("Read " + entry.getType() + " entry " + proc.getProcId());
       }
       localProcedureMap.add(proc);
       if (tracker.isPartial()) {
@@ -296,7 +296,7 @@ public class ProcedureWALFormatReader {
   //      replayOrderHead = C <-> B <-> E <-> D <-> A <-> G
   //
   //  We also have a lazy grouping by "root procedure", and a list of
-  //  unlinked procedure. If after reading all the WALs we have unlinked
+  //  unlinked procedures. If after reading all the WALs we have unlinked
   //  procedures it means that we had a missing WAL or a corruption.
   //      rootHead = A <-> D <-> G
   //                 B     E
@@ -639,17 +639,17 @@ public class ProcedureWALFormatReader {
      * "ready" means that we all the information that we need in-memory.
      *
      * Example-1:
-     * We have two WALs, we start reading fronm the newest (wal-2)
+     * We have two WALs, we start reading from the newest (wal-2)
      *    wal-2 | C B |
      *    wal-1 | A B C |
      *
      * If C and B don't depend on A (A is not the parent), we can start them
-     * before reading wal-1. If B is the only one with parent A we can start C
-     * and read one more WAL before being able to start B.
+     * before reading wal-1. If B is the only one with parent A we can start C.
+     * We have to read one more WAL before being able to start B.
      *
      * How do we know with the only information in B that we are not ready.
      *  - easy case, the parent is missing from the global map
-     *  - more complex case we look at the Stack IDs
+     *  - more complex case we look at the Stack IDs.
      *
      * The Stack-IDs are added to the procedure order as incremental index
      * tracking how many times that procedure was executed, which is equivalent
@@ -664,7 +664,7 @@ public class ProcedureWALFormatReader {
      * executed before.
      * To identify when a Procedure is ready we do the sum of the stackIds of
      * the procedure and the parent. if the stackIdSum is equals to the
-     * sum of {1..maxStackId} then everything we need is avaiable.
+     * sum of {1..maxStackId} then everything we need is available.
      *
      * Example-2
      *    wal-2 | A |              A stackIds = [0, 2]
@@ -676,7 +676,7 @@ public class ProcedureWALFormatReader {
       assert !rootEntry.hasParent() : "expected root procedure, got " + rootEntry;
 
       if (rootEntry.isFinished()) {
-        // if the root procedure is finished, sub-procedures should be gone
+        // If the root procedure is finished, sub-procedures should be gone
         if (rootEntry.childHead != null) {
           LOG.error("unexpected active children for root-procedure: " + rootEntry);
           for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/9cd5f2d5/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index 4712c30..1791cae 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -66,6 +66,7 @@ import com.google.common.annotations.VisibleForTesting;
 @InterfaceStability.Evolving
 public class WALProcedureStore extends ProcedureStoreBase {
   private static final Log LOG = LogFactory.getLog(WALProcedureStore.class);
+  public static final String LOG_PREFIX = "pv2-";
 
   public interface LeaseRecovery {
     void recoverFileLease(FileSystem fs, Path path) throws IOException;
@@ -124,6 +125,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
   private final Configuration conf;
   private final FileSystem fs;
   private final Path walDir;
+  private final Path walArchiveDir;
 
   private final AtomicReference<Throwable> syncException = new AtomicReference<>();
   private final AtomicBoolean loading = new AtomicBoolean(true);
@@ -185,9 +187,15 @@ public class WALProcedureStore extends ProcedureStoreBase {
 
   public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path walDir,
       final LeaseRecovery leaseRecovery) {
+    this(conf, fs, walDir, null, leaseRecovery);
+  }
+
+  public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path walDir,
+      final Path walArchiveDir, final LeaseRecovery leaseRecovery) {
     this.fs = fs;
     this.conf = conf;
     this.walDir = walDir;
+    this.walArchiveDir = walArchiveDir;
     this.leaseRecovery = leaseRecovery;
   }
 
@@ -239,6 +247,16 @@ public class WALProcedureStore extends ProcedureStoreBase {
       }
     };
     syncThread.start();
+
+    // Create archive dir up front. Rename won't work w/o it up on HDFS.
+    if (this.walArchiveDir != null && !this.fs.exists(this.walArchiveDir)) {
+      if (this.fs.mkdirs(this.walArchiveDir)) {
+        if (LOG.isDebugEnabled()) LOG.debug("Created Procedure Store WAL archive dir " +
+            this.walArchiveDir);
+      } else {
+        LOG.warn("Failed create of " + this.walArchiveDir);
+      }
+    }
   }
 
   @Override
@@ -292,9 +310,9 @@ public class WALProcedureStore extends ProcedureStoreBase {
   }
 
   @Override
-  public void setRunningProcedureCount(final int count) {
-    LOG.debug("Set running procedure count=" + count + ", slots=" + slots.length);
+  public int setRunningProcedureCount(final int count) {
     this.runningProcCount = count > 0 ? Math.min(count, slots.length) : slots.length;
+    return this.runningProcCount;
   }
 
   public ProcedureStoreTracker getStoreTracker() {
@@ -343,7 +361,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
           if (LOG.isDebugEnabled()) {
             LOG.debug("Someone else created new logs. Expected maxLogId < " + flushLogId);
           }
-          logs.getLast().removeFile();
+          logs.getLast().removeFile(this.walArchiveDir);
           continue;
         }
 
@@ -955,7 +973,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
     // but we should check if someone else has created new files
     if (getMaxLogId(getLogFiles()) > flushLogId) {
       LOG.warn("Someone else created new logs. Expected maxLogId < " + flushLogId);
-      logs.getLast().removeFile();
+      logs.getLast().removeFile(this.walArchiveDir);
       return false;
     }
 
@@ -1047,7 +1065,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
     // We keep track of which procedures are holding the oldest WAL in 'holdingCleanupTracker'.
     // once there is nothing olding the oldest WAL we can remove it.
     while (logs.size() > 1 && holdingCleanupTracker.isEmpty()) {
-      removeLogFile(logs.getFirst());
+      removeLogFile(logs.getFirst(), walArchiveDir);
       buildHoldingCleanupTracker();
     }
 
@@ -1079,8 +1097,8 @@ public class WALProcedureStore extends ProcedureStoreBase {
   private void removeAllLogs(long lastLogId) {
     if (logs.size() <= 1) return;
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Remove all state logs with ID less than " + lastLogId);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Remove all state logs with ID less than " + lastLogId);
     }
 
     boolean removed = false;
@@ -1089,7 +1107,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
       if (lastLogId < log.getLogId()) {
         break;
       }
-      removeLogFile(log);
+      removeLogFile(log, walArchiveDir);
       removed = true;
     }
 
@@ -1098,15 +1116,15 @@ public class WALProcedureStore extends ProcedureStoreBase {
     }
   }
 
-  private boolean removeLogFile(final ProcedureWALFile log) {
+  private boolean removeLogFile(final ProcedureWALFile log, final Path walArchiveDir) {
     try {
       if (LOG.isTraceEnabled()) {
         LOG.trace("Removing log=" + log);
       }
-      log.removeFile();
+      log.removeFile(walArchiveDir);
       logs.remove(log);
       if (LOG.isDebugEnabled()) {
-        LOG.info("Removed log=" + log + " activeLogs=" + logs);
+        LOG.info("Removed log=" + log + ", activeLogs=" + logs);
       }
       assert logs.size() > 0 : "expected at least one log";
     } catch (IOException e) {
@@ -1128,7 +1146,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
   }
 
   protected Path getLogFilePath(final long logId) throws IOException {
-    return new Path(walDir, String.format("state-%020d.log", logId));
+    return new Path(walDir, String.format(LOG_PREFIX + "%020d.log", logId));
   }
 
   private static long getLogIdFromName(final String name) {
@@ -1141,7 +1159,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
     @Override
     public boolean accept(Path path) {
       String name = path.getName();
-      return name.startsWith("state-") && name.endsWith(".log");
+      return name.startsWith(LOG_PREFIX) && name.endsWith(".log");
     }
   };
 
@@ -1192,7 +1210,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
         }
 
         maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName()));
-        ProcedureWALFile log = initOldLog(logFiles[i]);
+        ProcedureWALFile log = initOldLog(logFiles[i], this.walArchiveDir);
         if (log != null) {
           this.logs.add(log);
         }
@@ -1222,21 +1240,22 @@ public class WALProcedureStore extends ProcedureStoreBase {
   /**
    * Loads given log file and it's tracker.
    */
-  private ProcedureWALFile initOldLog(final FileStatus logFile) throws IOException {
+  private ProcedureWALFile initOldLog(final FileStatus logFile, final Path walArchiveDir)
+  throws IOException {
     final ProcedureWALFile log = new ProcedureWALFile(fs, logFile);
     if (logFile.getLen() == 0) {
       LOG.warn("Remove uninitialized log: " + logFile);
-      log.removeFile();
+      log.removeFile(walArchiveDir);
       return null;
     }
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Opening state-log: " + logFile);
+      LOG.debug("Opening Pv2 " + logFile);
     }
     try {
       log.open();
     } catch (ProcedureWALFormat.InvalidWALDataException e) {
       LOG.warn("Remove uninitialized log: " + logFile, e);
-      log.removeFile();
+      log.removeFile(walArchiveDir);
       return null;
     } catch (IOException e) {
       String msg = "Unable to read state log: " + logFile;

http://git-wip-us.apache.org/repos/asf/hbase/blob/9cd5f2d5/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java
index cde37bd..faf8e7e 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
+// FIX namings. TODO.
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public final class DelayedUtil {
@@ -148,6 +149,9 @@ public final class DelayedUtil {
     }
   }
 
+  /**
+   * Has a timeout.
+   */
   public static class DelayedContainerWithTimestamp<T> extends DelayedContainer<T> {
     private long timeout;
 
@@ -165,4 +169,4 @@ public final class DelayedUtil {
       this.timeout = timeout;
     }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/9cd5f2d5/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureToString.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureToString.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureToString.java
index 408cffd..78daf5a 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureToString.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureToString.java
@@ -42,7 +42,7 @@ public class TestProcedureToString {
    */
   static class BasicProcedure extends Procedure<BasicProcedureEnv> {
     @Override
-    protected Procedure<?>[] execute(BasicProcedureEnv env)
+    protected Procedure<BasicProcedureEnv>[] execute(BasicProcedureEnv env)
         throws ProcedureYieldException, InterruptedException {
       return new Procedure [] {this};
     }
@@ -78,8 +78,6 @@ public class TestProcedureToString {
     }
   }
 
-  
-
   /**
    * Test that I can override the toString for its state value.
    * @throws ProcedureYieldException