You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2015/05/28 20:49:29 UTC

[2/2] hbase git commit: HBASE-13476 Procedure v2 - Add Replay Order logic for child procedures

HBASE-13476 Procedure v2 - Add Replay Order logic for child procedures


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/24ef755f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/24ef755f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/24ef755f

Branch: refs/heads/branch-1
Commit: 24ef755f83faf17ff35735badac66f0c8d250a5a
Parents: 2f9851a
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Thu May 28 19:33:22 2015 +0100
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Thu May 28 19:42:42 2015 +0100

----------------------------------------------------------------------
 .../hadoop/hbase/procedure2/Procedure.java      |  14 +
 .../hbase/procedure2/ProcedureExecutor.java     | 126 ++--
 .../hbase/procedure2/store/ProcedureStore.java  |  56 +-
 .../store/wal/ProcedureWALFormat.java           |   8 +-
 .../store/wal/ProcedureWALFormatReader.java     | 592 +++++++++++++++++--
 .../procedure2/store/wal/WALProcedureStore.java |  45 +-
 .../procedure2/ProcedureTestingUtility.java     |   6 +-
 .../procedure2/TestProcedureExecution.java      |   2 +-
 .../hbase/procedure2/TestProcedureRecovery.java |   2 +-
 .../procedure2/TestProcedureReplayOrder.java    | 162 ++---
 .../store/wal/TestWALProcedureStore.java        | 324 +++++++++-
 .../org/apache/hadoop/hbase/master/HMaster.java |   5 +-
 .../procedure/MasterProcedureConstants.java     |  13 +
 hbase-server/src/test/resources/hbase-site.xml  |   7 +
 14 files changed, 1158 insertions(+), 204 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/24ef755f/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
index 338fcad..6abf2c5 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
@@ -519,6 +519,20 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
     return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
   }
 
