You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/10/20 14:00:27 UTC

hbase git commit: HBASE-21336 Simplify the implementation of WALProcedureMap

Repository: hbase
Updated Branches:
  refs/heads/master 4bf3c5a70 -> 7adf59010


HBASE-21336 Simplify the implementation of WALProcedureMap


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7adf5901
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7adf5901
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7adf5901

Branch: refs/heads/master
Commit: 7adf590106826b9e4432cfeee06acdc0ccff8c6e
Parents: 4bf3c5a
Author: zhangduo <zh...@apache.org>
Authored: Sat Oct 20 19:33:29 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Sat Oct 20 21:59:46 2018 +0800

----------------------------------------------------------------------
 .../hbase/procedure2/ProcedureExecutor.java     | 102 ++--
 .../hbase/procedure2/store/ProcedureStore.java  |   6 +
 .../store/wal/ProcedureWALFormatReader.java     |  93 +--
 .../procedure2/store/wal/WALProcedureMap.java   | 591 ++-----------------
 .../procedure2/store/wal/WALProcedureTree.java  | 299 ++++++++++
 .../store/wal/TestStressWALProcedureStore.java  |   2 -
 .../store/wal/TestWALProcedureStore.java        | 164 +----
 .../store/wal/TestWALProcedureTree.java         | 173 ++++++
 .../hadoop/hbase/HBaseTestingUtility.java       |   1 +
 9 files changed, 627 insertions(+), 804 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/7adf5901/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index 0e2d9b8..91a305b 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -505,8 +505,10 @@ public class ProcedureExecutor<TEnvironment> {
   private void loadProcedures(ProcedureIterator procIter, boolean abortOnCorruption)
       throws IOException {
     // 1. Build the rollback stack
-    int runnablesCount = 0;
+    int runnableCount = 0;
     int failedCount = 0;
+    int waitingCount = 0;
+    int waitingTimeoutCount = 0;
     while (procIter.hasNext()) {
       boolean finished = procIter.isNextFinished();
       @SuppressWarnings("unchecked")
@@ -526,11 +528,21 @@ public class ProcedureExecutor<TEnvironment> {
         // add the procedure to the map
         proc.beforeReplay(getEnvironment());
         procedures.put(proc.getProcId(), proc);
-
-        if (proc.getState() == ProcedureState.RUNNABLE) {
-          runnablesCount++;
-        } else if (proc.getState() == ProcedureState.FAILED) {
-          failedCount++;
+        switch (proc.getState()) {
+          case RUNNABLE:
+            runnableCount++;
+            break;
+          case FAILED:
+            failedCount++;
+            break;
+          case WAITING:
+            waitingCount++;
+            break;
+          case WAITING_TIMEOUT:
+            waitingTimeoutCount++;
+            break;
+          default:
+            break;
         }
       }
 
@@ -551,9 +563,10 @@ public class ProcedureExecutor<TEnvironment> {
     // have been polled out already, so when loading we can not add the procedure to scheduler first
     // and then call acquireLock, since the procedure is still in the queue, and since we will
     // remove the queue from runQueue, then no one can poll it out, then there is a dead lock
-    List<Procedure<TEnvironment>> runnableList = new ArrayList<>(runnablesCount);
+    List<Procedure<TEnvironment>> runnableList = new ArrayList<>(runnableCount);
     List<Procedure<TEnvironment>> failedList = new ArrayList<>(failedCount);
-    Set<Procedure<TEnvironment>> waitingSet = null;
+    List<Procedure<TEnvironment>> waitingList = new ArrayList<>(waitingCount);
+    List<Procedure<TEnvironment>> waitingTimeoutList = new ArrayList<>(waitingTimeoutCount);
     procIter.reset();
     while (procIter.hasNext()) {
       if (procIter.isNextFinished()) {
@@ -591,26 +604,10 @@ public class ProcedureExecutor<TEnvironment> {
           runnableList.add(proc);
           break;
         case WAITING:
-          if (!proc.hasChildren()) {
-            // Normally, WAITING procedures should be waken by its children.
-            // But, there is a case that, all the children are successful and before
-            // they can wake up their parent procedure, the master was killed.
-            // So, during recovering the procedures from ProcedureWal, its children
-            // are not loaded because of their SUCCESS state.
-            // So we need to continue to run this WAITING procedure. But before
-            // executing, we need to set its state to RUNNABLE, otherwise, a exception
-            // will throw:
-            // Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE,
-            // "NOT RUNNABLE! " + procedure.toString());
-            proc.setState(ProcedureState.RUNNABLE);
-            runnableList.add(proc);
-          }
+          waitingList.add(proc);
           break;
         case WAITING_TIMEOUT:
-          if (waitingSet == null) {
-            waitingSet = new HashSet<>();
-          }
-          waitingSet.add(proc);
+          waitingTimeoutList.add(proc);
           break;
         case FAILED:
           failedList.add(proc);
@@ -625,39 +622,32 @@ public class ProcedureExecutor<TEnvironment> {
       }
     }
 
-    // 3. Validate the stacks
-    int corruptedCount = 0;
-    Iterator<Map.Entry<Long, RootProcedureState<TEnvironment>>> itStack =
-      rollbackStack.entrySet().iterator();
-    while (itStack.hasNext()) {
-      Map.Entry<Long, RootProcedureState<TEnvironment>> entry = itStack.next();
-      RootProcedureState<TEnvironment> procStack = entry.getValue();
-      if (procStack.isValid()) continue;
-
-      for (Procedure<TEnvironment> proc : procStack.getSubproceduresStack()) {
-        LOG.error("Corrupted " + proc);
-        procedures.remove(proc.getProcId());
-        runnableList.remove(proc);
-        if (waitingSet != null) waitingSet.remove(proc);
-        corruptedCount++;
+    // 4. Check the waiting procedures to see if some of them can be added to the runnable list.
+    waitingList.forEach(proc -> {
+      if (!proc.hasChildren()) {
+        // Normally, WAITING procedures are woken up by their children.
+        // However, it is possible that all the children succeeded but the master
+        // was killed before they could wake up their parent procedure.
+        // In that case, when recovering the procedures from ProcedureWal, the children
+        // are not loaded because they are in SUCCESS state.
+        // So we need to continue running this WAITING procedure. But before
+        // executing it, we need to set its state to RUNNABLE, otherwise an exception
+        // will be thrown by:
+        // Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE,
+        // "NOT RUNNABLE! " + procedure.toString());
+        proc.setState(ProcedureState.RUNNABLE);
+        runnableList.add(proc);
       }
-      itStack.remove();
-    }
-
-    if (abortOnCorruption && corruptedCount > 0) {
-      throw new IOException("found " + corruptedCount + " procedures on replay");
-    }
+    });
 
-    // 4. Push the procedures to the timeout executor
-    if (waitingSet != null && !waitingSet.isEmpty()) {
-      for (Procedure<TEnvironment> proc: waitingSet) {
-        proc.afterReplay(getEnvironment());
-        timeoutExecutor.add(proc);
-      }
-    }
-    // 5. restore locks
+    // 5. Push the procedures to the timeout executor
+    waitingTimeoutList.forEach(proc -> {
+      proc.afterReplay(getEnvironment());
+      timeoutExecutor.add(proc);
+    });
+    // 6. restore locks
     restoreLocks();
-    // 6. Push the procedure to the scheduler
+    // 7. Push the procedure to the scheduler
     failedList.forEach(scheduler::addBack);
     runnableList.forEach(p -> {
       p.afterReplay(getEnvironment());

http://git-wip-us.apache.org/repos/asf/hbase/blob/7adf5901/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
index 0599acf..d737a7a 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
@@ -85,12 +85,18 @@ public interface ProcedureStore {
     boolean hasNext();
 
     /**
+     * Calling this method does not require converting the protobuf message to the Procedure class,
+     * so if it returns true we can call {@link #skipNext()} to skip the procedure without
+     * deserializing it, which improves performance.
      * @return true if the iterator next element is a completed procedure.
      */
     boolean isNextFinished();
 
     /**
      * Skip the next procedure
+     * <p/>
+     * This method is used to skip deserializing the procedure to increase performance, as calling
+     * {@link #next()} requires converting the protobuf message to the Procedure class.
      */
     void skipNext();
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/7adf5901/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
index 1ac8e01..2e1e06c 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.procedure2.store.wal;
 
 import java.io.IOException;
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -31,70 +30,25 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
 
 /**
- * Helper class that loads the procedures stored in a WAL
+ * Helper class that loads the procedures stored in a WAL.
  */
 @InterfaceAudience.Private
 public class ProcedureWALFormatReader {
   private static final Logger LOG = LoggerFactory.getLogger(ProcedureWALFormatReader.class);
 
-  // ==============================================================================================
-  //  We read the WALs in reverse order from the newest to the oldest.
-  //  We have different entry types:
-  //   - INIT: Procedure submitted by the user (also known as 'root procedure')
-  //   - INSERT: Children added to the procedure <parentId>:[<childId>, ...]
-  //   - UPDATE: The specified procedure was updated
-  //   - DELETE: The procedure was removed (finished/rolledback and result TTL expired)
-  //
-  // In the WAL we can find multiple times the same procedure as UPDATE or INSERT.
-  // We read the WAL from top to bottom, so every time we find an entry of the
-  // same procedure, that will be the "latest" update (Caveat: with multiple threads writing
-  // the store, this assumption does not hold).
-  //
-  // We keep two in-memory maps:
-  //  - localProcedureMap: is the map containing the entries in the WAL we are processing
-  //  - procedureMap: is the map containing all the procedures we found up to the WAL in process.
-  // localProcedureMap is merged with the procedureMap once we reach the WAL EOF.
-  //
-  // Since we are reading the WALs in reverse order (newest to oldest),
-  // if we find an entry related to a procedure we already have in 'procedureMap' we can discard it.
-  //
-  // The WAL is append-only so the last procedure in the WAL is the one that
-  // was in execution at the time we crashed/closed the server.
-  // Given that, the procedure replay order can be inferred by the WAL order.
-  //
-  // Example:
-  //    WAL-2: [A, B, A, C, D]
-  //    WAL-1: [F, G, A, F, B]
-  //    Replay-Order: [D, C, A, B, F, G]
-  //
-  // The "localProcedureMap" keeps a "replayOrder" list. Every time we add the
-  // record to the map that record is moved to the head of the "replayOrder" list.
-  // Using the example above:
-  //    WAL-2 localProcedureMap.replayOrder is [D, C, A, B]
-  //    WAL-1 localProcedureMap.replayOrder is [F, G]
-  //
-  // Each time we reach the WAL-EOF, the "replayOrder" list is merged/appended in 'procedureMap'
-  // so using the example above we end up with: [D, C, A, B] + [F, G] as replay order.
-  //
-  //  Fast Start: INIT/INSERT record and StackIDs
-  // ---------------------------------------------
-  // We have two special records, INIT and INSERT, that track the first time
-  // the procedure was added to the WAL. We can use this information to be able
-  // to start procedures before reaching the end of the WAL, or before reading all WALs.
-  // But in some cases, the WAL with that record can be already gone.
-  // As an alternative, we can use the stackIds on each procedure,
-  // to identify when a procedure is ready to start.
-  // If there are gaps in the sum of the stackIds we need to read more WALs.
-  //
-  // Example (all procs child of A):
-  //   WAL-2: [A, B]                   A stackIds = [0, 4], B stackIds = [1, 5]
-  //   WAL-1: [A, B, C, D]
-  //
-  // In the case above we need to read one more WAL to be able to consider
-  // the root procedure A and all children as ready.
-  // ==============================================================================================
-  private final WALProcedureMap localProcedureMap = new WALProcedureMap(1024);
-  private final WALProcedureMap procedureMap = new WALProcedureMap(1024);
+  /**
+   * We use the localProcedureMap to track the active procedures for the current proc wal file, and
+   * when we finish reading one proc wal file, we merge the localProcedureMap into the
+   * procedureMap, which tracks the globally active procedures.
+   * <p/>
+   * See the comments of {@link WALProcedureMap} for more details.
+   * <p/>
+   * After reading all the proc wal files, we will use the procedures in the procedureMap to build a
+   * {@link WALProcedureTree}, and then give the result to the upper layer. See the comments of
+   * {@link WALProcedureTree} and the code in {@link #finish()} for more details.
+   */
+  private final WALProcedureMap localProcedureMap = new WALProcedureMap();
+  private final WALProcedureMap procedureMap = new WALProcedureMap();
 
   private final ProcedureWALFormat.Loader loader;
 
@@ -178,7 +132,7 @@ public class ProcedureWALFormatReader {
         localTracker.setMinMaxModifiedProcIds(localProcedureMap.getMinModifiedProcId(),
           localProcedureMap.getMaxModifiedProcId());
       }
-      procedureMap.mergeTail(localProcedureMap);
+      procedureMap.merge(localProcedureMap);
     }
     if (localTracker.isPartial()) {
       localTracker.setPartialFlag(false);
@@ -189,18 +143,11 @@ public class ProcedureWALFormatReader {
     // notify the loader about the max proc ID
     loader.setMaxProcId(maxProcId);
 
-    // fetch the procedure ready to run.
-    ProcedureIterator procIter = procedureMap.fetchReady();
-    if (procIter != null) {
-      loader.load(procIter);
-    }
-
-    // remaining procedures have missing link or dependencies
-    // consider them as corrupted, manual fix is probably required.
-    procIter = procedureMap.fetchAll();
-    if (procIter != null) {
-      loader.handleCorrupted(procIter);
-    }
+    // Build the procedure execution tree. While building it we verify whether each procedure is
+    // valid.
+    WALProcedureTree tree = WALProcedureTree.build(procedureMap.getProcedures());
+    loader.load(tree.getValidProcs());
+    loader.handleCorrupted(tree.getCorruptedProcs());
   }
 
   private void setDeletedIfPartial(ProcedureStoreTracker tracker, long procId) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/7adf5901/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureMap.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureMap.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureMap.java
index 18d7823..9cda1bc 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureMap.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureMap.java
@@ -17,193 +17,50 @@
  */
 package org.apache.hadoop.hbase.procedure2.store.wal;
 
-import java.io.IOException;
-import org.apache.hadoop.hbase.procedure2.Procedure;
-import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
-import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
 
 /**
- * We keep an in-memory map of the procedures sorted by replay order. (see the details in the
- * beginning of {@link ProcedureWALFormatReader}).
- *
- * <pre>
- *      procedureMap = | A |   | E |   | C |   |   |   |   | G |   |   |
- *                       D               B
- *      replayOrderHead = C <-> B <-> E <-> D <-> A <-> G
- *
- *  We also have a lazy grouping by "root procedure", and a list of
- *  unlinked procedures. If after reading all the WALs we have unlinked
- *  procedures it means that we had a missing WAL or a corruption.
- *      rootHead = A <-> D <-> G
- *                 B     E
- *                 C
- *      unlinkFromLinkList = None
- * </pre>
+ * This class is used to track the active procedures when loading procedures from proc wal files.
+ * <p/>
+ * We read proc wal files from new to old, but within a single proc wal file we still read from
+ * top to bottom, so this class has two groups of methods.
+ * <p/>
+ * The first group is {@link #add(ProcedureProtos.Procedure)} and {@link #remove(long)}. It is used
+ * when reading a proc wal file. In these methods, for the same procedure, the one that comes later
+ * typically wins; see the comment for
+ * {@link #isIncreasing(ProcedureProtos.Procedure, ProcedureProtos.Procedure)} for the
+ * exceptions.
+ * <p/>
+ * The second group is {@link #merge(WALProcedureMap)}. We have a global {@link WALProcedureMap}
+ * to hold the globally active procedures, and a local {@link WALProcedureMap} to hold the active
+ * procedures for the current proc wal file. When we finish reading a proc wal file, we merge the
+ * local one into the global one by calling the {@link #merge(WALProcedureMap)} method of the
+ * global one and passing the local one in. In this method, for the same procedure, the one that
+ * comes earlier wins, as we read the proc wal files from new to old (the reverse order), so the
+ * entry already in the global map comes from a newer file.
  */
+@InterfaceAudience.Private
 class WALProcedureMap {
 
   private static final Logger LOG = LoggerFactory.getLogger(WALProcedureMap.class);
 
-  private static class Entry {
-    // For bucketed linked lists in hash-table.
-    private Entry hashNext;
-    // child head
-    private Entry childHead;
-    // double-link for rootHead or childHead
-    private Entry linkNext;
-    private Entry linkPrev;
-    // replay double-linked-list
-    private Entry replayNext;
-    private Entry replayPrev;
-    // procedure-infos
-    private Procedure<?> procedure;
-    private ProcedureProtos.Procedure proto;
-    private boolean ready = false;
-
-    public Entry(Entry hashNext) {
-      this.hashNext = hashNext;
-    }
-
-    public long getProcId() {
-      return proto.getProcId();
-    }
-
-    public long getParentId() {
-      return proto.getParentId();
-    }
-
-    public boolean hasParent() {
-      return proto.hasParentId();
-    }
-
-    public boolean isReady() {
-      return ready;
-    }
-
-    public boolean isFinished() {
-      if (!hasParent()) {
-        // we only consider 'root' procedures. because for the user 'finished'
-        // means when everything up to the 'root' is finished.
-        switch (proto.getState()) {
-          case ROLLEDBACK:
-          case SUCCESS:
-            return true;
-          default:
-            break;
-        }
-      }
-      return false;
-    }
-
-    public Procedure<?> convert() throws IOException {
-      if (procedure == null) {
-        procedure = ProcedureUtil.convertToProcedure(proto);
-      }
-      return procedure;
-    }
-
-    @Override
-    public String toString() {
-      final StringBuilder sb = new StringBuilder();
-      sb.append("Entry(");
-      sb.append(getProcId());
-      sb.append(", parentId=");
-      sb.append(getParentId());
-      sb.append(", class=");
-      sb.append(proto.getClassName());
-      sb.append(")");
-      return sb.toString();
-    }
-  }
-
-  private static class EntryIterator implements ProcedureIterator {
-    private final Entry replayHead;
-    private Entry current;
-
-    public EntryIterator(Entry replayHead) {
-      this.replayHead = replayHead;
-      this.current = replayHead;
-    }
-
-    @Override
-    public void reset() {
-      this.current = replayHead;
-    }
-
-    @Override
-    public boolean hasNext() {
-      return current != null;
-    }
-
-    @Override
-    public boolean isNextFinished() {
-      return current != null && current.isFinished();
-    }
-
-    @Override
-    public void skipNext() {
-      current = current.replayNext;
-    }
-
-    @Override
-    public Procedure<?> next() throws IOException {
-      try {
-        return current.convert();
-      } finally {
-        current = current.replayNext;
-      }
-    }
-  }
-
-  // procedure hash table
-  private Entry[] procedureMap;
+  private final Map<Long, ProcedureProtos.Procedure> procMap = new HashMap<>();
 
-  // replay-order double-linked-list
-  private Entry replayOrderHead;
-  private Entry replayOrderTail;
-
-  // root linked-list
-  private Entry rootHead;
-
-  // pending unlinked children (root not present yet)
-  private Entry childUnlinkedHead;
-
-  // Track ProcId range
   private long minModifiedProcId = Long.MAX_VALUE;
-  private long maxModifiedProcId = Long.MIN_VALUE;
 
-  public WALProcedureMap(int size) {
-    procedureMap = new Entry[size];
-    replayOrderHead = null;
-    replayOrderTail = null;
-    rootHead = null;
-    childUnlinkedHead = null;
-  }
+  private long maxModifiedProcId = Long.MIN_VALUE;
 
-  public void add(ProcedureProtos.Procedure procProto) {
-    trackProcIds(procProto.getProcId());
-    Entry entry = addToMap(procProto.getProcId(), procProto.hasParentId());
-    boolean newEntry = entry.proto == null;
-    // We have seen procedure WALs where the entries are out of order; see HBASE-18152.
-    // To compensate, only replace the Entry procedure if for sure this new procedure
-    // is indeed an entry that came later.
-    // TODO: Fix the writing of procedure info so it does not violate basic expectation, that WALs
-    // contain procedure changes goingfrom start to finish in sequence.
-    if (newEntry || isIncreasing(entry.proto, procProto)) {
-      entry.proto = procProto;
-    }
-    addToReplayList(entry);
-    if (newEntry) {
-      if (procProto.hasParentId()) {
-        childUnlinkedHead = addToLinkList(entry, childUnlinkedHead);
-      } else {
-        rootHead = addToLinkList(entry, rootHead);
-      }
-    }
+  private void trackProcId(long procId) {
+    minModifiedProcId = Math.min(minModifiedProcId, procId);
+    maxModifiedProcId = Math.max(maxModifiedProcId, procId);
   }
 
   /**
@@ -225,383 +82,51 @@ class WALProcedureMap {
     return increasing;
   }
 
-  public boolean remove(long procId) {
-    trackProcIds(procId);
-    Entry entry = removeFromMap(procId);
-    if (entry != null) {
-      unlinkFromReplayList(entry);
-      unlinkFromLinkList(entry);
-      return true;
-    }
-    return false;
-  }
-
-  private void trackProcIds(long procId) {
-    minModifiedProcId = Math.min(minModifiedProcId, procId);
-    maxModifiedProcId = Math.max(maxModifiedProcId, procId);
-  }
-
-  public long getMinModifiedProcId() {
-    return minModifiedProcId;
-  }
-
-  public long getMaxModifiedProcId() {
-    return maxModifiedProcId;
+  public void add(ProcedureProtos.Procedure proc) {
+    procMap.compute(proc.getProcId(), (procId, existingProc) -> {
+      if (existingProc == null || isIncreasing(existingProc, proc)) {
+        return proc;
+      } else {
+        return existingProc;
+      }
+    });
+    trackProcId(proc.getProcId());
   }
 
-  public boolean contains(long procId) {
-    return getProcedure(procId) != null;
+  public void remove(long procId) {
+    procMap.remove(procId);
   }
 
   public boolean isEmpty() {
-    return replayOrderHead == null;
+    return procMap.isEmpty();
   }
 
-  public void clear() {
-    for (int i = 0; i < procedureMap.length; ++i) {
-      procedureMap[i] = null;
-    }
-    replayOrderHead = null;
-    replayOrderTail = null;
-    rootHead = null;
-    childUnlinkedHead = null;
-    minModifiedProcId = Long.MAX_VALUE;
-    maxModifiedProcId = Long.MIN_VALUE;
+  public boolean contains(long procId) {
+    return procMap.containsKey(procId);
   }
 
-  /*
-   * Merges two WalProcedureMap, the target is the "global" map, the source is the "local" map. -
-   * The entries in the hashtables are guaranteed to be unique. On replay we don't load procedures
-   * that already exist in the "global" map (the one we are merging the "local" in to). - The
-   * replayOrderList of the "local" nao will be appended to the "global" map replay list. - The
-   * "local" map will be cleared at the end of the operation.
+  /**
+   * Merge the given {@link WALProcedureMap} into this one. The {@link WALProcedureMap} passed in
+   * will be cleared after merging.
    */
-  public void mergeTail(WALProcedureMap other) {
-    for (Entry p = other.replayOrderHead; p != null; p = p.replayNext) {
-      int slotIndex = getMapSlot(p.getProcId());
-      p.hashNext = procedureMap[slotIndex];
-      procedureMap[slotIndex] = p;
-    }
-
-    if (replayOrderHead == null) {
-      replayOrderHead = other.replayOrderHead;
-      replayOrderTail = other.replayOrderTail;
-      rootHead = other.rootHead;
-      childUnlinkedHead = other.childUnlinkedHead;
-    } else {
-      // append replay list
-      assert replayOrderTail.replayNext == null;
-      assert other.replayOrderHead.replayPrev == null;
-      replayOrderTail.replayNext = other.replayOrderHead;
-      other.replayOrderHead.replayPrev = replayOrderTail;
-      replayOrderTail = other.replayOrderTail;
-
-      // merge rootHead
-      if (rootHead == null) {
-        rootHead = other.rootHead;
-      } else if (other.rootHead != null) {
-        Entry otherTail = findLinkListTail(other.rootHead);
-        otherTail.linkNext = rootHead;
-        rootHead.linkPrev = otherTail;
-        rootHead = other.rootHead;
-      }
-
-      // merge childUnlinkedHead
-      if (childUnlinkedHead == null) {
-        childUnlinkedHead = other.childUnlinkedHead;
-      } else if (other.childUnlinkedHead != null) {
-        Entry otherTail = findLinkListTail(other.childUnlinkedHead);
-        otherTail.linkNext = childUnlinkedHead;
-        childUnlinkedHead.linkPrev = otherTail;
-        childUnlinkedHead = other.childUnlinkedHead;
-      }
-    }
+  public void merge(WALProcedureMap other) {
+    other.procMap.forEach(procMap::putIfAbsent);
     maxModifiedProcId = Math.max(maxModifiedProcId, other.maxModifiedProcId);
     minModifiedProcId = Math.max(minModifiedProcId, other.minModifiedProcId);
-
-    other.clear();
-  }
-
-  /**
-   * Returns an EntryIterator with the list of procedures ready to be added to the executor. A
-   * Procedure is ready if its children and parent are ready.
-   */
-  public ProcedureIterator fetchReady() {
-    buildGraph();
-
-    Entry readyHead = null;
-    Entry readyTail = null;
-    Entry p = replayOrderHead;
-    while (p != null) {
-      Entry next = p.replayNext;
-      if (p.isReady()) {
-        unlinkFromReplayList(p);
-        if (readyTail != null) {
-          readyTail.replayNext = p;
-          p.replayPrev = readyTail;
-        } else {
-          p.replayPrev = null;
-          readyHead = p;
-        }
-        readyTail = p;
-        p.replayNext = null;
-      }
-      p = next;
-    }
-    // we need the hash-table lookups for parents, so this must be done
-    // out of the loop where we check isReadyToRun()
-    for (p = readyHead; p != null; p = p.replayNext) {
-      removeFromMap(p.getProcId());
-      unlinkFromLinkList(p);
-    }
-    return readyHead != null ? new EntryIterator(readyHead) : null;
-  }
-
-  /**
-   * Drain this map and return all procedures in it.
-   */
-  public ProcedureIterator fetchAll() {
-    Entry head = replayOrderHead;
-    for (Entry p = head; p != null; p = p.replayNext) {
-      removeFromMap(p.getProcId());
-    }
-    for (int i = 0; i < procedureMap.length; ++i) {
-      assert procedureMap[i] == null : "map not empty i=" + i;
-    }
-    replayOrderHead = null;
-    replayOrderTail = null;
-    childUnlinkedHead = null;
-    rootHead = null;
-    return head != null ? new EntryIterator(head) : null;
-  }
-
-  private void buildGraph() {
-    Entry p = childUnlinkedHead;
-    while (p != null) {
-      Entry next = p.linkNext;
-      Entry rootProc = getRootProcedure(p);
-      if (rootProc != null) {
-        rootProc.childHead = addToLinkList(p, rootProc.childHead);
-      }
-      p = next;
-    }
-
-    for (p = rootHead; p != null; p = p.linkNext) {
-      checkReadyToRun(p);
-    }
-  }
-
-  private Entry getRootProcedure(Entry entry) {
-    while (entry != null && entry.hasParent()) {
-      entry = getProcedure(entry.getParentId());
-    }
-    return entry;
-  }
-
-  /**
-   * (see the comprehensive explanation in the beginning of {@link ProcedureWALFormatReader}). A
-   * Procedure is ready when parent and children are ready. "ready" means that we all the
-   * information that we need in-memory.
-   * <p/>
-   * Example-1:<br/>
-   * We have two WALs, we start reading from the newest (wal-2)
-   *
-   * <pre>
-   *    wal-2 | C B |
-   *    wal-1 | A B C |
-   * </pre>
-   *
-   * If C and B don't depend on A (A is not the parent), we can start them before reading wal-1. If
-   * B is the only one with parent A we can start C. We have to read one more WAL before being able
-   * to start B.
-   * <p/>
-   * How do we know with the only information in B that we are not ready.
-   * <ul>
-   * <li>easy case, the parent is missing from the global map</li>
-   * <li>more complex case we look at the Stack IDs.</li>
-   * </ul>
-   * The Stack-IDs are added to the procedure order as an incremental index tracking how many times
-   * that procedure was executed, which is equivalent to the number of times we wrote the procedure
-   * to the WAL. <br/>
-   * In the example above:
-   *
-   * <pre>
-   *   wal-2: B has stackId = [1, 2]
-   *   wal-1: B has stackId = [1]
-   *   wal-1: A has stackId = [0]
-   * </pre>
-   *
-   * Since we know that the Stack-IDs are incremental for a Procedure, we notice that there is a gap
-   * in the stackIds of B, so something was executed before.
-   * <p/>
-   * To identify when a Procedure is ready we do the sum of the stackIds of the procedure and the
-   * parent. if the stackIdSum is equal to the sum of {1..maxStackId} then everything we need is
-   * available.
-   * <p/>
-   * Example-2
-   *
-   * <pre>
-   *    wal-2 | A |              A stackIds = [0, 2]
-   *    wal-1 | A B |            B stackIds = [1]
-   * </pre>
-   *
-   * There is a gap between A stackIds so something was executed in between.
-   */
-  private boolean checkReadyToRun(Entry rootEntry) {
-    assert !rootEntry.hasParent() : "expected root procedure, got " + rootEntry;
-
-    if (rootEntry.isFinished()) {
-      // If the root procedure is finished, sub-procedures should be gone
-      if (rootEntry.childHead != null) {
-        LOG.error("unexpected active children for root-procedure: {}", rootEntry);
-        for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) {
-          LOG.error("unexpected active children: {}", p);
-        }
-      }
-
-      assert rootEntry.childHead == null : "unexpected children on root completion. " + rootEntry;
-      rootEntry.ready = true;
-      return true;
-    }
-
-    int stackIdSum = 0;
-    int maxStackId = 0;
-    for (int i = 0; i < rootEntry.proto.getStackIdCount(); ++i) {
-      int stackId = 1 + rootEntry.proto.getStackId(i);
-      maxStackId = Math.max(maxStackId, stackId);
-      stackIdSum += stackId;
-      LOG.trace("stackId={} stackIdSum={} maxStackid={} {}", stackId, stackIdSum, maxStackId,
-        rootEntry);
-    }
-
-    for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) {
-      for (int i = 0; i < p.proto.getStackIdCount(); ++i) {
-        int stackId = 1 + p.proto.getStackId(i);
-        maxStackId = Math.max(maxStackId, stackId);
-        stackIdSum += stackId;
-        LOG.trace("stackId={} stackIdSum={} maxStackid={} {}", stackId, stackIdSum, maxStackId, p);
-      }
-    }
-    // The cmpStackIdSum is this formula for finding the sum of a series of numbers:
-    // http://www.wikihow.com/Sum-the-Integers-from-1-to-N#/Image:Sum-the-Integers-from-1-to-N-Step-2-Version-3.jpg
-    final int cmpStackIdSum = (maxStackId * (maxStackId + 1) / 2);
-    if (cmpStackIdSum == stackIdSum) {
-      rootEntry.ready = true;
-      for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) {
-        p.ready = true;
-      }
-      return true;
-    }
-    return false;
-  }
-
-  private void unlinkFromReplayList(Entry entry) {
-    if (replayOrderHead == entry) {
-      replayOrderHead = entry.replayNext;
-    }
-    if (replayOrderTail == entry) {
-      replayOrderTail = entry.replayPrev;
-    }
-    if (entry.replayPrev != null) {
-      entry.replayPrev.replayNext = entry.replayNext;
-    }
-    if (entry.replayNext != null) {
-      entry.replayNext.replayPrev = entry.replayPrev;
-    }
+    other.procMap.clear();
+    other.maxModifiedProcId = Long.MIN_VALUE;
+    other.minModifiedProcId = Long.MAX_VALUE;
   }
 
-  private void addToReplayList(final Entry entry) {
-    unlinkFromReplayList(entry);
-    entry.replayNext = replayOrderHead;
-    entry.replayPrev = null;
-    if (replayOrderHead != null) {
-      replayOrderHead.replayPrev = entry;
-    } else {
-      replayOrderTail = entry;
-    }
-    replayOrderHead = entry;
+  public Collection<ProcedureProtos.Procedure> getProcedures() {
+    return Collections.unmodifiableCollection(procMap.values());
   }
 
-  private void unlinkFromLinkList(Entry entry) {
-    if (entry == rootHead) {
-      rootHead = entry.linkNext;
-    } else if (entry == childUnlinkedHead) {
-      childUnlinkedHead = entry.linkNext;
-    }
-    if (entry.linkPrev != null) {
-      entry.linkPrev.linkNext = entry.linkNext;
-    }
-    if (entry.linkNext != null) {
-      entry.linkNext.linkPrev = entry.linkPrev;
-    }
-  }
-
-  private Entry addToLinkList(Entry entry, Entry linkHead) {
-    unlinkFromLinkList(entry);
-    entry.linkNext = linkHead;
-    entry.linkPrev = null;
-    if (linkHead != null) {
-      linkHead.linkPrev = entry;
-    }
-    return entry;
-  }
-
-  private Entry findLinkListTail(Entry linkHead) {
-    Entry tail = linkHead;
-    while (tail.linkNext != null) {
-      tail = tail.linkNext;
-    }
-    return tail;
-  }
-
-  private Entry addToMap(long procId, boolean hasParent) {
-    int slotIndex = getMapSlot(procId);
-    Entry entry = getProcedure(slotIndex, procId);
-    if (entry != null) {
-      return entry;
-    }
-
-    entry = new Entry(procedureMap[slotIndex]);
-    procedureMap[slotIndex] = entry;
-    return entry;
-  }
-
-  private Entry removeFromMap(final long procId) {
-    int slotIndex = getMapSlot(procId);
-    Entry prev = null;
-    Entry entry = procedureMap[slotIndex];
-    while (entry != null) {
-      if (procId == entry.getProcId()) {
-        if (prev != null) {
-          prev.hashNext = entry.hashNext;
-        } else {
-          procedureMap[slotIndex] = entry.hashNext;
-        }
-        entry.hashNext = null;
-        return entry;
-      }
-      prev = entry;
-      entry = entry.hashNext;
-    }
-    return null;
-  }
-
-  private Entry getProcedure(long procId) {
-    return getProcedure(getMapSlot(procId), procId);
-  }
-
-  private Entry getProcedure(int slotIndex, long procId) {
-    Entry entry = procedureMap[slotIndex];
-    while (entry != null) {
-      if (procId == entry.getProcId()) {
-        return entry;
-      }
-      entry = entry.hashNext;
-    }
-    return null;
+  public long getMinModifiedProcId() {
+    return minModifiedProcId;
   }
 
-  private int getMapSlot(long procId) {
-    return (int) (Procedure.getProcIdHashCode(procId) % procedureMap.length);
+  public long getMaxModifiedProcId() {
+    return maxModifiedProcId;
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/7adf5901/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureTree.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureTree.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureTree.java
new file mode 100644
index 0000000..c32bd7f
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureTree.java
@@ -0,0 +1,299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2.store.wal;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
+
+/**
+ * Used to build the tree for procedures.
+ * <p/>
+ * We group the procedures by their root procedure and then validate each group: if the max
+ * stack id in a group is n, each of the stack ids 0..n must appear exactly once (no gaps, no
+ * duplicates), otherwise all the procedures in the group are considered corrupted. A finished
+ * root procedure is not checked this way; its group is valid only if it has no sub procedures.
+ * Please see the code in {@link #checkReady(Entry, Map)} method.
+ * <p/>
+ * Procedures not in any group, i.e., whose root procedure cannot be found, are also considered
+ * corrupted. Please see the code in {@link #checkOrphan(Map)} method.
+ */
+@InterfaceAudience.Private
+public final class WALProcedureTree {
+
+  private static final Logger LOG = LoggerFactory.getLogger(WALProcedureTree.class);
+  // A tree node: the procedure proto plus the entries whose parent id is this procedure's id.
+  private static final class Entry {
+
+    private final ProcedureProtos.Procedure proc;
+
+    private final List<Entry> subProcs = new ArrayList<>();
+
+    public Entry(ProcedureProtos.Procedure proc) {
+      this.proc = proc;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("Procedure(pid=");
+      sb.append(proc.getProcId());
+      sb.append(", ppid=");
+      sb.append(proc.hasParentId() ? proc.getParentId() : Procedure.NO_PROC_ID);
+      sb.append(", class=");
+      sb.append(proc.getClassName());
+      sb.append(")");
+      return sb.toString();
+    }
+  }
+
+  // When loading, the procedures may be iterated more than once (see Iter#reset), so this class
+  // caches the deserialized result to avoid deserializing the same procedure multiple times.
+  private static final class ProtoAndProc {
+    private final ProcedureProtos.Procedure proto;
+
+    private Procedure<?> proc;
+
+    public ProtoAndProc(ProcedureProtos.Procedure proto) {
+      this.proto = proto;
+    }
+
+    public Procedure<?> getProc() throws IOException {
+      if (proc == null) {
+        proc = ProcedureUtil.convertToProcedure(proto);
+      }
+      return proc;
+    }
+  }
+  // Classification results; both lists are sorted by procedure id once the tree is built.
+  private final List<ProtoAndProc> validProcs = new ArrayList<>();
+
+  private final List<ProtoAndProc> corruptedProcs = new ArrayList<>();
+  // Only a root procedure (no parent id) in ROLLEDBACK or SUCCESS state is treated as finished.
+  private static boolean isFinished(ProcedureProtos.Procedure proc) {
+    if (!proc.hasParentId()) {
+      switch (proc.getState()) {
+        case ROLLEDBACK:
+        case SUCCESS:
+          return true;
+        default:
+          break;
+      }
+    }
+    return false;
+  }
+  // Note: procMap is modified; every entry reachable from a root procedure is removed from it.
+  private WALProcedureTree(Map<Long, Entry> procMap) {
+    List<Entry> rootEntries = buildTree(procMap);
+    for (Entry rootEntry : rootEntries) {
+      checkReady(rootEntry, procMap);
+    }
+    checkOrphan(procMap);
+    Comparator<ProtoAndProc> cmp =
+      (p1, p2) -> Long.compare(p1.proto.getProcId(), p2.proto.getProcId());
+    Collections.sort(validProcs, cmp);
+    Collections.sort(corruptedProcs, cmp);
+  }
+  // Links each entry under its parent's subProcs and returns the entries without a parent id.
+  private List<Entry> buildTree(Map<Long, Entry> procMap) {
+    List<Entry> rootEntries = new ArrayList<>();
+    procMap.values().forEach(entry -> {
+      if (!entry.proc.hasParentId()) {
+        rootEntries.add(entry);
+      } else {
+        Entry parentEntry = procMap.get(entry.proc.getParentId());
+        // For a valid procedure this should not be null. If it is null, the entry is not reachable
+        // from any root procedure, and it will be reported as an orphan later by checkOrphan.
+        if (parentEntry != null) {
+          parentEntry.subProcs.add(entry);
+        }
+      }
+    });
+    return rootEntries;
+  }
+  // Recursively records the stack ids of entry and its descendants, tracking the max stack id.
+  private void collectStackId(Entry entry, Map<Integer, List<Entry>> stackId2Proc,
+      MutableInt maxStackId) {
+    for (int i = 0, n = entry.proc.getStackIdCount(); i < n; i++) {
+      int stackId = entry.proc.getStackId(i);
+      if (stackId > maxStackId.intValue()) {
+        maxStackId.setValue(stackId);
+      }
+      stackId2Proc.computeIfAbsent(stackId, k -> new ArrayList<>()).add(entry);
+    }
+    entry.subProcs.forEach(e -> collectStackId(e, stackId2Proc, maxStackId));
+  }
+  // Recursively adds entry and its descendants to corruptedProcs and removes them from the map.
+  private void addAllToCorruptedAndRemoveFromProcMap(Entry entry,
+      Map<Long, Entry> remainingProcMap) {
+    corruptedProcs.add(new ProtoAndProc(entry.proc));
+    remainingProcMap.remove(entry.proc.getProcId());
+    for (Entry e : entry.subProcs) {
+      addAllToCorruptedAndRemoveFromProcMap(e, remainingProcMap);
+    }
+  }
+  // Recursively adds entry and its descendants to validProcs and removes them from the map.
+  private void addAllToValidAndRemoveFromProcMap(Entry entry, Map<Long, Entry> remainingProcMap) {
+    validProcs.add(new ProtoAndProc(entry.proc));
+    remainingProcMap.remove(entry.proc.getProcId());
+    for (Entry e : entry.subProcs) {
+      addAllToValidAndRemoveFromProcMap(e, remainingProcMap);
+    }
+  }
+
+  // Checks, through the procedure stack ids, whether the given root procedure and all its sub
+  // procedures are valid, and adds them to validProcs or corruptedProcs accordingly. They are also
+  // removed from remainingProcMap, so any procedures still left in the map afterwards are orphans
+  // (see checkOrphan).
+  private void checkReady(Entry rootEntry, Map<Long, Entry> remainingProcMap) {
+    if (isFinished(rootEntry.proc)) {
+      if (!rootEntry.subProcs.isEmpty()) {
+        LOG.error("unexpected active children for root-procedure: {}", rootEntry);
+        rootEntry.subProcs.forEach(e -> LOG.error("unexpected active children: {}", e));
+        addAllToCorruptedAndRemoveFromProcMap(rootEntry, remainingProcMap);
+      } else {
+        addAllToValidAndRemoveFromProcMap(rootEntry, remainingProcMap);
+      }
+      return;
+    }
+    Map<Integer, List<Entry>> stackId2Proc = new HashMap<>();
+    MutableInt maxStackId = new MutableInt(Integer.MIN_VALUE);
+    collectStackId(rootEntry, stackId2Proc, maxStackId);
+    // stack ids 0..maxStackId must each appear exactly once: no gaps and no duplicates
+    boolean valid = true;
+    for (int i = 0; i <= maxStackId.intValue(); i++) {
+      List<Entry> entries = stackId2Proc.get(i);
+      if (entries == null) {
+        LOG.error("Missing stack id {}, max stack id is {}, root procedure is {}", i, maxStackId,
+          rootEntry);
+        valid = false;
+      } else if (entries.size() > 1) {
+        LOG.error("Multiple procedures {} have the same stack id {}, max stack id is {}," +
+          " root procedure is {}", entries, i, maxStackId, rootEntry);
+        valid = false;
+      }
+    }
+    if (valid) {
+      addAllToValidAndRemoveFromProcMap(rootEntry, remainingProcMap);
+    } else {
+      addAllToCorruptedAndRemoveFromProcMap(rootEntry, remainingProcMap);
+    }
+  }
+  // Everything still in procMap is unreachable from any root procedure, so it is corrupted.
+  private void checkOrphan(Map<Long, Entry> procMap) {
+    procMap.values().forEach(entry -> {
+      LOG.error("Orphan procedure: {}", entry);
+      corruptedProcs.add(new ProtoAndProc(entry.proc));
+    });
+  }
+  // ProcedureIterator over a fixed list; each Procedure is deserialized lazily and cached.
+  private static final class Iter implements ProcedureIterator {
+
+    private final List<ProtoAndProc> procs;
+
+    private Iterator<ProtoAndProc> iter;
+
+    private ProtoAndProc current;
+
+    public Iter(List<ProtoAndProc> procs) {
+      this.procs = procs;
+      reset();
+    }
+
+    @Override
+    public void reset() {
+      iter = procs.iterator();
+      if (iter.hasNext()) {
+        current = iter.next();
+      } else {
+        current = null;
+      }
+    }
+
+    @Override
+    public boolean hasNext() {
+      return current != null;
+    }
+
+    private void checkNext() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+    }
+
+    @Override
+    public boolean isNextFinished() {
+      checkNext();
+      return isFinished(current.proto);
+    }
+
+    private void moveToNext() {
+      if (iter.hasNext()) {
+        current = iter.next();
+      } else {
+        current = null;
+      }
+    }
+
+    @Override
+    public void skipNext() {
+      checkNext();
+      moveToNext();
+    }
+
+    @Override
+    public Procedure<?> next() throws IOException {
+      checkNext();
+      Procedure<?> proc = current.getProc();
+      moveToNext();
+      return proc;
+    }
+  }
+  // Iterator over the valid procedures, in procedure id order.
+  public ProcedureIterator getValidProcs() {
+    return new Iter(validProcs);
+  }
+  // Iterator over the corrupted procedures (including orphans), in procedure id order.
+  public ProcedureIterator getCorruptedProcs() {
+    return new Iter(corruptedProcs);
+  }
+  // Builds and validates the tree. If several protos share a proc id, the last one wins.
+  public static WALProcedureTree build(Collection<ProcedureProtos.Procedure> procedures) {
+    Map<Long, Entry> procMap = new HashMap<>();
+    for (ProcedureProtos.Procedure proc : procedures) {
+      procMap.put(proc.getProcId(), new Entry(proc));
+    }
+    return new WALProcedureTree(procMap);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/7adf5901/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java
index 443386d..da53fa5 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java
@@ -18,9 +18,7 @@
 package org.apache.hadoop.hbase.procedure2.store.wal;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.Random;

http://git-wip-us.apache.org/repos/asf/hbase/blob/7adf5901/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
index d682481..0f598b0 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.procedure2.store.wal;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -44,7 +43,6 @@ import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
 import org.apache.hadoop.hbase.procedure2.SequentialProcedure;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
-import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -72,7 +70,6 @@ public class TestWALProcedureStore {
   private static final Logger LOG = LoggerFactory.getLogger(TestWALProcedureStore.class);
 
   private static final int PROCEDURE_STORE_SLOTS = 1;
-  private static final Procedure NULL_PROC = null;
 
   private WALProcedureStore procStore;
 
@@ -153,7 +150,7 @@ public class TestWALProcedureStore {
 
   @Test
   public void testWalCleanerSequentialClean() throws Exception {
-    final Procedure[] procs = new Procedure[5];
+    final Procedure<?>[] procs = new Procedure[5];
     ArrayList<ProcedureWALFile> logs = null;
 
     // Insert procedures and roll wal after every insert.
@@ -182,7 +179,7 @@ public class TestWALProcedureStore {
   // they are in the starting of the list.
   @Test
   public void testWalCleanerNoHoles() throws Exception {
-    final Procedure[] procs = new Procedure[5];
+    final Procedure<?>[] procs = new Procedure[5];
     ArrayList<ProcedureWALFile> logs = null;
     // Insert procedures and roll wal after every insert.
     for (int i = 0; i < procs.length; i++) {
@@ -242,7 +239,7 @@ public class TestWALProcedureStore {
 
   @Test
   public void testWalCleanerWithEmptyRolls() throws Exception {
-    final Procedure[] procs = new Procedure[3];
+    final Procedure<?>[] procs = new Procedure[3];
     for (int i = 0; i < procs.length; ++i) {
       procs[i] = new TestSequentialProcedure();
       procStore.insert(procs[i], null);
@@ -284,12 +281,12 @@ public class TestWALProcedureStore {
     Set<Long> procIds = new HashSet<>();
 
     // Insert something in the log
-    Procedure proc1 = new TestSequentialProcedure();
+    Procedure<?> proc1 = new TestSequentialProcedure();
     procIds.add(proc1.getProcId());
     procStore.insert(proc1, null);
 
-    Procedure proc2 = new TestSequentialProcedure();
-    Procedure[] child2 = new Procedure[2];
+    Procedure<?> proc2 = new TestSequentialProcedure();
+    Procedure<?>[] child2 = new Procedure[2];
     child2[0] = new TestSequentialProcedure();
     child2[1] = new TestSequentialProcedure();
 
@@ -323,11 +320,11 @@ public class TestWALProcedureStore {
   @Test
   public void testNoTrailerDoubleRestart() throws Exception {
     // log-0001: proc 0, 1 and 2 are inserted
-    Procedure proc0 = new TestSequentialProcedure();
+    Procedure<?> proc0 = new TestSequentialProcedure();
     procStore.insert(proc0, null);
-    Procedure proc1 = new TestSequentialProcedure();
+    Procedure<?> proc1 = new TestSequentialProcedure();
     procStore.insert(proc1, null);
-    Procedure proc2 = new TestSequentialProcedure();
+    Procedure<?> proc2 = new TestSequentialProcedure();
     procStore.insert(proc2, null);
     procStore.rollWriterForTesting();
 
@@ -420,7 +417,7 @@ public class TestWALProcedureStore {
   }
 
   private static void assertUpdated(final ProcedureStoreTracker tracker,
-      final Procedure[] procs, final int[] updatedProcs, final int[] nonUpdatedProcs) {
+      final Procedure<?>[] procs, final int[] updatedProcs, final int[] nonUpdatedProcs) {
     for (int index : updatedProcs) {
       long procId = procs[index].getProcId();
       assertTrue("Procedure id : " + procId, tracker.isModified(procId));
@@ -432,7 +429,7 @@ public class TestWALProcedureStore {
   }
 
   private static void assertDeleted(final ProcedureStoreTracker tracker,
-      final Procedure[] procs, final int[] deletedProcs, final int[] nonDeletedProcs) {
+      final Procedure<?>[] procs, final int[] deletedProcs, final int[] nonDeletedProcs) {
     for (int index : deletedProcs) {
       long procId = procs[index].getProcId();
       assertEquals("Procedure id : " + procId,
@@ -447,7 +444,7 @@ public class TestWALProcedureStore {
 
   @Test
   public void testCorruptedTrailersRebuild() throws Exception {
-    final Procedure[] procs = new Procedure[6];
+    final Procedure<?>[] procs = new Procedure[6];
     for (int i = 0; i < procs.length; ++i) {
       procs[i] = new TestSequentialProcedure();
     }
@@ -575,127 +572,20 @@ public class TestWALProcedureStore {
     storeRestart(loader);
     assertEquals(0, loader.getLoadedCount());
     assertEquals(rootProcs.length, loader.getCorruptedCount());
-    for (Procedure proc: loader.getCorrupted()) {
+    for (Procedure<?> proc : loader.getCorrupted()) {
       assertTrue(proc.toString(), proc.getParentProcId() <= rootProcs.length);
       assertTrue(proc.toString(),
-                  proc.getProcId() > rootProcs.length &&
-                  proc.getProcId() <= (rootProcs.length * 2));
+        proc.getProcId() > rootProcs.length && proc.getProcId() <= (rootProcs.length * 2));
     }
   }
 
   @Test
-  public void testWalReplayOrder_AB_A() throws Exception {
-    /*
-     * | A B | -> | A |
-     */
-    TestProcedure a = new TestProcedure(1, 0);
-    TestProcedure b = new TestProcedure(2, 1);
-
-    procStore.insert(a, null);
-    a.addStackId(0);
-    procStore.update(a);
-
-    procStore.insert(a, new Procedure[] { b });
-    b.addStackId(1);
-    procStore.update(b);
-
-    procStore.rollWriterForTesting();
-
-    a.addStackId(2);
-    procStore.update(a);
-
-    storeRestart(new ProcedureStore.ProcedureLoader() {
-      @Override
-      public void setMaxProcId(long maxProcId) {
-        assertEquals(2, maxProcId);
-      }
-
-      @Override
-      public void load(ProcedureIterator procIter) throws IOException {
-        assertTrue(procIter.hasNext());
-        assertEquals(1, procIter.next().getProcId());
-        assertTrue(procIter.hasNext());
-        assertEquals(2, procIter.next().getProcId());
-        assertFalse(procIter.hasNext());
-      }
-
-      @Override
-      public void handleCorrupted(ProcedureIterator procIter) throws IOException {
-        assertFalse(procIter.hasNext());
-      }
-    });
-  }
-
-  @Test
-  public void testWalReplayOrder_ABC_BAD() throws Exception {
-    /*
-     * | A B C | -> | B A D |
-     */
-    TestProcedure a = new TestProcedure(1, 0);
-    TestProcedure b = new TestProcedure(2, 1);
-    TestProcedure c = new TestProcedure(3, 2);
-    TestProcedure d = new TestProcedure(4, 0);
-
-    procStore.insert(a, null);
-    a.addStackId(0);
-    procStore.update(a);
-
-    procStore.insert(a, new Procedure[] { b });
-    b.addStackId(1);
-    procStore.update(b);
-
-    procStore.insert(b, new Procedure[] { c });
-    b.addStackId(2);
-    procStore.update(b);
-
-    procStore.rollWriterForTesting();
-
-    b.addStackId(3);
-    procStore.update(b);
-
-    a.addStackId(4);
-    procStore.update(a);
-
-    procStore.insert(d, null);
-    d.addStackId(0);
-    procStore.update(d);
-
-    storeRestart(new ProcedureStore.ProcedureLoader() {
-      @Override
-      public void setMaxProcId(long maxProcId) {
-        assertEquals(4, maxProcId);
-      }
-
-      @Override
-      public void load(ProcedureIterator procIter) throws IOException {
-        assertTrue(procIter.hasNext());
-        assertEquals(4, procIter.next().getProcId());
-        // TODO: This will be multiple call once we do fast-start
-        //assertFalse(procIter.hasNext());
-
-        assertTrue(procIter.hasNext());
-        assertEquals(1, procIter.next().getProcId());
-        assertTrue(procIter.hasNext());
-        assertEquals(2, procIter.next().getProcId());
-        assertTrue(procIter.hasNext());
-        assertEquals(3, procIter.next().getProcId());
-        assertFalse(procIter.hasNext());
-      }
-
-      @Override
-      public void handleCorrupted(ProcedureIterator procIter) throws IOException {
-        assertFalse(procIter.hasNext());
-      }
-    });
-  }
-
-  @Test
   public void testRollAndRemove() throws IOException {
     // Insert something in the log
-    Procedure proc1 = new TestSequentialProcedure();
+    Procedure<?> proc1 = new TestSequentialProcedure();
     procStore.insert(proc1, null);
 
-    Procedure proc2 = new TestSequentialProcedure();
+    Procedure<?> proc2 = new TestSequentialProcedure();
     procStore.insert(proc2, null);
 
     // roll the log, now we have 2
@@ -942,17 +832,6 @@ public class TestWALProcedureStore {
     assertEquals(0, loader.getCorruptedCount());
   }
 
-  private void assertEmptyLogDir() {
-    try {
-      FileStatus[] status = fs.listStatus(logDir);
-      assertTrue("expected empty state-log dir", status == null || status.length == 0);
-    } catch (FileNotFoundException e) {
-      fail("expected the state-log dir to be present: " + logDir);
-    } catch (IOException e) {
-      fail("got en exception on state-log dir list: " + e.getMessage());
-    }
-  }
-
   public static class TestSequentialProcedure extends SequentialProcedure<Void> {
     private static long seqid = 0;
 
@@ -961,13 +840,18 @@ public class TestWALProcedureStore {
     }
 
     @Override
-    protected Procedure[] execute(Void env) { return null; }
+    protected Procedure<Void>[] execute(Void env) {
+      return null;
+    }
 
     @Override
-    protected void rollback(Void env) { }
+    protected void rollback(Void env) {
+    }
 
     @Override
-    protected boolean abort(Void env) { return false; }
+    protected boolean abort(Void env) {
+      return false;
+    }
 
     @Override
     protected void serializeStateData(ProcedureStateSerializer serializer)

http://git-wip-us.apache.org/repos/asf/hbase/blob/7adf5901/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureTree.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureTree.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureTree.java
new file mode 100644
index 0000000..890d0e3
--- /dev/null
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureTree.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2.store.wal;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
+
+@Category({ MasterTests.class, SmallTests.class })
+public class TestWALProcedureTree {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestWALProcedureTree.class);
+  /** Procedure with no-op lifecycle hooks, used only to build input for WALProcedureTree. */
+  public static final class TestProcedure extends Procedure<Void> {
+    // These overrides only delegate to super, presumably to make the setters callable here.
+    @Override
+    public void setProcId(long procId) {
+      super.setProcId(procId);
+    }
+
+    @Override
+    public void setParentProcId(long parentProcId) {
+      super.setParentProcId(parentProcId);
+    }
+
+    @Override
+    public synchronized void addStackIndex(int index) {
+      super.addStackIndex(index);
+    }
+    // Lifecycle hooks are no-ops; the tests only build and serialize these procedures.
+    @Override
+    protected Procedure<Void>[] execute(Void env)
+        throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
+      return null;
+    }
+
+    @Override
+    protected void rollback(Void env) throws IOException, InterruptedException {
+    }
+
+    @Override
+    protected boolean abort(Void env) {
+      return false;
+    }
+
+    @Override
+    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    }
+
+    @Override
+    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    }
+  }
+  /** Creates a TestProcedure; the parent id is only set when it is not NO_PROC_ID. */
+  private TestProcedure createProc(long procId, long parentProcId) {
+    TestProcedure proc = new TestProcedure();
+    proc.setProcId(procId);
+    if (parentProcId != Procedure.NO_PROC_ID) {
+      proc.setParentProcId(parentProcId);
+    }
+    return proc;
+  }
+  /** Converts to protobuf; IOException is rethrown unchecked so it can escape the lambda. */
+  private List<ProcedureProtos.Procedure> toProtos(TestProcedure... procs) {
+    return Arrays.stream(procs).map(p -> {
+      try {
+        return ProcedureUtil.convertToProtoProcedure(p);
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    }).collect(Collectors.toList());
+  }
+  /** Drains the iterator, casting each element back to TestProcedure. */
+  private List<TestProcedure> getProcs(ProcedureIterator iter) throws IOException {
+    List<TestProcedure> procs = new ArrayList<>();
+    while (iter.hasNext()) {
+      procs.add((TestProcedure) iter.next());
+    }
+    return procs;
+  }
+  /** Stack indexes 0, 1, 3 leave a gap at 2: expect all three procedures corrupted. */
+  @Test
+  public void testMissingStackId() throws IOException {
+    TestProcedure proc0 = createProc(1, Procedure.NO_PROC_ID);
+    proc0.addStackIndex(0);
+    TestProcedure proc1 = createProc(2, 1);
+    proc1.addStackIndex(1);
+    TestProcedure proc2 = createProc(3, 2);
+    proc2.addStackIndex(3);
+    WALProcedureTree tree = WALProcedureTree.build(toProtos(proc0, proc1, proc2));
+    List<TestProcedure> validProcs = getProcs(tree.getValidProcs());
+    assertEquals(0, validProcs.size());
+    List<TestProcedure> corruptedProcs = getProcs(tree.getCorruptedProcs());
+    assertEquals(3, corruptedProcs.size());
+    assertEquals(1, corruptedProcs.get(0).getProcId());
+    assertEquals(2, corruptedProcs.get(1).getProcId());
+    assertEquals(3, corruptedProcs.get(2).getProcId());
+  }
+  /** Stack index 1 is used twice: expect all three procedures corrupted. */
+  @Test
+  public void testDuplicatedStackId() throws IOException {
+    TestProcedure proc0 = createProc(1, Procedure.NO_PROC_ID);
+    proc0.addStackIndex(0);
+    TestProcedure proc1 = createProc(2, 1);
+    proc1.addStackIndex(1);
+    TestProcedure proc2 = createProc(3, 2);
+    proc2.addStackIndex(1);
+    WALProcedureTree tree = WALProcedureTree.build(toProtos(proc0, proc1, proc2));
+    List<TestProcedure> validProcs = getProcs(tree.getValidProcs());
+    assertEquals(0, validProcs.size());
+    List<TestProcedure> corruptedProcs = getProcs(tree.getCorruptedProcs());
+    assertEquals(3, corruptedProcs.size());
+    assertEquals(1, corruptedProcs.get(0).getProcId());
+    assertEquals(2, corruptedProcs.get(1).getProcId());
+    assertEquals(3, corruptedProcs.get(2).getProcId());
+  }
+  /** Proc 5's parent (4) is absent: expect only proc 5 corrupted, the other three valid. */
+  @Test
+  public void testOrphan() throws IOException {
+    TestProcedure proc0 = createProc(1, Procedure.NO_PROC_ID);
+    proc0.addStackIndex(0);
+    TestProcedure proc1 = createProc(2, 1);
+    proc1.addStackIndex(1);
+    TestProcedure proc2 = createProc(3, Procedure.NO_PROC_ID);
+    proc2.addStackIndex(0);
+    TestProcedure proc3 = createProc(5, 4);
+    proc3.addStackIndex(1);
+    WALProcedureTree tree = WALProcedureTree.build(toProtos(proc0, proc1, proc2, proc3));
+    List<TestProcedure> validProcs = getProcs(tree.getValidProcs());
+    assertEquals(3, validProcs.size());
+    List<TestProcedure> corruptedProcs = getProcs(tree.getCorruptedProcs());
+    assertEquals(1, corruptedProcs.size());
+    assertEquals(5, corruptedProcs.get(0).getProcId());
+    assertEquals(4, corruptedProcs.get(0).getParentProcId());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/7adf5901/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index e2e4aec..ab9a799 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -171,6 +171,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
  * avoiding port contention if another local HBase instance is already running).
  * <p>To preserve test data directories, pass the system property "hbase.testing.preserve.testdir"
  * setting it to true.
+ * Trigger pre commit.
  */
 @InterfaceAudience.Public
 @SuppressWarnings("deprecation")