+  /**
+   * Get a hashcode for the specified Procedure ID
+   * @return the hashcode for the specified procId
+   */
+  public static long getProcIdHashCode(final long procId) {
+    long h = procId;
+    h ^= h >> 16;
+    h *= 0x85ebca6b;
+    h ^= h >> 13;
+    h *= 0xc2b2ae35;
+    h ^= h >> 16;
+    return h;
+  }
+
   /*
    * Helper to lookup the root Procedure ID given a specified procedure.
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/24ef755f/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index 6e87997..59b346a 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -28,7 +28,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.HashSet;
-import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -43,6 +42,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
 import org.apache.hadoop.hbase.procedure2.util.StringUtils;
 import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue;
 import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue.TimeoutRetriever;
@@ -264,45 +264,70 @@ public class ProcedureExecutor<TEnvironment> {
     this.conf = conf;
   }
 
-  private List<Map.Entry<Long, RootProcedureState>> load() throws IOException {
+  private void load(final boolean abortOnCorruption) throws IOException {
     Preconditions.checkArgument(completed.isEmpty());
     Preconditions.checkArgument(rollbackStack.isEmpty());
     Preconditions.checkArgument(procedures.isEmpty());
     Preconditions.checkArgument(waitingTimeout.isEmpty());
     Preconditions.checkArgument(runnables.size() == 0);
 
-    // 1. Load the procedures
-    Iterator<Procedure> loader = store.load();
-    if (loader == null) {
-      lastProcId.set(0);
-      return null;
-    }
+    store.load(new ProcedureStore.ProcedureLoader() {
+      @Override
+      public void setMaxProcId(long maxProcId) {
+        assert lastProcId.get() < 0 : "expected only one call to setMaxProcId()";
+        LOG.debug("load procedures maxProcId=" + maxProcId);
+        lastProcId.set(maxProcId);
+      }
 
-    long logMaxProcId = 0;
-    int runnablesCount = 0;
-    while (loader.hasNext()) {
-      Procedure proc = loader.next();
-      proc.beforeReplay(getEnvironment());
-      procedures.put(proc.getProcId(), proc);
-      logMaxProcId = Math.max(logMaxProcId, proc.getProcId());
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Loading procedure state=" + proc.getState() +
-            " isFailed=" + proc.hasException() + ": " + proc);
+      @Override
+      public void load(ProcedureIterator procIter) throws IOException {
+        loadProcedures(procIter, abortOnCorruption);
       }
+
+      @Override
+      public void handleCorrupted(ProcedureIterator procIter) throws IOException {
+        int corruptedCount = 0;
+        while (procIter.hasNext()) {
+          Procedure proc = procIter.next();
+          LOG.error("corrupted procedure: " + proc);
+          corruptedCount++;
+        }
+        if (abortOnCorruption && corruptedCount > 0) {
+          throw new IOException("found " + corruptedCount + " procedures on replay");
+        }
+      }
+    });
+  }
+
+  private void loadProcedures(final ProcedureIterator procIter,
+      final boolean abortOnCorruption) throws IOException {
+    // 1. Build the rollback stack
+    int runnablesCount = 0;
+    while (procIter.hasNext()) {
+      Procedure proc = procIter.next();
       if (!proc.hasParent() && !proc.isFinished()) {
         rollbackStack.put(proc.getProcId(), new RootProcedureState());
       }
+      // add the procedure to the map
+      proc.beforeReplay(getEnvironment());
+      procedures.put(proc.getProcId(), proc);
+
       if (proc.getState() == ProcedureState.RUNNABLE) {
         runnablesCount++;
       }
     }
-    assert lastProcId.get() < 0;
-    lastProcId.set(logMaxProcId);
 
     // 2. Initialize the stacks
-    TreeSet<Procedure> runnableSet = null;
+    ArrayList<Procedure> runnableList = new ArrayList(runnablesCount);
     HashSet<Procedure> waitingSet = null;
-    for (final Procedure proc: procedures.values()) {
+    procIter.reset();
+    while (procIter.hasNext()) {
+      Procedure proc = procIter.next();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format("Loading procedure state=%s isFailed=%s: %s",
+                    proc.getState(), proc.hasException(), proc));
+      }
+
       Long rootProcId = getRootProcedureId(proc);
       if (rootProcId == null) {
         // The 'proc' was ready to run but the root procedure was rolledback?
@@ -312,10 +337,11 @@ public class ProcedureExecutor<TEnvironment> {
 
       if (!proc.hasParent() && proc.isFinished()) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("The procedure is completed state=" + proc.getState() +
-              " isFailed=" + proc.hasException() + ": " + proc);
+          LOG.debug(String.format("The procedure is completed state=%s isFailed=%s",
+                    proc.getState(), proc.hasException()));
         }
         assert !rollbackStack.containsKey(proc.getProcId());
+        procedures.remove(proc.getProcId());
         completed.put(proc.getProcId(), newResultFromProcedure(proc));
         continue;
       }
@@ -333,10 +359,7 @@ public class ProcedureExecutor<TEnvironment> {
 
       switch (proc.getState()) {
         case RUNNABLE:
-          if (runnableSet == null) {
-            runnableSet = new TreeSet<Procedure>();
-          }
-          runnableSet.add(proc);
+          runnableList.add(proc);
           break;
         case WAITING_TIMEOUT:
           if (waitingSet == null) {
@@ -361,7 +384,7 @@ public class ProcedureExecutor<TEnvironment> {
     }
 
     // 3. Validate the stacks
-    List<Map.Entry<Long, RootProcedureState>> corrupted = null;
+    int corruptedCount = 0;
     Iterator<Map.Entry<Long, RootProcedureState>> itStack = rollbackStack.entrySet().iterator();
     while (itStack.hasNext()) {
       Map.Entry<Long, RootProcedureState> entry = itStack.next();
@@ -369,32 +392,49 @@ public class ProcedureExecutor<TEnvironment> {
       if (procStack.isValid()) continue;
 
       for (Procedure proc: procStack.getSubprocedures()) {
+        LOG.error("corrupted procedure: " + proc);
         procedures.remove(proc.getProcId());
-        if (runnableSet != null) runnableSet.remove(proc);
+        runnableList.remove(proc);
         if (waitingSet != null) waitingSet.remove(proc);
+        corruptedCount++;
       }
       itStack.remove();
-      if (corrupted == null) {
-        corrupted = new ArrayList<Map.Entry<Long, RootProcedureState>>();
-      }
-      corrupted.add(entry);
+    }
+
+    if (abortOnCorruption && corruptedCount > 0) {
+      throw new IOException("found " + corruptedCount + " procedures on replay");
     }
 
     // 4. Push the runnables
-    if (runnableSet != null) {
-      // TODO: See ProcedureWALFormatReader.readInitEntry() some procedure
-      // may be started way before this stuff.
-      for (Procedure proc: runnableSet) {
+    if (!runnableList.isEmpty()) {
+      // TODO: See ProcedureWALFormatReader#hasFastStartSupport
+      // some procedure may be started way before this stuff.
+      for (int i = runnableList.size() - 1; i >= 0; --i) {
+        Procedure proc = runnableList.get(i);
         if (!proc.hasParent()) {
           sendProcedureLoadedNotification(proc.getProcId());
         }
-        runnables.addBack(proc);
+        if (proc.wasExecuted()) {
+          runnables.addFront(proc);
+        } else {
+          // if it was not in execution, it can wait.
+          runnables.addBack(proc);
+        }
       }
     }
-    return corrupted;
   }
 
-  public void start(int numThreads) throws IOException {
+  /**
+   * Start the procedure executor.
+   * It calls ProcedureStore.recoverLease() and ProcedureStore.load() to
+   * recover the lease, and ensure a single executor, and start the procedure
+   * replay to resume and recover the previous pending and in-progress procedures.
+   *
+   * @param numThreads number of threads available for procedure execution.
+   * @param abortOnCorruption true if you want to abort your service in case
+   *          a corrupted procedure is found on replay. otherwise false.
+   */
+  public void start(int numThreads, boolean abortOnCorruption) throws IOException {
     if (running.getAndSet(true)) {
       LOG.warn("Already running");
       return;
@@ -427,11 +467,11 @@ public class ProcedureExecutor<TEnvironment> {
     store.recoverLease();
 
     // TODO: Split in two steps.
-    // TODO: Handle corrupted procedure returned (probably just a WARN)
+    // TODO: Handle corrupted procedures (currently just a warn)
     // The first one will make sure that we have the latest id,
     // so we can start the threads and accept new procedures.
     // The second step will do the actual load of old procedures.
-    load();
+    load(abortOnCorruption);
 
     // Start the executors. Here we must have the lastProcId set.
     for (int i = 0; i < threads.length; ++i) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/24ef755f/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
index 06bfa44..a05c115 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hbase.procedure2.store;
 
 import java.io.IOException;
-import java.util.Iterator;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
@@ -46,6 +45,57 @@ public interface ProcedureStore {
   }
 
   /**
+   * An Iterator over a collection of Procedure
+   */
+  public interface ProcedureIterator {
+    /**
+     * Reset the Iterator by seeking to the beginning of the list.
+     */
+    void reset();
+
+    /**
+     * Returns true if the iterator has more elements.
+     * (In other words, returns true if next() would return a Procedure
+     * rather than throwing an exception.)
+     * @return true if the iterator has more procedures
+     */
+    boolean hasNext();
+
+    /**
+     * Returns the next procedure in the iteration.
+     * @throws IOException if there was an error fetching/deserializing the procedure
+     * @throws NoSuchElementException if the iteration has no more elements
+     * @return the next procedure in the iteration.
+     */
+    Procedure next() throws IOException;
+  }
+
+  /**
+   * Interface passed to the ProcedureStore.load() method to handle the store-load events.
+   */
+  public interface ProcedureLoader {
+    /**
+     * Called by ProcedureStore.load() to notify about the maximum proc-id in the store.
+     * @param maxProcId the highest proc-id in the store
+     */
+    void setMaxProcId(long maxProcId);
+
+    /**
+     * Called by ProcedureStore.load() every time a set of procedures is ready to be executed.
+     * The ProcedureIterator passed to the method has the procedures sorted in replay-order.
+     * @param procIter iterator over the procedures ready to be added to the executor.
+     */
+    void load(ProcedureIterator procIter) throws IOException;
+
+    /**
+     * Called by the ProcedureStore.load() in case we have procedures not-ready to be added to
+     * the executor, which probably means they are corrupted since some information/link is missing.
+     * @param procIter iterator over the procedures not ready to be added to the executor, corrupted
+     */
+    void handleCorrupted(ProcedureIterator procIter) throws IOException;
+  }
+
+  /**
    * Add the listener to the notification list.
    * @param listener The AssignmentListener to register
    */
@@ -87,9 +137,9 @@ public interface ProcedureStore {
 
   /**
    * Load the Procedures in the store.
-   * @return the set of procedures present in the store
+   * @param loader the ProcedureLoader that will handle the store-load events
    */
-  Iterator<Procedure> load() throws IOException;
+  void load(ProcedureLoader loader) throws IOException;
 
   /**
    * When a procedure is submitted to the executor insert(proc, null) will be called.

http://git-wip-us.apache.org/repos/asf/hbase/blob/24ef755f/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
index 17432ac..c75c141 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.hbase.io.util.StreamUtils;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader;
 import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
@@ -63,14 +64,14 @@ public final class ProcedureWALFormat {
     }
   }
 
-  interface Loader {
+  interface Loader extends ProcedureLoader {
     void removeLog(ProcedureWALFile log);
     void markCorruptedWAL(ProcedureWALFile log, IOException e);
   }
 
   private ProcedureWALFormat() {}
 
-  public static Iterator<Procedure> load(final Iterator<ProcedureWALFile> logs,
+  public static void load(final Iterator<ProcedureWALFile> logs,
       final ProcedureStoreTracker tracker, final Loader loader) throws IOException {
     ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker);
     tracker.setKeepDeletes(true);
@@ -84,14 +85,13 @@ public final class ProcedureWALFormat {
           log.close();
         }
       }
+      reader.finalize(loader);
       // The tracker is now updated with all the procedures read from the logs
       tracker.setPartialFlag(false);
       tracker.resetUpdates();
     } finally {
       tracker.setKeepDeletes(false);
     }
-    // TODO: Write compacted version?
-    return reader.getProcedures();
   }
 
   public static void writeHeader(OutputStream stream, ProcedureWALHeader header)

http://git-wip-us.apache.org/repos/asf/hbase/blob/24ef755f/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
index a60b8f5..76c0554 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
@@ -19,9 +19,6 @@
 package org.apache.hadoop.hbase.procedure2.store.wal;
 
 import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.HashMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -30,6 +27,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
 
@@ -41,17 +39,74 @@ import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEn
 public class ProcedureWALFormatReader {
   private static final Log LOG = LogFactory.getLog(ProcedureWALFormatReader.class);
 
-  private final ProcedureStoreTracker tracker;
-  //private final long compactionLogId;
-
-  private final Map<Long, Procedure> procedures = new HashMap<Long, Procedure>();
-  private final Map<Long, ProcedureProtos.Procedure> localProcedures =
-    new HashMap<Long, ProcedureProtos.Procedure>();
+  // ==============================================================================================
+  //  We read the WALs in reverse order, from the newest to the oldest.
+  //  We have different entry types:
+  //   - INIT: Procedure submitted by the user (also known as 'root procedure')
+  //   - INSERT: Children added to the procedure <parentId>:[<childId>, ...]
+  //   - UPDATE: The specified procedure was updated
+  //   - DELETE: The procedure was removed (completed/rolledback and result TTL expired)
+  //
+  // In the WAL we can find multiple times the same procedure as UPDATE or INSERT.
+  // We read the WAL from top to bottom, so every time we find an entry of the
+  // same procedure, that will be the "latest" update.
+  //
+  // We keep two in-memory maps:
+  //  - localProcedureMap: is the map containing the entries in the WAL we are processing
+  //  - procedureMap: is the map containing all the procedures we found up to the WAL in process.
+  // localProcedureMap is merged with the procedureMap once we reach the WAL EOF.
+  //
+  // Since we are reading the WALs in reverse order (newest to oldest),
+  // if we find an entry related to a procedure we already have in 'procedureMap' we can discard it.
+  //
+  // The WAL is append-only so the last procedure in the WAL is the one that
+  // was in execution at the time we crashed/closed the server.
+  // Given that, the procedure replay order can be inferred by the WAL order.
+  //
+  // Example:
+  //    WAL-2: [A, B, A, C, D]
+  //    WAL-1: [F, G, A, F, B]
+  //    Replay-Order: [D, C, A, B, F, G]
+  //
+  // The "localProcedureMap" keeps a "replayOrder" list. Every time we add the
+  // record to the map that record is moved to the head of the "replayOrder" list.
+  // Using the example above:
+  //    WAL-2 localProcedureMap.replayOrder is [D, C, A, B]
+  //    WAL-1 localProcedureMap.replayOrder is [F, G]
+  //
+  // each time we reach the WAL-EOF, the "replayOrder" list is merged/appended in 'procedureMap'
+  // so using the example above we end up with: [D, C, A, B] + [F, G] as replay order.
+  //
+  //  Fast Start: INIT/INSERT record and StackIDs
+  // ---------------------------------------------
+  // We have two special records, INIT and INSERT, that track the first time
+  // the procedure was added to the WAL. We can use that information to be able
+  // to start procedures before reaching the end of the WAL, or before reading all the WALs,
+  // but in some cases the WAL with that record may already be gone.
+  // Alternatively, we can use the stackIds on each procedure
+  // to identify when a procedure is ready to start.
+  // If there are gaps in the sum of the stackIds we need to read more WALs.
+  //
+  // Example (all procs child of A):
+  //   WAL-2: [A, B]                   A stackIds = [0, 4], B stackIds = [1, 5]
+  //   WAL-1: [A, B, C, D]
+  //
+  // In the case above we need to read one more WAL to be able to consider
+  // the root procedure A and all children as ready.
+  // ==============================================================================================
+  private final WalProcedureMap localProcedureMap = new WalProcedureMap(1024);
+  private final WalProcedureMap procedureMap = new WalProcedureMap(1024);
 
+  //private long compactionLogId;
   private long maxProcId = 0;
 
+  private final ProcedureStoreTracker tracker;
+  private final boolean hasFastStartSupport;
+
   public ProcedureWALFormatReader(final ProcedureStoreTracker tracker) {
     this.tracker = tracker;
+    // we support fast-start only if we have a clean shutdown.
+    this.hasFastStartSupport = !tracker.isEmpty();
   }
 
   public void read(ProcedureWALFile log, ProcedureWALFormat.Loader loader) throws IOException {
@@ -91,58 +146,65 @@ public class ProcedureWALFormatReader {
       loader.markCorruptedWAL(log, e);
     }
 
-    if (localProcedures.isEmpty()) {
+    if (localProcedureMap.isEmpty()) {
       LOG.info("No active entry found in state log " + log + ". removing it");
       loader.removeLog(log);
     } else {
-      Iterator<Map.Entry<Long, ProcedureProtos.Procedure>> itd =
-        localProcedures.entrySet().iterator();
-      while (itd.hasNext()) {
-        Map.Entry<Long, ProcedureProtos.Procedure> entry = itd.next();
-        itd.remove();
+      procedureMap.mergeTail(localProcedureMap);
 
-        // Deserialize the procedure
-        Procedure proc = Procedure.convert(entry.getValue());
-        procedures.put(entry.getKey(), proc);
-      }
-
-      // TODO: Some procedure may be already runnables (see readInitEntry())
-      //       (we can also check the "update map" in the log trackers)
+      //if (hasFastStartSupport) {
+        // TODO: Some procedure may be already runnables (see readInitEntry())
+        //       (we can also check the "update map" in the log trackers)
+        // --------------------------------------------------
+        //EntryIterator iter = procedureMap.fetchReady();
+        //if (iter != null) loader.load(iter);
+        // --------------------------------------------------
+      //}
     }
   }
 
-  public Iterator<Procedure> getProcedures() {
-    return procedures.values().iterator();
+  public void finalize(ProcedureWALFormat.Loader loader) throws IOException {
+    // notify the loader about the max proc ID
+    loader.setMaxProcId(maxProcId);
+
+    // fetch the procedure ready to run.
+    ProcedureIterator procIter = procedureMap.fetchReady();
+    if (procIter != null) loader.load(procIter);
+
+    // remaining procedures have missing link or dependencies
+    // consider them as corrupted, manual fix is probably required.
+    procIter = procedureMap.fetchAll();
+    if (procIter != null) loader.handleCorrupted(procIter);
   }
 
-  private void loadEntries(final ProcedureWALEntry entry) {
-    for (ProcedureProtos.Procedure proc: entry.getProcedureList()) {
-      maxProcId = Math.max(maxProcId, proc.getProcId());
-      if (isRequired(proc.getProcId())) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("read " + entry.getType() + " entry " + proc.getProcId());
-        }
-        localProcedures.put(proc.getProcId(), proc);
-        tracker.setDeleted(proc.getProcId(), false);
+  private void loadProcedure(final ProcedureWALEntry entry, final ProcedureProtos.Procedure proc) {
+    maxProcId = Math.max(maxProcId, proc.getProcId());
+    if (isRequired(proc.getProcId())) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("read " + entry.getType() + " entry " + proc.getProcId());
       }
+      localProcedureMap.add(proc);
+      tracker.setDeleted(proc.getProcId(), false);
     }
   }
 
   private void readInitEntry(final ProcedureWALEntry entry)
       throws IOException {
     assert entry.getProcedureCount() == 1 : "Expected only one procedure";
-    // TODO: Make it runnable, before reading other files
-    loadEntries(entry);
+    loadProcedure(entry, entry.getProcedure(0));
   }
 
   private void readInsertEntry(final ProcedureWALEntry entry) throws IOException {
     assert entry.getProcedureCount() >= 1 : "Expected one or more procedures";
-    loadEntries(entry);
+    loadProcedure(entry, entry.getProcedure(0));
+    for (int i = 1; i < entry.getProcedureCount(); ++i) {
+      loadProcedure(entry, entry.getProcedure(i));
+    }
   }
 
   private void readUpdateEntry(final ProcedureWALEntry entry) throws IOException {
     assert entry.getProcedureCount() == 1 : "Expected only one procedure";
-    loadEntries(entry);
+    loadProcedure(entry, entry.getProcedure(0));
   }
 
   private void readDeleteEntry(final ProcedureWALEntry entry) throws IOException {
@@ -152,7 +214,7 @@ public class ProcedureWALFormatReader {
       LOG.trace("read delete entry " + entry.getProcId());
     }
     maxProcId = Math.max(maxProcId, entry.getProcId());
-    localProcedures.remove(entry.getProcId());
+    localProcedureMap.remove(entry.getProcId());
     tracker.setDeleted(entry.getProcId(), true);
   }
 
@@ -161,6 +223,458 @@ public class ProcedureWALFormatReader {
   }
 
   private boolean isRequired(final long procId) {
-    return !isDeleted(procId) && !procedures.containsKey(procId);
+    return !isDeleted(procId) && !procedureMap.contains(procId);
+  }
+
+  // ==========================================================================
+  //  We keep an in-memory map of the procedures sorted by replay order.
+  //  (see the details in the beginning of the file)
+  //                      _______________________________________________
+  //      procedureMap = | A |   | E |   | C |   |   |   |   | G |   |   |
+  //                       D               B
+  //      replayOrderHead = C <-> B <-> E <-> D <-> A <-> G
+  //
+  //  We also have a lazy grouping by "root procedure", and a list of
+  //  unlinked procedures. If after reading all the WALs we have unlinked
+  //  procedures it means that we had a missing WAL or a corruption.
+  //      rootHead = A <-> D <-> G
+  //                 B     E
+  //                 C
+  //      childUnlinkedHead = None
+  // ==========================================================================
+  private static class Entry {
+    // hash-table next
+    protected Entry hashNext;
+    // child head
+    protected Entry childHead;
+    // double-link for rootHead or childHead
+    protected Entry linkNext;
+    protected Entry linkPrev;
+    // replay double-linked-list
+    protected Entry replayNext;
+    protected Entry replayPrev;
+    // procedure-infos
+    protected Procedure procedure;
+    protected ProcedureProtos.Procedure proto;
+    protected boolean ready = false;
+
+    public Entry(Entry hashNext) { this.hashNext = hashNext; }
+
+    public long getProcId() { return proto.getProcId(); }
+    public long getParentId() { return proto.getParentId(); }
+    public boolean hasParent() { return proto.hasParentId(); }
+    public boolean isReady() { return ready; }
+
+    public Procedure convert() throws IOException {
+      if (procedure == null) {
+        procedure = Procedure.convert(proto);
+      }
+      return procedure;
+    }
+
+    @Override
+    public String toString() {
+      return "Entry(" + getProcId() + ", parentId=" + getParentId() + ")";
+    }
+  }
+
+  private static class EntryIterator implements ProcedureIterator {
+    private final Entry replayHead;
+    private Entry current;
+
+    public EntryIterator(Entry replayHead) {
+      this.replayHead = replayHead;
+      this.current = replayHead;
+    }
+
+    @Override
+    public void reset() {
+      this.current = replayHead;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return current != null;
+    }
+
+    @Override
+    public Procedure next() throws IOException {
+      try {
+        return current.convert();
+      } finally {
+        current = current.replayNext;
+      }
+    }
+  }
+
+  private static class WalProcedureMap {
+    // procedure hash table
+    private Entry[] procedureMap;
+
+    // replay-order double-linked-list
+    private Entry replayOrderHead;
+    private Entry replayOrderTail;
+
+    // root linked-list
+    private Entry rootHead;
+
+    // pending unlinked children (root not present yet)
+    private Entry childUnlinkedHead;
+
+    public WalProcedureMap(int size) {
+      procedureMap = new Entry[size];
+      replayOrderHead = null;
+      replayOrderTail = null;
+      rootHead = null;
+      childUnlinkedHead = null;
+    }
+
+    public void add(ProcedureProtos.Procedure procProto) {
+      Entry entry = addToMap(procProto.getProcId(), procProto.hasParentId());
+      boolean isNew = entry.proto == null;
+      entry.proto = procProto;
+      addToReplayList(entry);
+
+      if (isNew) {
+        if (procProto.hasParentId()) {
+          childUnlinkedHead = addToLinkList(entry, childUnlinkedHead);
+        } else {
+          rootHead = addToLinkList(entry, rootHead);
+        }
+      }
+    }
+
+    public boolean remove(long procId) {
+      Entry entry = removeFromMap(procId);
+      if (entry != null) {
+        unlinkFromReplayList(entry);
+        unlinkFromLinkList(entry);
+        return true;
+      }
+      return false;
+    }
+
+    public boolean contains(long procId) {
+      return getProcedure(procId) != null;
+    }
+
+    public boolean isEmpty() {
+      return replayOrderHead == null;
+    }
+
+    public void clear() {
+      for (int i = 0; i < procedureMap.length; ++i) {
+        procedureMap[i] = null;
+      }
+      replayOrderHead = null;
+      replayOrderTail = null;
+      rootHead = null;
+      childUnlinkedHead = null;
+    }
+
+    /*
+     * Merges two WalProcedureMap,
+     * the target is the "global" map, the source is the "local" map.
+     *  - The entries in the hashtables are guaranteed to be unique.
+     *    On replay we don't load procedures that already exist in the "global"
+     *    map (the one we are merging the "local" in to).
+     *  - The replayOrderList of the "local" map will be appended to the "global"
+     *    map replay list.
+     *  - The "local" map will be cleared at the end of the operation.
+     */
+    public void mergeTail(WalProcedureMap other) {
+      for (Entry p = other.replayOrderHead; p != null; p = p.replayNext) {
+        int slotIndex = getMapSlot(p.getProcId());
+        p.hashNext = procedureMap[slotIndex];
+        procedureMap[slotIndex] = p;
+      }
+
+      if (replayOrderHead == null) {
+        replayOrderHead = other.replayOrderHead;
+        replayOrderTail = other.replayOrderTail;
+        rootHead = other.rootHead;
+        childUnlinkedHead = other.childUnlinkedHead;
+      } else {
+        // append replay list
+        assert replayOrderTail.replayNext == null;
+        assert other.replayOrderHead.replayPrev == null;
+        replayOrderTail.replayNext = other.replayOrderHead;
+        other.replayOrderHead.replayPrev = replayOrderTail;
+        replayOrderTail = other.replayOrderTail;
+
+        // merge rootHead
+        if (rootHead == null) {
+          rootHead = other.rootHead;
+        } else if (other.rootHead != null) {
+          Entry otherTail = findLinkListTail(other.rootHead);
+          otherTail.linkNext = rootHead;
+          rootHead.linkPrev = otherTail;
+          rootHead = other.rootHead;
+        }
+
+        // merge childUnlinkedHead
+        if (childUnlinkedHead == null) {
+          childUnlinkedHead = other.childUnlinkedHead;
+        } else if (other.childUnlinkedHead != null) {
+          Entry otherTail = findLinkListTail(other.childUnlinkedHead);
+          otherTail.linkNext = childUnlinkedHead;
+          childUnlinkedHead.linkPrev = otherTail;
+          childUnlinkedHead = other.childUnlinkedHead;
+        }
+      }
+
+      other.clear();
+    }
+
+    /*
+     * Returns an EntryIterator with the list of procedures ready
+     * to be added to the executor.
+     * A Procedure is ready if its children and parent are ready.
+     */
+    public EntryIterator fetchReady() {
+      buildGraph();
+
+      Entry readyHead = null;
+      Entry readyTail = null;
+      Entry p = replayOrderHead;
+      while (p != null) {
+        Entry next = p.replayNext;
+        if (p.isReady()) {
+          unlinkFromReplayList(p);
+          if (readyTail != null) {
+            readyTail.replayNext = p;
+            p.replayPrev = readyTail;
+          } else {
+            p.replayPrev = null;
+            readyHead = p;
+          }
+          readyTail = p;
+          p.replayNext = null;
+        }
+        p = next;
+      }
+      // we need the hash-table lookups for parents, so this must be done
+      // outside the loop above, where we check isReady()
+      for (p = readyHead; p != null; p = p.replayNext) {
+        removeFromMap(p.getProcId());
+        unlinkFromLinkList(p);
+      }
+      return readyHead != null ? new EntryIterator(readyHead) : null;
+    }
+
+    /*
+     * Drain this map and return all procedures in it.
+     */
+    public EntryIterator fetchAll() {
+      Entry head = replayOrderHead;
+      for (Entry p = head; p != null; p = p.replayNext) {
+        removeFromMap(p.getProcId());
+      }
+      for (int i = 0; i < procedureMap.length; ++i) {
+        assert procedureMap[i] == null : "map not empty i=" + i;
+      }
+      replayOrderHead = null;
+      replayOrderTail = null;
+      childUnlinkedHead = null;
+      rootHead = null;
+      return head != null ? new EntryIterator(head) : null;
+    }
+
+    private void buildGraph() {
+      Entry p = childUnlinkedHead;
+      while (p != null) {
+        Entry next = p.linkNext;
+        Entry rootProc = getRootProcedure(p);
+        if (rootProc != null) {
+          rootProc.childHead = addToLinkList(p, rootProc.childHead);
+        }
+        p = next;
+      }
+
+      for (p = rootHead; p != null; p = p.linkNext) {
+        checkReadyToRun(p);
+      }
+    }
+
+    private Entry getRootProcedure(Entry entry) {
+      while (entry != null && entry.hasParent()) {
+        entry = getProcedure(entry.getParentId());
+      }
+      return entry;
+    }
+
+    /*
+     * (see the comprehensive explanation in the beginning of the file)
+     * A Procedure is ready when parent and children are ready.
+     * "ready" means that we have all the information that we need in-memory.
+     *
+     * Example-1:
+     * We have two WALs, we start reading from the newest (wal-2)
+     *    wal-2 | C B |
+     *    wal-1 | A B C |
+     *
+     * If C and B don't depend on A (A is not the parent), we can start them
+     * before reading wal-1. If B is the only one with parent A we can start C,
+     * but we must read one more WAL before being able to start B.
+     *
+     * How do we know, with only the information in B, that we are not ready?
+     *  - easy case, the parent is missing from the global map
+     *  - more complex case we look at the Stack IDs
+     *
+     * The Stack-IDs are added to the procedure order as an incremental index
+     * tracking how many times that procedure was executed, which is equivalent
+     * to the number of times we wrote the procedure to the WAL.
+     * In the example above:
+     *   wal-2: B has stackId = [1, 2]
+     *   wal-1: B has stackId = [1]
+     *   wal-1: A has stackId = [0]
+     *
+     * Since we know that the Stack-IDs are incremental for a Procedure,
+     * we notice that there is a gap in the stackIds of B, so something was
+     * executed before.
+     * To identify when a Procedure is ready we do the sum of the stackIds of
+     * the procedure and the parent. If the stackIdSum is equal to the
+     * sum of {1..maxStackId} then everything we need is available.
+     *
+     * Example-2
+     *    wal-2 | A |              A stackIds = [0, 2]
+     *    wal-1 | A B |            B stackIds = [1]
+     *
+     * There is a gap between A stackIds so something was executed in between.
+     */
+    private boolean checkReadyToRun(Entry rootEntry) {
+      int stackIdSum = 0;
+      int maxStackId = 0;
+      for (int i = 0; i < rootEntry.proto.getStackIdCount(); ++i) {
+        int stackId = 1 + rootEntry.proto.getStackId(i);
+        maxStackId  = Math.max(maxStackId, stackId);
+        stackIdSum += stackId;
+      }
+
+      for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) {
+        for (int i = 0; i < p.proto.getStackIdCount(); ++i) {
+          int stackId = 1 + p.proto.getStackId(i);
+          maxStackId  = Math.max(maxStackId, stackId);
+          stackIdSum += stackId;
+        }
+      }
+      final int cmpStackIdSum = (maxStackId * (maxStackId + 1) / 2);
+      if (cmpStackIdSum == stackIdSum) {
+        rootEntry.ready = true;
+        for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) {
+          p.ready = true;
+        }
+        return true;
+      }
+      return false;
+    }
+
+    private void unlinkFromReplayList(Entry entry) {
+      if (replayOrderHead == entry) {
+        replayOrderHead = entry.replayNext;
+      }
+      if (replayOrderTail == entry) {
+        replayOrderTail = entry.replayPrev;
+      }
+      if (entry.replayPrev != null) {
+        entry.replayPrev.replayNext = entry.replayNext;
+      }
+      if (entry.replayNext != null) {
+        entry.replayNext.replayPrev = entry.replayPrev;
+      }
+    }
+
+    private void addToReplayList(final Entry entry) {
+      unlinkFromReplayList(entry);
+      entry.replayNext = replayOrderHead;
+      entry.replayPrev = null;
+      if (replayOrderHead != null) {
+        replayOrderHead.replayPrev = entry;
+      } else {
+        replayOrderTail = entry;
+      }
+      replayOrderHead = entry;
+    }
+
+    private void unlinkFromLinkList(Entry entry) {
+      if (entry == rootHead) {
+        rootHead = entry.linkNext;
+      } else if (entry == childUnlinkedHead) {
+        childUnlinkedHead = entry.linkNext;
+      }
+      if (entry.linkPrev != null) {
+        entry.linkPrev.linkNext = entry.linkNext;
+      }
+      if (entry.linkNext != null) {
+        entry.linkNext.linkPrev = entry.linkPrev;
+      }
+    }
+
+    private Entry addToLinkList(Entry entry, Entry linkHead) {
+      unlinkFromLinkList(entry);
+      entry.linkNext = linkHead;
+      entry.linkPrev = null;
+      if (linkHead != null) {
+        linkHead.linkPrev = entry;
+      }
+      return entry;
+    }
+
+    private Entry findLinkListTail(Entry linkHead) {
+      Entry tail = linkHead;
+      while (tail.linkNext != null) {
+        tail = tail.linkNext;
+      }
+      return tail;
+    }
+
+    private Entry addToMap(final long procId, final boolean hasParent) {
+      int slotIndex = getMapSlot(procId);
+      Entry entry = getProcedure(slotIndex, procId);
+      if (entry != null) return entry;
+
+      entry = new Entry(procedureMap[slotIndex]);
+      procedureMap[slotIndex] = entry;
+      return entry;
+    }
+
+    private Entry removeFromMap(final long procId) {
+      int slotIndex = getMapSlot(procId);
+      Entry prev = null;
+      Entry entry = procedureMap[slotIndex];
+      while (entry != null) {
+        if (procId == entry.getProcId()) {
+          if (prev != null) {
+            prev.hashNext = entry.hashNext;
+          } else {
+            procedureMap[slotIndex] = entry.hashNext;
+          }
+          entry.hashNext = null;
+          return entry;
+        }
+        prev = entry;
+        entry = entry.hashNext;
+      }
+      return null;
+    }
+
+    private Entry getProcedure(final long procId) {
+      return getProcedure(getMapSlot(procId), procId);
+    }
+
+    private Entry getProcedure(final int slotIndex, final long procId) {
+      Entry entry = procedureMap[slotIndex];
+      while (entry != null) {
+        if (procId == entry.getProcId()) {
+          return entry;
+        }
+        entry = entry.hashNext;
+      }
+      return null;
+    }
+
+    private int getMapSlot(final long procId) {
+      return (int)(Procedure.getProcIdHashCode(procId) % procedureMap.length);
+    }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/24ef755f/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index 1884adc..f4a52b1 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -215,13 +215,12 @@ public class WALProcedureStore implements ProcedureStore {
     FileStatus[] oldLogs = getLogFiles();
     while (running.get()) {
       // Get Log-MaxID and recover lease on old logs
-      flushLogId = initOldLogs(oldLogs) + 1;
+      flushLogId = initOldLogs(oldLogs);
 
       // Create new state-log
-      if (!rollWriter(flushLogId)) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Someone else has already created log: " + flushLogId);
-        }
+      if (!rollWriter(flushLogId + 1)) {
+        // someone else has already created this log
+        LOG.debug("someone else has already created log " + flushLogId);
         continue;
       }
 
@@ -241,7 +240,7 @@ public class WALProcedureStore implements ProcedureStore {
   }
 
   @Override
-  public Iterator<Procedure> load() throws IOException {
+  public void load(final ProcedureLoader loader) throws IOException {
     if (logs.isEmpty()) {
       throw new RuntimeException("recoverLease() must be called before loading data");
     }
@@ -251,7 +250,8 @@ public class WALProcedureStore implements ProcedureStore {
       if (LOG.isDebugEnabled()) {
         LOG.debug("No state logs to replay.");
       }
-      return null;
+      loader.setMaxProcId(0);
+      return;
     }
 
     // Load the old logs
@@ -259,7 +259,22 @@ public class WALProcedureStore implements ProcedureStore {
     Iterator<ProcedureWALFile> it = logs.descendingIterator();
     it.next(); // Skip the current log
     try {
-      return ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() {
+      ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() {
+        @Override
+        public void setMaxProcId(long maxProcId) {
+          loader.setMaxProcId(maxProcId);
+        }
+
+        @Override
+        public void load(ProcedureIterator procIter) throws IOException {
+          loader.load(procIter);
+        }
+
+        @Override
+        public void handleCorrupted(ProcedureIterator procIter) throws IOException {
+          loader.handleCorrupted(procIter);
+        }
+
         @Override
         public void removeLog(ProcedureWALFile log) {
           toRemove.add(log);
@@ -301,7 +316,7 @@ public class WALProcedureStore implements ProcedureStore {
       }
 
       // Push the transaction data and wait until it is persisted
-      logId = pushData(slot);
+      pushData(slot);
     } catch (IOException e) {
       // We are not able to serialize the procedure.
       // this is a code error, and we are not able to go on.
@@ -383,7 +398,7 @@ public class WALProcedureStore implements ProcedureStore {
       storeTracker.delete(procId);
       if (logId == flushLogId) {
         if (storeTracker.isEmpty() && totalSynced.get() > rollThreshold) {
-          removeOldLogs = rollWriterOrDie(logId + 1);
+          removeOldLogs = rollWriterOrDie();
         }
       }
     }
@@ -541,9 +556,9 @@ public class WALProcedureStore implements ProcedureStore {
     }
   }
 
-  private boolean rollWriterOrDie(final long logId) {
+  private boolean rollWriterOrDie() {
     try {
-      return rollWriter(logId);
+      return rollWriter();
     } catch (IOException e) {
       LOG.warn("Unable to roll the log", e);
       sendAbortProcessSignal();
@@ -551,7 +566,13 @@ public class WALProcedureStore implements ProcedureStore {
     }
   }
 
+  protected boolean rollWriter() throws IOException {
+    return rollWriter(flushLogId + 1);
+  }
+
   private boolean rollWriter(final long logId) throws IOException {
+    assert logId > flushLogId : "logId=" + logId + " flushLogId=" + flushLogId;
+
     ProcedureWALHeader header = ProcedureWALHeader.newBuilder()
       .setVersion(ProcedureWALFormat.HEADER_VERSION)
       .setType(ProcedureWALFormat.LOG_TYPE_STREAM)

http://git-wip-us.apache.org/repos/asf/hbase/blob/24ef755f/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
index 7b9fc69..ddea9d2 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
@@ -57,11 +57,11 @@ public class ProcedureTestingUtility {
 
   public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor)
       throws Exception {
-    restart(procExecutor, null);
+    restart(procExecutor, null, true);
   }
 
   public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor,
-      Runnable beforeStartAction) throws Exception {
+      Runnable beforeStartAction, boolean failOnCorrupted) throws Exception {
     ProcedureStore procStore = procExecutor.getStore();
     int storeThreads = procExecutor.getNumThreads();
     int execThreads = procExecutor.getNumThreads();
@@ -75,7 +75,7 @@ public class ProcedureTestingUtility {
     }
     // re-start
     procStore.start(storeThreads);
-    procExecutor.start(execThreads);
+    procExecutor.start(execThreads, failOnCorrupted);
   }
 
   public static <TEnv> void setKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,

http://git-wip-us.apache.org/repos/asf/hbase/blob/24ef755f/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java
index 022f8ad..0b2a364 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java
@@ -70,7 +70,7 @@ public class TestProcedureExecution {
     procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
     procExecutor = new ProcedureExecutor(htu.getConfiguration(), null, procStore);
     procStore.start(PROCEDURE_EXECUTOR_SLOTS);
-    procExecutor.start(PROCEDURE_EXECUTOR_SLOTS);
+    procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/hbase/blob/24ef755f/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
index f21b6fa..7735b63 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
@@ -74,7 +74,7 @@ public class TestProcedureRecovery {
     procExecutor = new ProcedureExecutor(htu.getConfiguration(), null, procStore);
     procExecutor.testing = new ProcedureExecutor.Testing();
     procStore.start(PROCEDURE_EXECUTOR_SLOTS);
-    procExecutor.start(PROCEDURE_EXECUTOR_SLOTS);
+    procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
     procSleepInterval = 0;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/24ef755f/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java
index 8a7c1a1..61c58e1 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java
@@ -19,13 +19,17 @@
 package org.apache.hadoop.hbase.procedure2;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.io.util.StreamUtils;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 
@@ -45,7 +49,7 @@ import static org.junit.Assert.fail;
 public class TestProcedureReplayOrder {
   private static final Log LOG = LogFactory.getLog(TestProcedureReplayOrder.class);
 
-  private static final Procedure NULL_PROC = null;
+  private static final int NUM_THREADS = 16;
 
   private ProcedureExecutor<Void> procExecutor;
   private TestProcedureEnv procEnv;
@@ -59,7 +63,7 @@ public class TestProcedureReplayOrder {
   @Before
   public void setUp() throws IOException {
     htu = new HBaseCommonTestingUtility();
-    htu.getConfiguration().setInt("hbase.procedure.store.wal.sync.wait.msec", 10);
+    htu.getConfiguration().setInt("hbase.procedure.store.wal.sync.wait.msec", 25);
 
     testDir = htu.getDataTestDir();
     fs = testDir.getFileSystem(htu.getConfiguration());
@@ -69,8 +73,8 @@ public class TestProcedureReplayOrder {
     procEnv = new TestProcedureEnv();
     procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
     procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
-    procStore.start(24);
-    procExecutor.start(1);
+    procStore.start(NUM_THREADS);
+    procExecutor.start(1, true);
   }
 
   @After
@@ -81,47 +85,45 @@ public class TestProcedureReplayOrder {
   }
 
   @Test(timeout=90000)
-  public void testSingleStepReplyOrder() throws Exception {
-    // avoid the procedure to be runnable
-    procEnv.setAcquireLock(false);
+  public void testSingleStepReplayOrder() throws Exception {
+    final int NUM_PROC_XTHREAD = 32;
+    final int NUM_PROCS = NUM_THREADS * NUM_PROC_XTHREAD;
 
     // submit the procedures
-    submitProcedures(16, 25, TestSingleStepProcedure.class);
+    submitProcedures(NUM_THREADS, NUM_PROC_XTHREAD, TestSingleStepProcedure.class);
+
+    while (procEnv.getExecId() < NUM_PROCS) {
+      Thread.sleep(100);
+    }
 
     // restart the executor and allow the procedures to run
-    ProcedureTestingUtility.restart(procExecutor, new Runnable() {
-      @Override
-      public void run() {
-        procEnv.setAcquireLock(true);
-      }
-    });
+    ProcedureTestingUtility.restart(procExecutor);
 
     // wait the execution of all the procedures and
     // assert that the execution order was sorted by procId
     ProcedureTestingUtility.waitNoProcedureRunning(procExecutor);
-    procEnv.assertSortedExecList();
-
-    // TODO: FIXME: This should be revisited
+    procEnv.assertSortedExecList(NUM_PROCS);
   }
 
-  @Ignore
   @Test(timeout=90000)
-  public void testMultiStepReplyOrder() throws Exception {
-    // avoid the procedure to be runnable
-    procEnv.setAcquireLock(false);
+  public void testMultiStepReplayOrder() throws Exception {
+    final int NUM_PROC_XTHREAD = 24;
+    final int NUM_PROCS = NUM_THREADS * (NUM_PROC_XTHREAD * 2);
 
     // submit the procedures
-    submitProcedures(16, 10, TestTwoStepProcedure.class);
+    submitProcedures(NUM_THREADS, NUM_PROC_XTHREAD, TestTwoStepProcedure.class);
+
+    while (procEnv.getExecId() < NUM_PROCS) {
+      Thread.sleep(100);
+    }
 
     // restart the executor and allow the procedures to run
-    ProcedureTestingUtility.restart(procExecutor, new Runnable() {
-      @Override
-      public void run() {
-        procEnv.setAcquireLock(true);
-      }
-    });
+    ProcedureTestingUtility.restart(procExecutor);
 
-    fail("TODO: FIXME: NOT IMPLEMENT REPLAY ORDER");
+    // wait the execution of all the procedures and
+    // assert that the execution order was sorted by procId
+    ProcedureTestingUtility.waitNoProcedureRunning(procExecutor);
+    procEnv.assertSortedExecList(NUM_PROCS);
   }
 
   private void submitProcedures(final int nthreads, final int nprocPerThread,
@@ -153,46 +155,38 @@ public class TestProcedureReplayOrder {
   }
 
   private static class TestProcedureEnv {
-    private ArrayList<Long> execList = new ArrayList<Long>();
-    private boolean acquireLock = true;
-
-    public void setAcquireLock(boolean acquireLock) {
-      this.acquireLock = acquireLock;
-    }
+    private ArrayList<TestProcedure> execList = new ArrayList<TestProcedure>();
+    private AtomicLong execTimestamp = new AtomicLong(0);
 
-    public boolean canAcquireLock() {
-      return acquireLock;
+    public long getExecId() {
+      return execTimestamp.get();
     }
 
-    public void addToExecList(final Procedure proc) {
-      execList.add(proc.getProcId());
+    public long nextExecId() {
+      return execTimestamp.incrementAndGet();
     }
 
-    public ArrayList<Long> getExecList() {
-      return execList;
+    public void addToExecList(final TestProcedure proc) {
+      execList.add(proc);
     }
 
-    public void assertSortedExecList() {
+    public void assertSortedExecList(int numProcs) {
+      assertEquals(numProcs, execList.size());
       LOG.debug("EXEC LIST: " + execList);
-      for (int i = 1; i < execList.size(); ++i) {
-        assertTrue("exec list not sorted: " + execList.get(i-1) + " >= " + execList.get(i),
-          execList.get(i-1) < execList.get(i));
+      for (int i = 0; i < execList.size() - 1; ++i) {
+        TestProcedure a = execList.get(i);
+        TestProcedure b = execList.get(i + 1);
+        assertTrue("exec list not sorted: " + a + " < " + b, a.getExecId() > b.getExecId());
       }
     }
   }
 
-  public static class TestSingleStepProcedure extends SequentialProcedure<TestProcedureEnv> {
-    public TestSingleStepProcedure() { }
-
-    @Override
-    protected Procedure[] execute(TestProcedureEnv env) {
-      LOG.debug("execute procedure " + this);
-      env.addToExecList(this);
-      return null;
-    }
+  public static abstract class TestProcedure extends Procedure<TestProcedureEnv> {
+    protected long execId = 0;
+    protected int step = 0;
 
-    protected boolean acquireLock(final TestProcedureEnv env) {
-      return env.canAcquireLock();
+    public long getExecId() {
+      return execId;
     }
 
     @Override
@@ -200,26 +194,62 @@ public class TestProcedureReplayOrder {
 
     @Override
     protected boolean abort(TestProcedureEnv env) { return true; }
+
+    @Override
+    protected void serializeStateData(final OutputStream stream) throws IOException {
+      StreamUtils.writeLong(stream, execId);
+    }
+
+    @Override
+    protected void deserializeStateData(final InputStream stream) throws IOException {
+      execId = StreamUtils.readLong(stream);
+      step = 2;
+    }
   }
 
-  public static class TestTwoStepProcedure extends SequentialProcedure<TestProcedureEnv> {
-    public TestTwoStepProcedure() { }
+  public static class TestSingleStepProcedure extends TestProcedure {
+    public TestSingleStepProcedure() { }
 
     @Override
-    protected Procedure[] execute(TestProcedureEnv env) {
-      LOG.debug("execute procedure " + this);
-      env.addToExecList(this);
-      return new Procedure[] { new TestSingleStepProcedure() };
+    protected Procedure[] execute(TestProcedureEnv env) throws ProcedureYieldException {
+      LOG.trace("execute procedure step=" + step + ": " + this);
+      if (step == 0) {
+        step = 1;
+        execId = env.nextExecId();
+        return new Procedure[] { this };
+      } else if (step == 2) {
+        env.addToExecList(this);
+        return null;
+      }
+      throw new ProcedureYieldException();
     }
 
-    protected boolean acquireLock(final TestProcedureEnv env) {
-      return true;
+    @Override
+    public String toString() {
+      return "SingleStep(procId=" + getProcId() + " execId=" + execId + ")";
     }
+  }
+
+  public static class TestTwoStepProcedure extends TestProcedure {
+    public TestTwoStepProcedure() { }
 
     @Override
-    protected void rollback(TestProcedureEnv env) { }
+    protected Procedure[] execute(TestProcedureEnv env) throws ProcedureYieldException {
+      LOG.trace("execute procedure step=" + step + ": " + this);
+      if (step == 0) {
+        step = 1;
+        execId = env.nextExecId();
+        return new Procedure[] { new TestSingleStepProcedure() };
+      } else if (step == 2) {
+        env.addToExecList(this);
+        return null;
+      }
+      throw new ProcedureYieldException();
+    }
 
     @Override
-    protected boolean abort(TestProcedureEnv env) { return true; }
+    public String toString() {
+      return "TwoStep(procId=" + getProcId() + " execId=" + execId + ")";
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/24ef755f/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
index 1829d4b..19a9ea4 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
@@ -22,6 +22,10 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.HashSet;
 import java.util.Set;
@@ -35,6 +39,8 @@ import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
 import org.apache.hadoop.hbase.procedure2.SequentialProcedure;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.IOUtils;
@@ -83,17 +89,20 @@ public class TestWALProcedureStore {
     fs.delete(logDir, true);
   }
 
-  private Iterator<Procedure> storeRestart() throws Exception {
+  private void storeRestart(ProcedureStore.ProcedureLoader loader) throws Exception {
     procStore.stop(false);
     procStore.start(PROCEDURE_STORE_SLOTS);
     procStore.recoverLease();
-    return procStore.load();
+    procStore.load(loader);
   }
 
   @Test
   public void testEmptyLogLoad() throws Exception {
-    Iterator<Procedure> loader = storeRestart();
-    assertEquals(0, countProcedures(loader));
+    LoadCounter loader = new LoadCounter();
+    storeRestart(loader);
+    assertEquals(0, loader.getMaxProcId());
+    assertEquals(0, loader.getLoadedCount());
+    assertEquals(0, loader.getCorruptedCount());
   }
 
   @Test
@@ -152,8 +161,10 @@ public class TestWALProcedureStore {
     assertEquals(1, logs.length);
     corruptLog(logs[0], 4);
 
-    int count = countProcedures(storeRestart());
-    assertEquals(100, count);
+    LoadCounter loader = new LoadCounter();
+    storeRestart(loader);
+    assertEquals(100, loader.getLoadedCount());
+    assertEquals(0, loader.getCorruptedCount());
   }
 
   @Test
@@ -172,10 +183,205 @@ public class TestWALProcedureStore {
     assertEquals(1, logs.length);
     corruptLog(logs[0], 1823);
 
-    int count = countProcedures(storeRestart());
+    LoadCounter loader = new LoadCounter();
+    storeRestart(loader);
     assertTrue(procStore.getCorruptedLogs() != null);
     assertEquals(1, procStore.getCorruptedLogs().size());
-    assertEquals(85, count);
+    assertEquals(85, loader.getLoadedCount());
+    assertEquals(0, loader.getCorruptedCount());
+  }
+
+  @Test
+  public void testCorruptedProcedures() throws Exception {
+    // Insert root-procedures
+    TestProcedure[] rootProcs = new TestProcedure[10];
+    for (int i = 1; i <= rootProcs.length; i++) {
+      rootProcs[i-1] = new TestProcedure(i, 0);
+      procStore.insert(rootProcs[i-1], null);
+      rootProcs[i-1].addStackId(0);
+      procStore.update(rootProcs[i-1]);
+    }
+    // insert root-child txn
+    procStore.rollWriter();
+    for (int i = 1; i <= rootProcs.length; i++) {
+      TestProcedure b = new TestProcedure(rootProcs.length + i, i);
+      rootProcs[i-1].addStackId(1);
+      procStore.insert(rootProcs[i-1], new Procedure[] { b });
+    }
+    // insert child updates
+    procStore.rollWriter();
+    for (int i = 1; i <= rootProcs.length; i++) {
+      procStore.update(new TestProcedure(rootProcs.length + i, i));
+    }
+
+    // Stop the store
+    procStore.stop(false);
+
+    // List the logs and sort them by name, so that logs[0] is the first log
+    FileStatus[] logs = fs.listStatus(logDir);
+    assertEquals(3, logs.length);
+    Arrays.sort(logs, new Comparator<FileStatus>() {
+      @Override
+      public int compare(FileStatus o1, FileStatus o2) {
+        return o1.getPath().getName().compareTo(o2.getPath().getName());
+      }
+    });
+
+    // Remove the first log, we have insert-txn and updates in the others so everything is fine.
+    fs.delete(logs[0].getPath(), false);
+    LoadCounter loader = new LoadCounter();
+    storeRestart(loader);
+    assertEquals(rootProcs.length * 2, loader.getLoadedCount());
+    assertEquals(0, loader.getCorruptedCount());
+
+    // Remove the second log, we have lost any root/parent references
+    fs.delete(logs[1].getPath(), false);
+    loader.reset();
+    storeRestart(loader);
+    assertEquals(0, loader.getLoadedCount());
+    assertEquals(rootProcs.length, loader.getCorruptedCount());
+    for (Procedure proc: loader.getCorrupted()) {
+      assertTrue(proc.toString(), proc.getParentProcId() <= rootProcs.length);
+      assertTrue(proc.toString(),
+                  proc.getProcId() > rootProcs.length &&
+                  proc.getProcId() <= (rootProcs.length * 2));
+    }
+  }
+
+  @Test(timeout=60000)
+  public void testWalReplayOrder_AB_A() throws Exception {
+    /*
+     * | A B | -> | A |
+     */
+    TestProcedure a = new TestProcedure(1, 0);
+    TestProcedure b = new TestProcedure(2, 1);
+
+    procStore.insert(a, null);
+    a.addStackId(0);
+    procStore.update(a);
+
+    procStore.insert(a, new Procedure[] { b });
+    b.addStackId(1);
+    procStore.update(b);
+
+    procStore.rollWriter();
+
+    a.addStackId(2);
+    procStore.update(a);
+
+    storeRestart(new ProcedureStore.ProcedureLoader() {
+      @Override
+      public void setMaxProcId(long maxProcId) {
+        assertEquals(2, maxProcId);
+      }
+
+      @Override
+      public void load(ProcedureIterator procIter) throws IOException {
+        assertTrue(procIter.hasNext());
+        assertEquals(1, procIter.next().getProcId());
+        assertTrue(procIter.hasNext());
+        assertEquals(2, procIter.next().getProcId());
+        assertFalse(procIter.hasNext());
+      }
+
+      @Override
+      public void handleCorrupted(ProcedureIterator procIter) throws IOException {
+        assertFalse(procIter.hasNext());
+      }
+    });
+  }
+
+  @Test(timeout=60000)
+  public void testWalReplayOrder_ABC_BAD() throws Exception {
+    /*
+     * | A B C | -> | B A D |
+     */
+    TestProcedure a = new TestProcedure(1, 0);
+    TestProcedure b = new TestProcedure(2, 1);
+    TestProcedure c = new TestProcedure(3, 2);
+    TestProcedure d = new TestProcedure(4, 0);
+
+    procStore.insert(a, null);
+    a.addStackId(0);
+    procStore.update(a);
+
+    procStore.insert(a, new Procedure[] { b });
+    b.addStackId(1);
+    procStore.update(b);
+
+    procStore.insert(b, new Procedure[] { c });
+    b.addStackId(2);
+    procStore.update(b);
+
+    procStore.rollWriter();
+
+    b.addStackId(3);
+    procStore.update(b);
+
+    a.addStackId(4);
+    procStore.update(a);
+
+    procStore.insert(d, null);
+    d.addStackId(0);
+    procStore.update(d);
+
+    storeRestart(new ProcedureStore.ProcedureLoader() {
+      @Override
+      public void setMaxProcId(long maxProcId) {
+        assertEquals(4, maxProcId);
+      }
+
+      @Override
+      public void load(ProcedureIterator procIter) throws IOException {
+        assertTrue(procIter.hasNext());
+        assertEquals(4, procIter.next().getProcId());
+        // TODO: This will be multiple call once we do fast-start
+        //assertFalse(procIter.hasNext());
+
+        assertTrue(procIter.hasNext());
+        assertEquals(1, procIter.next().getProcId());
+        assertTrue(procIter.hasNext());
+        assertEquals(2, procIter.next().getProcId());
+        assertTrue(procIter.hasNext());
+        assertEquals(3, procIter.next().getProcId());
+        assertFalse(procIter.hasNext());
+      }
+
+      @Override
+      public void handleCorrupted(ProcedureIterator procIter) throws IOException {
+        assertFalse(procIter.hasNext());
+      }
+    });
+  }
+
+  public static class TestProcedure extends Procedure<Void> {
+    public TestProcedure() {}
+
+    public TestProcedure(long procId, long parentId) {
+      setProcId(procId);
+      if (parentId > 0) {
+        setParentProcId(parentId);
+      }
+    }
+
+    public void addStackId(final int index) {
+      addStackIndex(index);
+    }
+
+    @Override
+    protected Procedure[] execute(Void env) { return null; }
+
+    @Override
+    protected void rollback(Void env) { }
+
+    @Override
+    protected boolean abort(Void env) { return false; }
+
+    @Override
+    protected void serializeStateData(final OutputStream stream) throws IOException { }
+
+    @Override
+    protected void deserializeStateData(final InputStream stream) throws IOException { }
   }
 
   private void corruptLog(final FileStatus logFile, final long dropBytes)
@@ -191,29 +397,11 @@ public class TestWALProcedureStore {
   }
 
   private void verifyProcIdsOnRestart(final Set<Long> procIds) throws Exception {
-    int count = 0;
-    Iterator<Procedure> loader = storeRestart();
-    while (loader.hasNext()) {
-      Procedure proc = loader.next();
-      LOG.debug("loading procId=" + proc.getProcId());
-      assertTrue("procId=" + proc.getProcId() + " unexpected", procIds.contains(proc.getProcId()));
-      count++;
-    }
-    assertEquals(procIds.size(), count);
-  }
-
-  private void assertIsEmpty(Iterator<Procedure> iterator) {
-    assertEquals(0, countProcedures(iterator));
-  }
-
-  private int countProcedures(Iterator<Procedure> iterator) {
-    int count = 0;
-    while (iterator.hasNext()) {
-      Procedure proc = iterator.next();
-      LOG.trace("loading procId=" + proc.getProcId());
-      count++;
-    }
-    return count;
+    LOG.debug("expected: " + procIds);
+    LoadCounter loader = new LoadCounter();
+    storeRestart(loader);
+    assertEquals(procIds.size(), loader.getLoadedCount());
+    assertEquals(0, loader.getCorruptedCount());
   }
 
   private void assertEmptyLogDir() {
@@ -263,4 +451,78 @@ public class TestWALProcedureStore {
       }
     }
   }
+
+  private class LoadCounter implements ProcedureStore.ProcedureLoader {
+    private final ArrayList<Procedure> corrupted = new ArrayList<Procedure>();
+    private final ArrayList<Procedure> loaded = new ArrayList<Procedure>();
+
+    private Set<Long> procIds;
+    private long maxProcId = 0;
+
+    public LoadCounter() {
+      this(null);
+    }
+
+    public LoadCounter(final Set<Long> procIds) {
+      this.procIds = procIds;
+    }
+
+    public void reset() {
+      reset(null);
+    }
+
+    public void reset(final Set<Long> procIds) {
+      corrupted.clear();
+      loaded.clear();
+      this.procIds = procIds;
+      this.maxProcId = 0;
+    }
+
+    public long getMaxProcId() {
+      return maxProcId;
+    }
+
+    public ArrayList<Procedure> getLoaded() {
+      return loaded;
+    }
+
+    public int getLoadedCount() {
+      return loaded.size();
+    }
+
+    public ArrayList<Procedure> getCorrupted() {
+      return corrupted;
+    }
+
+    public int getCorruptedCount() {
+      return corrupted.size();
+    }
+
+    @Override
+    public void setMaxProcId(long maxProcId) {
+      this.maxProcId = maxProcId;  // the parameter shadows the field, so qualify it with this
+    }
+
+    @Override
+    public void load(ProcedureIterator procIter) throws IOException {
+      while (procIter.hasNext()) {
+        Procedure proc = procIter.next();
+        LOG.debug("loading procId=" + proc.getProcId() + ": " + proc);
+        if (procIds != null) {
+          assertTrue("procId=" + proc.getProcId() + " unexpected",
+                     procIds.contains(proc.getProcId()));
+        }
+        loaded.add(proc);
+      }
+    }
+
+    @Override
+    public void handleCorrupted(ProcedureIterator procIter) throws IOException {
+      while (procIter.hasNext()) {
+        Procedure proc = procIter.next();
+        LOG.debug("corrupted procId=" + proc.getProcId() + ": " + proc);
+        corrupted.add(proc);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/24ef755f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 2b180ad..7c99abb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -1131,8 +1131,11 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
     final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS,
         Math.max(Runtime.getRuntime().availableProcessors(),
           MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS));
+    final boolean abortOnCorruption = conf.getBoolean(
+        MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION,
+        MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION);
     procedureStore.start(numThreads);
-    procedureExecutor.start(numThreads);
+    procedureExecutor.start(numThreads, abortOnCorruption);
   }
 
   private void stopProcedureExecutor() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/24ef755f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java
index 90ed4ee..c21137d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java
@@ -24,8 +24,21 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 public final class MasterProcedureConstants {
   private MasterProcedureConstants() {}
 
+  /** Used to construct the name of the log directory for master procedures */
   public static final String MASTER_PROCEDURE_LOGDIR = "MasterProcWALs";
 
+  /** Number of threads used by the procedure executor */
   public static final String MASTER_PROCEDURE_THREADS = "hbase.master.procedure.threads";
   public static final int DEFAULT_MIN_MASTER_PROCEDURE_THREADS = 4;
+
+  /**
+   * Procedure replay sanity check. In case a WAL is missing or unreadable we
+   * may lose information about pending/running procedures.
+   * Set this to true if you want the Master to fail on load when a corrupted
+   * procedure is encountered.
+   * (Default is off, because we prefer having the Master up and running and
+   * fix the "in transition" state "by hand")
+   */
+  public static final String EXECUTOR_ABORT_ON_CORRUPTION = "hbase.procedure.abort.on.corruption";
+  public static final boolean DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION = false;
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/24ef755f/hbase-server/src/test/resources/hbase-site.xml
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/resources/hbase-site.xml b/hbase-server/src/test/resources/hbase-site.xml
index 8c8312c..34a1b20 100644
--- a/hbase-server/src/test/resources/hbase-site.xml
+++ b/hbase-server/src/test/resources/hbase-site.xml
@@ -147,4 +147,11 @@
     <description>Skip sanity checks in tests
     </description>
   </property>
+  <property>
+    <name>hbase.procedure.abort.on.corruption</name>
+    <value>true</value>
+    <description>
+      Enable replay sanity checks on procedure tests (MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION).
+    </description>
+  </property>
 </configuration>