You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/12/25 04:24:50 UTC

[hbase] branch branch-2 updated: HBASE-23326 Implement a ProcedureStore which stores procedures in a HRegion (#941)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 5cae75e  HBASE-23326 Implement a ProcedureStore which stores procedures in a HRegion (#941)
5cae75e is described below

commit 5cae75e12435190c519dd1a3e8aa89ff308c2f19
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Wed Dec 25 12:02:12 2019 +0800

    HBASE-23326 Implement a ProcedureStore which stores procedures in a HRegion (#941)
    
    Signed-off-by: Guanghao Zhang <zg...@apache.org>
    Signed-off-by: stack <st...@apache.org>
---
 .../hadoop/hbase/client/RawAsyncHBaseAdmin.java    |   8 +-
 .../org/apache/hadoop/hbase/client/RegionInfo.java |   2 +-
 .../procedure2/CompletedProcedureCleaner.java      |   3 +
 .../hadoop/hbase/procedure2/ProcedureUtil.java     |  13 +
 .../store/InMemoryProcedureIterator.java           |  94 ++++
 ...edureStoreException.java => LeaseRecovery.java} |  28 +-
 .../hbase/procedure2/store/ProcedureStore.java     |  24 +-
 .../WALProcedureTree.java => ProcedureTree.java}   | 134 +----
 ...eStoreException.java => ProtoAndProcedure.java} |  38 +-
 .../procedure2/store/{ => wal}/BitSetNode.java     |   7 +-
 .../wal/CorruptedWALProcedureStoreException.java   |   3 +
 .../store/{ => wal}/ProcedureStoreTracker.java     |   7 +-
 .../procedure2/store/wal/ProcedureWALFile.java     |   6 +-
 .../procedure2/store/wal/ProcedureWALFormat.java   |   6 +-
 .../store/wal/ProcedureWALFormatReader.java        |  13 +-
 .../store/wal/ProcedureWALPrettyPrinter.java       |   3 +
 .../procedure2/store/wal/WALProcedureMap.java      |   3 +
 .../procedure2/store/wal/WALProcedureStore.java    |  21 +-
 .../hbase/procedure2/ProcedureTestingUtility.java  | 127 ++---
 ...ALProcedureTree.java => TestProcedureTree.java} |  12 +-
 .../wal/ProcedureWALPerformanceEvaluation.java     |   3 +-
 .../procedure2/store/{ => wal}/TestBitSetNode.java |   4 +-
 .../store/{ => wal}/TestProcedureStoreTracker.java |   2 +-
 .../store/wal/TestWALProcedureStore.java           |   6 +-
 .../org/apache/hadoop/hbase/master/HMaster.java    |  38 +-
 .../hbase/master/procedure/MasterProcedureEnv.java |   6 +-
 .../master/procedure/MasterProcedureUtil.java      |  10 +-
 .../store/region/RegionFlusherAndCompactor.java    | 239 +++++++++
 .../store/region/RegionProcedureStore.java         | 584 +++++++++++++++++++++
 .../region/RegionProcedureStoreWALRoller.java      | 120 +++++
 .../apache/hadoop/hbase/regionserver/HRegion.java  | 216 ++++----
 .../hadoop/hbase/regionserver/LogRoller.java       | 189 +------
 .../hbase/regionserver/wal/AbstractFSWAL.java      |   5 +-
 .../LogRoller.java => wal/AbstractWALRoller.java}  | 100 ++--
 .../org/apache/hadoop/hbase/wal/WALFactory.java    |   4 +-
 .../resources/hbase-webapps/master/procedures.jsp  | 114 ----
 .../hbase/master/TestLoadProcedureError.java       |   2 +-
 .../hbase/master/TestMasterMetricsWrapper.java     |   6 +-
 .../procedure/TestMasterProcedureWalLease.java     | 238 ---------
 .../region/RegionProcedureStoreTestHelper.java     |  54 ++
 .../region/RegionProcedureStoreTestProcedure.java  |  77 +++
 .../store/region/TestRegionProcedureStore.java     | 159 ++++++
 .../region/TestRegionProcedureStoreMigration.java  | 143 +++++
 .../region/TestRegionProcedureStoreWALCleaner.java | 129 +++++
 .../TestRegionServerCrashDisableWAL.java           |   3 +-
 45 files changed, 2038 insertions(+), 965 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index 98dda51..1527f10 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -1182,12 +1182,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   }
 
   private byte[] toEncodeRegionName(byte[] regionName) {
-    try {
-      return RegionInfo.isEncodedRegionName(regionName) ? regionName
-          : Bytes.toBytes(RegionInfo.encodeRegionName(regionName));
-    } catch (IOException e) {
-      return regionName;
-    }
+    return RegionInfo.isEncodedRegionName(regionName) ? regionName :
+      Bytes.toBytes(RegionInfo.encodeRegionName(regionName));
   }
 
   private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionInfo.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionInfo.java
index 2f9e88d..a971332 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionInfo.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionInfo.java
@@ -351,7 +351,7 @@ public interface RegionInfo {
    * @return True if <code>regionName</code> represents an encoded name.
    */
   @InterfaceAudience.Private // For use by internals only.
-  public static boolean isEncodedRegionName(byte[] regionName) throws IOException {
+  public static boolean isEncodedRegionName(byte[] regionName) {
     // If not parseable as region name, presume encoded. TODO: add stringency; e.g. if hex.
     return parseRegionNameOrReturnNull(regionName) == null && regionName.length <= MD5_HEX_LENGTH;
   }
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/CompletedProcedureCleaner.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/CompletedProcedureCleaner.java
index e51b77b..796a8e4 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/CompletedProcedureCleaner.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/CompletedProcedureCleaner.java
@@ -134,5 +134,8 @@ class CompletedProcedureCleaner<TEnvironment> extends ProcedureInMemoryChore<TEn
     if (batchCount > 0) {
       store.delete(batchIds, 0, batchCount);
     }
+    // let the store do some cleanup works, i.e, delete the place marker for preserving the max
+    // procedure id.
+    store.cleanup();
   }
 }
\ No newline at end of file
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java
index 30201ca..c557c20 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java
@@ -370,4 +370,17 @@ public final class ProcedureUtil {
       .setMaxSleepTime(maxSleepTimeMs).setBackoffPolicy(new ExponentialBackoffPolicyWithLimit());
     return new RetryCounter(retryConfig);
   }
+
+  public static boolean isFinished(ProcedureProtos.Procedure proc) {
+    if (!proc.hasParentId()) {
+      switch (proc.getState()) {
+        case ROLLEDBACK:
+        case SUCCESS:
+          return true;
+        default:
+          break;
+      }
+    }
+    return false;
+  }
 }
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/InMemoryProcedureIterator.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/InMemoryProcedureIterator.java
new file mode 100644
index 0000000..aba71b9
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/InMemoryProcedureIterator.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2.store;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * A procedure iterator which holds all the procedure protos in memory. For fast access.
+ */
+@InterfaceAudience.Private
+public class InMemoryProcedureIterator implements ProcedureIterator {
+
+  private final List<ProtoAndProcedure> procs;
+
+  private Iterator<ProtoAndProcedure> iter;
+
+  private ProtoAndProcedure current;
+
+  public InMemoryProcedureIterator(List<ProtoAndProcedure> procs) {
+    this.procs = procs;
+    reset();
+  }
+
+  @Override
+  public void reset() {
+    iter = procs.iterator();
+    if (iter.hasNext()) {
+      current = iter.next();
+    } else {
+      current = null;
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    return current != null;
+  }
+
+  private void checkNext() {
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
+  }
+
+  @Override
+  public boolean isNextFinished() {
+    checkNext();
+    return ProcedureUtil.isFinished(current.getProto());
+  }
+
+  private void moveToNext() {
+    if (iter.hasNext()) {
+      current = iter.next();
+    } else {
+      current = null;
+    }
+  }
+
+  @Override
+  public void skipNext() {
+    checkNext();
+    moveToNext();
+  }
+
+  @Override
+  public Procedure<?> next() throws IOException {
+    checkNext();
+    Procedure<?> proc = current.getProcedure();
+    moveToNext();
+    return proc;
+  }
+}
\ No newline at end of file
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/LeaseRecovery.java
similarity index 62%
copy from hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java
copy to hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/LeaseRecovery.java
index ba4480f..7a9ea1b 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/LeaseRecovery.java
@@ -15,29 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.procedure2.store.wal;
+package org.apache.hadoop.hbase.procedure2.store;
 
-import org.apache.hadoop.hbase.HBaseIOException;
+import java.io.IOException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.yetus.audience.InterfaceAudience;
 
-/**
- * Thrown when a procedure WAL is corrupted
- */
 @InterfaceAudience.Private
-public class CorruptedWALProcedureStoreException extends HBaseIOException {
-
-  private static final long serialVersionUID = -3407300445435898074L;
-
-  /** default constructor */
-  public CorruptedWALProcedureStoreException() {
-    super();
-  }
+public interface LeaseRecovery {
 
-  /**
-   * Constructor
-   * @param s message
-   */
-  public CorruptedWALProcedureStoreException(String s) {
-    super(s);
-  }
-}
+  void recoverFileLease(FileSystem fs, Path path) throws IOException;
+}
\ No newline at end of file
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
index 4398888..4965e17 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
@@ -24,9 +24,18 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 
 /**
- * The ProcedureStore is used by the executor to persist the state of each procedure execution.
- * This allows to resume the execution of pending/in-progress procedures in case
- * of machine failure or service shutdown.
+ * The ProcedureStore is used by the executor to persist the state of each procedure execution. This
+ * allows to resume the execution of pending/in-progress procedures in case of machine failure or
+ * service shutdown.
+ * <p/>
+ * Notice that, the implementation must guarantee that the maxProcId when loading is the maximum one
+ * in the whole history, not only the current live procedures. This is very important as for
+ * executing remote procedures, we have some nonce checks at region server side to prevent executing
+ * non-idempotent operations more than once. If the procedure id could go back, then we may
+ * accidentally ignore some important operations such as region assign or unassign.<br/>
+ * This may lead to some garbages so we provide a {@link #cleanup()} method, the framework will call
+ * this method periodically and the store implementation could do some clean up works in this
+ * method.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
@@ -240,4 +249,13 @@ public interface ProcedureStore {
    * @param count the number of IDs to delete
    */
   void delete(long[] procIds, int offset, int count);
+
+  /**
+   * Will be called by the framework to give the store a chance to do some clean up works.
+   * <p/>
+   * Notice that this is for periodical clean up work, not for the clean up after close, if you want
+   * to close the store just call the {@link #stop(boolean)} method above.
+   */
+  default void cleanup() {
+  }
 }
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureTree.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureTree.java
similarity index 69%
rename from hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureTree.java
rename to hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureTree.java
index 6e624b4..4e615b9 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureTree.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureTree.java
@@ -15,18 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.procedure2.store.wal;
+package org.apache.hadoop.hbase.procedure2.store;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.NoSuchElementException;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
@@ -50,9 +47,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
  * we will also consider them as corrupted. Please see the code in {@link #checkOrphan(Map)} method.
  */
 @InterfaceAudience.Private
-public final class WALProcedureTree {
+public final class ProcedureTree {
 
-  private static final Logger LOG = LoggerFactory.getLogger(WALProcedureTree.class);
+  private static final Logger LOG = LoggerFactory.getLogger(ProcedureTree.class);
 
   private static final class Entry {
 
@@ -78,50 +75,18 @@ public final class WALProcedureTree {
     }
   }
 
-  // when loading we will iterator the procedures twice, so use this class to cache the deserialized
-  // result to prevent deserializing multiple times.
-  private static final class ProtoAndProc {
-    private final ProcedureProtos.Procedure proto;
+  private final List<ProtoAndProcedure> validProcs = new ArrayList<>();
 
-    private Procedure<?> proc;
+  private final List<ProtoAndProcedure> corruptedProcs = new ArrayList<>();
 
-    public ProtoAndProc(ProcedureProtos.Procedure proto) {
-      this.proto = proto;
-    }
-
-    public Procedure<?> getProc() throws IOException {
-      if (proc == null) {
-        proc = ProcedureUtil.convertToProcedure(proto);
-      }
-      return proc;
-    }
-  }
-
-  private final List<ProtoAndProc> validProcs = new ArrayList<>();
-
-  private final List<ProtoAndProc> corruptedProcs = new ArrayList<>();
-
-  private static boolean isFinished(ProcedureProtos.Procedure proc) {
-    if (!proc.hasParentId()) {
-      switch (proc.getState()) {
-        case ROLLEDBACK:
-        case SUCCESS:
-          return true;
-        default:
-          break;
-      }
-    }
-    return false;
-  }
-
-  private WALProcedureTree(Map<Long, Entry> procMap) {
+  private ProcedureTree(Map<Long, Entry> procMap) {
     List<Entry> rootEntries = buildTree(procMap);
     for (Entry rootEntry : rootEntries) {
       checkReady(rootEntry, procMap);
     }
     checkOrphan(procMap);
-    Comparator<ProtoAndProc> cmp =
-      (p1, p2) -> Long.compare(p1.proto.getProcId(), p2.proto.getProcId());
+    Comparator<ProtoAndProcedure> cmp =
+      (p1, p2) -> Long.compare(p1.getProto().getProcId(), p2.getProto().getProcId());
     Collections.sort(validProcs, cmp);
     Collections.sort(corruptedProcs, cmp);
   }
@@ -144,7 +109,7 @@ public final class WALProcedureTree {
   }
 
   private void collectStackId(Entry entry, Map<Integer, List<Entry>> stackId2Proc,
-      MutableInt maxStackId) {
+    MutableInt maxStackId) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Procedure {} stack ids={}", entry, entry.proc.getStackIdList());
     }
@@ -159,8 +124,8 @@ public final class WALProcedureTree {
   }
 
   private void addAllToCorruptedAndRemoveFromProcMap(Entry entry,
-      Map<Long, Entry> remainingProcMap) {
-    corruptedProcs.add(new ProtoAndProc(entry.proc));
+    Map<Long, Entry> remainingProcMap) {
+    corruptedProcs.add(new ProtoAndProcedure(entry.proc));
     remainingProcMap.remove(entry.proc.getProcId());
     for (Entry e : entry.subProcs) {
       addAllToCorruptedAndRemoveFromProcMap(e, remainingProcMap);
@@ -168,7 +133,7 @@ public final class WALProcedureTree {
   }
 
   private void addAllToValidAndRemoveFromProcMap(Entry entry, Map<Long, Entry> remainingProcMap) {
-    validProcs.add(new ProtoAndProc(entry.proc));
+    validProcs.add(new ProtoAndProcedure(entry.proc));
     remainingProcMap.remove(entry.proc.getProcId());
     for (Entry e : entry.subProcs) {
       addAllToValidAndRemoveFromProcMap(e, remainingProcMap);
@@ -180,7 +145,7 @@ public final class WALProcedureTree {
   // remainingProcMap, so at last, if there are still procedures in the map, we know that there are
   // orphan procedures.
   private void checkReady(Entry rootEntry, Map<Long, Entry> remainingProcMap) {
-    if (isFinished(rootEntry.proc)) {
+    if (ProcedureUtil.isFinished(rootEntry.proc)) {
       if (!rootEntry.subProcs.isEmpty()) {
         LOG.error("unexpected active children for root-procedure: {}", rootEntry);
         rootEntry.subProcs.forEach(e -> LOG.error("unexpected active children: {}", e));
@@ -217,86 +182,23 @@ public final class WALProcedureTree {
   private void checkOrphan(Map<Long, Entry> procMap) {
     procMap.values().forEach(entry -> {
       LOG.error("Orphan procedure: {}", entry);
-      corruptedProcs.add(new ProtoAndProc(entry.proc));
+      corruptedProcs.add(new ProtoAndProcedure(entry.proc));
     });
   }
 
-  private static final class Iter implements ProcedureIterator {
-
-    private final List<ProtoAndProc> procs;
-
-    private Iterator<ProtoAndProc> iter;
-
-    private ProtoAndProc current;
-
-    public Iter(List<ProtoAndProc> procs) {
-      this.procs = procs;
-      reset();
-    }
-
-    @Override
-    public void reset() {
-      iter = procs.iterator();
-      if (iter.hasNext()) {
-        current = iter.next();
-      } else {
-        current = null;
-      }
-    }
-
-    @Override
-    public boolean hasNext() {
-      return current != null;
-    }
-
-    private void checkNext() {
-      if (!hasNext()) {
-        throw new NoSuchElementException();
-      }
-    }
-
-    @Override
-    public boolean isNextFinished() {
-      checkNext();
-      return isFinished(current.proto);
-    }
-
-    private void moveToNext() {
-      if (iter.hasNext()) {
-        current = iter.next();
-      } else {
-        current = null;
-      }
-    }
-
-    @Override
-    public void skipNext() {
-      checkNext();
-      moveToNext();
-    }
-
-    @Override
-    public Procedure<?> next() throws IOException {
-      checkNext();
-      Procedure<?> proc = current.getProc();
-      moveToNext();
-      return proc;
-    }
-  }
-
   public ProcedureIterator getValidProcs() {
-    return new Iter(validProcs);
+    return new InMemoryProcedureIterator(validProcs);
   }
 
   public ProcedureIterator getCorruptedProcs() {
-    return new Iter(corruptedProcs);
+    return new InMemoryProcedureIterator(corruptedProcs);
   }
 
-  public static WALProcedureTree build(Collection<ProcedureProtos.Procedure> procedures) {
+  public static ProcedureTree build(Collection<ProcedureProtos.Procedure> procedures) {
     Map<Long, Entry> procMap = new HashMap<>();
     for (ProcedureProtos.Procedure proc : procedures) {
       procMap.put(proc.getProcId(), new Entry(proc));
     }
-    return new WALProcedureTree(procMap);
+    return new ProcedureTree(procMap);
   }
 }
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProtoAndProcedure.java
similarity index 51%
copy from hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java
copy to hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProtoAndProcedure.java
index ba4480f..0cdc480 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProtoAndProcedure.java
@@ -15,29 +15,37 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.procedure2.store.wal;
+package org.apache.hadoop.hbase.procedure2.store;
 
-import org.apache.hadoop.hbase.HBaseIOException;
+import java.io.IOException;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
+
 /**
- * Thrown when a procedure WAL is corrupted
+ * when loading we will iterator the procedures twice, so use this class to cache the deserialized
+ * result to prevent deserializing multiple times.
  */
 @InterfaceAudience.Private
-public class CorruptedWALProcedureStoreException extends HBaseIOException {
+public class ProtoAndProcedure {
+  private final ProcedureProtos.Procedure proto;
+
+  private Procedure<?> proc;
 
-  private static final long serialVersionUID = -3407300445435898074L;
+  public ProtoAndProcedure(ProcedureProtos.Procedure proto) {
+    this.proto = proto;
+  }
 
-  /** default constructor */
-  public CorruptedWALProcedureStoreException() {
-    super();
+  public Procedure<?> getProcedure() throws IOException {
+    if (proc == null) {
+      proc = ProcedureUtil.convertToProcedure(proto);
+    }
+    return proc;
   }
 
-  /**
-   * Constructor
-   * @param s message
-   */
-  public CorruptedWALProcedureStoreException(String s) {
-    super(s);
+  public ProcedureProtos.Procedure getProto() {
+    return proto;
   }
-}
+}
\ No newline at end of file
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/BitSetNode.java
similarity index 97%
rename from hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java
rename to hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/BitSetNode.java
index 78d2d91..98416a5 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/BitSetNode.java
@@ -15,13 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.procedure2.store;
+package org.apache.hadoop.hbase.procedure2.store.wal;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.hadoop.hbase.procedure2.Procedure;
-import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker.DeleteState;
+import org.apache.hadoop.hbase.procedure2.store.wal.ProcedureStoreTracker.DeleteState;
 import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
@@ -51,7 +51,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
  * For a non-partial BitSetNode, the initial modified value is 0 and deleted value is 1. For the
  * partial one, the initial modified value is 0 and the initial deleted value is also 0. In
  * {@link #unsetPartialFlag()} we will reset the deleted to 1 if it is not modified.
+ * @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we
+ *             use the new region based procedure store.
  */
+@Deprecated
 @InterfaceAudience.Private
 class BitSetNode {
   private static final long WORD_MASK = 0xffffffffffffffffL;
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java
index ba4480f..dc9d16c 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java
@@ -22,7 +22,10 @@ import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * Thrown when a procedure WAL is corrupted
+ * @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we
+ *             use the new region based procedure store.
  */
+@Deprecated
 @InterfaceAudience.Private
 public class CorruptedWALProcedureStoreException extends HBaseIOException {
 
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureStoreTracker.java
similarity index 98%
rename from hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
rename to hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureStoreTracker.java
index 03e2ce3..3436e8b 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureStoreTracker.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.procedure2.store;
+package org.apache.hadoop.hbase.procedure2.store.wal;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -36,9 +36,12 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
  *
  * It can be used by the ProcedureStore to identify which procedures are already
  * deleted/completed to avoid the deserialization step on restart
+ * @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we
+ *             use the new region based procedure store.
  */
+@Deprecated
 @InterfaceAudience.Private
-public class ProcedureStoreTracker {
+class ProcedureStoreTracker {
   private static final Logger LOG = LoggerFactory.getLogger(ProcedureStoreTracker.class);
 
   // Key is procedure id corresponding to first bit of the bitmap.
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
index aeae900..947d5bd 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,9 +32,12 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu
 
 /**
  * Describes a WAL File
+ * @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we
+ *             use the new region based procedure store.
  */
+@Deprecated
 @InterfaceAudience.Private
-public class ProcedureWALFile implements Comparable<ProcedureWALFile> {
+class ProcedureWALFile implements Comparable<ProcedureWALFile> {
   private static final Logger LOG = LoggerFactory.getLogger(ProcedureWALFile.class);
 
   private ProcedureWALHeader header;
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
index 9686593..bc60584 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.io.util.StreamUtils;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader;
-import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
 import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -40,9 +39,12 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu
 
 /**
  * Helper class that contains the WAL serialization utils.
+ * @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we
+ *             use the new region based procedure store.
  */
+@Deprecated
 @InterfaceAudience.Private
-public final class ProcedureWALFormat {
+final class ProcedureWALFormat {
 
   static final byte LOG_TYPE_STREAM = 0;
   static final byte LOG_TYPE_COMPACTED = 1;
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
index 1b19abb..31150ca 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.hbase.procedure2.store.wal;
 
 import java.io.IOException;
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureTree;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,9 +31,12 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu
 
 /**
  * Helper class that loads the procedures stored in a WAL.
+ * @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we
+ *             use the new region based procedure store.
  */
+@Deprecated
 @InterfaceAudience.Private
-public class ProcedureWALFormatReader {
+class ProcedureWALFormatReader {
   private static final Logger LOG = LoggerFactory.getLogger(ProcedureWALFormatReader.class);
 
   /**
@@ -44,8 +47,8 @@ public class ProcedureWALFormatReader {
    * See the comments of {@link WALProcedureMap} for more details.
    * <p/>
    * After reading all the proc wal files, we will use the procedures in the procedureMap to build a
-   * {@link WALProcedureTree}, and then give the result to the upper layer. See the comments of
-   * {@link WALProcedureTree} and the code in {@link #finish()} for more details.
+   * {@link ProcedureTree}, and then give the result to the upper layer. See the comments of
+   * {@link ProcedureTree} and the code in {@link #finish()} for more details.
    */
   private final WALProcedureMap localProcedureMap = new WALProcedureMap();
   private final WALProcedureMap procedureMap = new WALProcedureMap();
@@ -144,7 +147,7 @@ public class ProcedureWALFormatReader {
 
     // build the procedure execution tree. When building we will verify that whether a procedure is
     // valid.
-    WALProcedureTree tree = WALProcedureTree.build(procedureMap.getProcedures());
+    ProcedureTree tree = ProcedureTree.build(procedureMap.getProcedures());
     loader.load(tree.getValidProcs());
     loader.handleCorrupted(tree.getCorruptedProcs());
   }
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPrettyPrinter.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPrettyPrinter.java
index a11a46b..89e32c3 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPrettyPrinter.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPrettyPrinter.java
@@ -48,7 +48,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu
 /**
  * ProcedureWALPrettyPrinter prints the contents of a given ProcedureWAL file
  * @see WALProcedureStore#main(String[]) if you want to check parse of a directory of WALs.
+ * @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we
+ *             use the new region based procedure store.
  */
+@Deprecated
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
 @InterfaceStability.Evolving
 public class ProcedureWALPrettyPrinter extends Configured implements Tool {
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureMap.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureMap.java
index 9cda1bc..5e1983f 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureMap.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureMap.java
@@ -46,7 +46,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
  * {@link #merge(WALProcedureMap)} method of the global one and pass the local one in. In this
  * method, for the same procedure, the one comes earlier will win, as we read the proc wal files
  * from new to old(the reverse order).
+ * @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we
+ *             use the new region based procedure store.
  */
+@Deprecated
 @InterfaceAudience.Private
 class WALProcedureMap {
 
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index ae1088a..c685d8b 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -45,9 +45,9 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase;
-import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
 import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
 import org.apache.hadoop.hbase.procedure2.util.StringUtils;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
@@ -106,7 +106,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu
  * deleted.
  * @see ProcedureWALPrettyPrinter for printing content of a single WAL.
  * @see #main(String[]) to parse a directory of MasterWALProcs.
+ * @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we
+ *             use the new region based procedure store.
  */
+@Deprecated
 @InterfaceAudience.Private
 public class WALProcedureStore extends ProcedureStoreBase {
   private static final Logger LOG = LoggerFactory.getLogger(WALProcedureStore.class);
@@ -115,10 +118,6 @@ public class WALProcedureStore extends ProcedureStoreBase {
   public static final String MASTER_PROCEDURE_LOGDIR = "MasterProcWALs";
 
 
-  public interface LeaseRecovery {
-    void recoverFileLease(FileSystem fs, Path path) throws IOException;
-  }
-
   public static final String WAL_COUNT_WARN_THRESHOLD_CONF_KEY =
     "hbase.procedure.store.wal.warn.threshold";
   private static final int DEFAULT_WAL_COUNT_WARN_THRESHOLD = 10;
@@ -233,12 +232,10 @@ public class WALProcedureStore extends ProcedureStoreBase {
     }
   }
 
-  public WALProcedureStore(final Configuration conf, final LeaseRecovery leaseRecovery)
-      throws IOException {
-    this(conf,
-        new Path(CommonFSUtils.getWALRootDir(conf), MASTER_PROCEDURE_LOGDIR),
-        new Path(CommonFSUtils.getWALRootDir(conf), HConstants.HREGION_OLDLOGDIR_NAME),
-        leaseRecovery);
+  public WALProcedureStore(Configuration conf, LeaseRecovery leaseRecovery) throws IOException {
+    this(conf, new Path(CommonFSUtils.getWALRootDir(conf), MASTER_PROCEDURE_LOGDIR),
+      new Path(CommonFSUtils.getWALRootDir(conf), HConstants.HREGION_OLDLOGDIR_NAME),
+      leaseRecovery);
   }
 
   @VisibleForTesting
@@ -1411,7 +1408,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
       System.exit(-1);
     }
     WALProcedureStore store = new WALProcedureStore(conf, new Path(args[0]), null,
-      new WALProcedureStore.LeaseRecovery() {
+      new LeaseRecovery() {
         @Override
         public void recoverFileLease(FileSystem fs, Path path) throws IOException {
           // no-op
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
index b885ba5..6c66a49 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
+import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
 import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
@@ -52,13 +53,13 @@ public final class ProcedureTestingUtility {
   }
 
   public static ProcedureStore createStore(final Configuration conf, final Path dir)
-      throws IOException {
+    throws IOException {
     return createWalStore(conf, dir);
   }
 
   public static WALProcedureStore createWalStore(final Configuration conf, final Path dir)
-      throws IOException {
-    return new WALProcedureStore(conf, dir, null, new WALProcedureStore.LeaseRecovery() {
+    throws IOException {
+    return new WALProcedureStore(conf, dir, null, new LeaseRecovery() {
       @Override
       public void recoverFileLease(FileSystem fs, Path path) throws IOException {
         // no-op
@@ -66,13 +67,13 @@ public final class ProcedureTestingUtility {
     });
   }
 
-  public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor,
-      boolean abort, boolean startWorkers) throws Exception {
-    restart(procExecutor, false, true, null, null, null,  abort, startWorkers);
+  public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor, boolean abort,
+    boolean startWorkers) throws Exception {
+    restart(procExecutor, false, true, null, null, null, abort, startWorkers);
   }
 
-  public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor,
-      boolean abort) throws Exception {
+  public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor, boolean abort)
+    throws Exception {
     restart(procExecutor, false, true, null, null, null, abort, true);
   }
 
@@ -81,12 +82,12 @@ public final class ProcedureTestingUtility {
   }
 
   public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads,
-      boolean abortOnCorruption) throws IOException {
+    boolean abortOnCorruption) throws IOException {
     initAndStartWorkers(procExecutor, numThreads, abortOnCorruption, true);
   }
 
   public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads,
-      boolean abortOnCorruption, boolean startWorkers) throws IOException {
+    boolean abortOnCorruption, boolean startWorkers) throws IOException {
     procExecutor.init(numThreads, abortOnCorruption);
     if (startWorkers) {
       procExecutor.startWorkers();
@@ -94,16 +95,16 @@ public final class ProcedureTestingUtility {
   }
 
   public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor,
-      boolean avoidTestKillDuringRestart, boolean failOnCorrupted, Callable<Void> stopAction,
-      Callable<Void> actionBeforeStartWorker, Callable<Void> startAction) throws Exception {
+    boolean avoidTestKillDuringRestart, boolean failOnCorrupted, Callable<Void> stopAction,
+    Callable<Void> actionBeforeStartWorker, Callable<Void> startAction) throws Exception {
     restart(procExecutor, avoidTestKillDuringRestart, failOnCorrupted, stopAction,
       actionBeforeStartWorker, startAction, false, true);
   }
 
   public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor,
-      boolean avoidTestKillDuringRestart, boolean failOnCorrupted, Callable<Void> stopAction,
-      Callable<Void> actionBeforeStartWorker, Callable<Void> startAction, boolean abort,
-      boolean startWorkers) throws Exception {
+    boolean avoidTestKillDuringRestart, boolean failOnCorrupted, Callable<Void> stopAction,
+    Callable<Void> actionBeforeStartWorker, Callable<Void> startAction, boolean abort,
+    boolean startWorkers) throws Exception {
     final ProcedureStore procStore = procExecutor.getStore();
     final int storeThreads = procExecutor.getCorePoolSize();
     final int execThreads = procExecutor.getCorePoolSize();
@@ -144,15 +145,20 @@ public final class ProcedureTestingUtility {
   }
 
   public static void storeRestart(ProcedureStore procStore, ProcedureStore.ProcedureLoader loader)
-      throws Exception {
-    procStore.stop(false);
+    throws Exception {
+    storeRestart(procStore, false, loader);
+  }
+
+  public static void storeRestart(ProcedureStore procStore, boolean abort,
+    ProcedureStore.ProcedureLoader loader) throws Exception {
+    procStore.stop(abort);
     procStore.start(procStore.getNumThreads());
     procStore.recoverLease();
     procStore.load(loader);
   }
 
   public static LoadCounter storeRestartAndAssert(ProcedureStore procStore, long maxProcId,
-      long runnableCount, int completedCount, int corruptedCount) throws Exception {
+    long runnableCount, int completedCount, int corruptedCount) throws Exception {
     final LoadCounter loader = new LoadCounter();
     storeRestart(procStore, loader);
     assertEquals(maxProcId, loader.getMaxProcId());
@@ -169,19 +175,19 @@ public final class ProcedureTestingUtility {
   }
 
   public static <TEnv> void setKillIfHasParent(ProcedureExecutor<TEnv> procExecutor,
-      boolean value) {
+    boolean value) {
     createExecutorTesting(procExecutor);
     procExecutor.testing.killIfHasParent = value;
   }
 
   public static <TEnv> void setKillIfSuspended(ProcedureExecutor<TEnv> procExecutor,
-      boolean value) {
+    boolean value) {
     createExecutorTesting(procExecutor);
     procExecutor.testing.killIfSuspended = value;
   }
 
   public static <TEnv> void setKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
-      boolean value) {
+    boolean value) {
     createExecutorTesting(procExecutor);
     procExecutor.testing.killBeforeStoreUpdate = value;
     LOG.warn("Set Kill before store update to: " + procExecutor.testing.killBeforeStoreUpdate);
@@ -189,7 +195,7 @@ public final class ProcedureTestingUtility {
   }
 
   public static <TEnv> void setToggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
-      boolean value) {
+    boolean value) {
     createExecutorTesting(procExecutor);
     procExecutor.testing.toggleKillBeforeStoreUpdate = value;
     assertSingleExecutorForKillTests(procExecutor);
@@ -210,27 +216,27 @@ public final class ProcedureTestingUtility {
   }
 
   public static <TEnv> void setKillAndToggleBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
-      boolean value) {
+    boolean value) {
     ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, value);
     ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, value);
     assertSingleExecutorForKillTests(procExecutor);
   }
 
-  private static <TEnv> void assertSingleExecutorForKillTests(
-      final ProcedureExecutor<TEnv> procExecutor) {
+  private static <TEnv> void
+    assertSingleExecutorForKillTests(final ProcedureExecutor<TEnv> procExecutor) {
     if (procExecutor.testing == null) {
       return;
     }
 
     if (procExecutor.testing.killBeforeStoreUpdate ||
-        procExecutor.testing.toggleKillBeforeStoreUpdate) {
-      assertEquals("expected only one executor running during test with kill/restart",
-        1, procExecutor.getCorePoolSize());
+      procExecutor.testing.toggleKillBeforeStoreUpdate) {
+      assertEquals("expected only one executor running during test with kill/restart", 1,
+        procExecutor.getCorePoolSize());
     }
   }
 
   public static <TEnv> long submitAndWait(Configuration conf, TEnv env, Procedure<TEnv> proc)
-      throws IOException {
+    throws IOException {
     NoopProcedureStore procStore = new NoopProcedureStore();
     ProcedureExecutor<TEnv> procExecutor = new ProcedureExecutor<>(conf, env, procStore);
     procStore.start(1);
@@ -248,14 +254,14 @@ public final class ProcedureTestingUtility {
   }
 
   public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc,
-      final long nonceGroup, final long nonce) {
+    final long nonceGroup, final long nonce) {
     long procId = submitProcedure(procExecutor, proc, nonceGroup, nonce);
     waitProcedure(procExecutor, procId);
     return procId;
   }
 
   public static <TEnv> long submitProcedure(ProcedureExecutor<TEnv> procExecutor, Procedure proc,
-      final long nonceGroup, final long nonce) {
+    final long nonceGroup, final long nonce) {
     final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce);
     long procId = procExecutor.registerNonce(nonceKey);
     assertFalse(procId >= 0);
@@ -301,13 +307,12 @@ public final class ProcedureTestingUtility {
   }
 
   public static <TEnv> void assertProcNotYetCompleted(ProcedureExecutor<TEnv> procExecutor,
-      long procId) {
+    long procId) {
     assertFalse("expected a running proc", procExecutor.isFinished(procId));
     assertEquals(null, procExecutor.getResult(procId));
   }
 
-  public static <TEnv> void assertProcNotFailed(ProcedureExecutor<TEnv> procExecutor,
-      long procId) {
+  public static <TEnv> void assertProcNotFailed(ProcedureExecutor<TEnv> procExecutor, long procId) {
     Procedure<?> result = procExecutor.getResult(procId);
     assertTrue("expected procedure result", result != null);
     assertProcNotFailed(result);
@@ -318,7 +323,7 @@ public final class ProcedureTestingUtility {
   }
 
   public static <TEnv> Throwable assertProcFailed(final ProcedureExecutor<TEnv> procExecutor,
-      final long procId) {
+    final long procId) {
     Procedure<?> result = procExecutor.getResult(procId);
     assertTrue("expected procedure result", result != null);
     return assertProcFailed(result);
@@ -332,7 +337,8 @@ public final class ProcedureTestingUtility {
 
   public static void assertIsAbortException(final Procedure<?> result) {
     Throwable cause = assertProcFailed(result);
-    assertTrue("expected abort exception, got "+ cause, cause instanceof ProcedureAbortedException);
+    assertTrue("expected abort exception, got " + cause,
+      cause instanceof ProcedureAbortedException);
   }
 
   public static void assertIsTimeoutException(final Procedure<?> result) {
@@ -353,31 +359,30 @@ public final class ProcedureTestingUtility {
   }
 
   /**
-   * Run through all procedure flow states TWICE while also restarting
-   * procedure executor at each step; i.e force a reread of procedure store.
-   *
-   *<p>It does
-   * <ol><li>Execute step N - kill the executor before store update
+   * Run through all procedure flow states TWICE while also restarting procedure executor at each
+   * step; i.e force a reread of procedure store.
+   * <p>
+   * It does
+   * <ol>
+   * <li>Execute step N - kill the executor before store update
    * <li>Restart executor/store
    * <li>Execute step N - and then save to store
    * </ol>
-   *
-   *<p>This is a good test for finding state that needs persisting and steps that are not
-   * idempotent.
+   * <p>
+   * This is a good test for finding state that needs persisting and steps that are not idempotent.
    */
   public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec,
-      final long procId) throws Exception {
+    final long procId) throws Exception {
     testRecoveryAndDoubleExecution(procExec, procId, false);
   }
 
   public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec,
-      final long procId, final boolean expectFailure) throws Exception {
+    final long procId, final boolean expectFailure) throws Exception {
     testRecoveryAndDoubleExecution(procExec, procId, expectFailure, null);
   }
 
   public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec,
-      final long procId, final boolean expectFailure, final Runnable customRestart)
-      throws Exception {
+    final long procId, final boolean expectFailure, final Runnable customRestart) throws Exception {
     Procedure proc = procExec.getProcedure(procId);
     waitProcedure(procExec, procId);
     assertEquals(false, procExec.isRunning());
@@ -401,11 +406,12 @@ public final class ProcedureTestingUtility {
   }
 
   public static class NoopProcedure<TEnv> extends Procedure<TEnv> {
-    public NoopProcedure() {}
+    public NoopProcedure() {
+    }
 
     @Override
     protected Procedure<TEnv>[] execute(TEnv env)
-        throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
+      throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
       return null;
     }
 
@@ -419,18 +425,16 @@ public final class ProcedureTestingUtility {
     }
 
     @Override
-    protected void serializeStateData(ProcedureStateSerializer serializer)
-        throws IOException {
+    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
     }
 
     @Override
-    protected void deserializeStateData(ProcedureStateSerializer serializer)
-        throws IOException {
+    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
     }
   }
 
   public static class NoopStateMachineProcedure<TEnv, TState>
-      extends StateMachineProcedure<TEnv, TState> {
+    extends StateMachineProcedure<TEnv, TState> {
     private TState initialState;
     private TEnv env;
 
@@ -444,7 +448,7 @@ public final class ProcedureTestingUtility {
 
     @Override
     protected Flow executeFromState(TEnv env, TState tState)
-        throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
+      throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
       return null;
     }
 
@@ -472,7 +476,8 @@ public final class ProcedureTestingUtility {
   public static class TestProcedure extends NoopProcedure<Void> {
     private byte[] data = null;
 
-    public TestProcedure() {}
+    public TestProcedure() {
+    }
 
     public TestProcedure(long procId) {
       this(procId, 0);
@@ -510,16 +515,14 @@ public final class ProcedureTestingUtility {
     }
 
     @Override
-    protected void serializeStateData(ProcedureStateSerializer serializer)
-        throws IOException {
+    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
       ByteString dataString = ByteString.copyFrom((data == null) ? new byte[0] : data);
       BytesValue.Builder builder = BytesValue.newBuilder().setValue(dataString);
       serializer.serialize(builder.build());
     }
 
     @Override
-    protected void deserializeStateData(ProcedureStateSerializer serializer)
-        throws IOException {
+    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
       BytesValue bytesValue = serializer.deserialize(BytesValue.class);
       ByteString dataString = bytesValue.getValue();
 
@@ -603,7 +606,7 @@ public final class ProcedureTestingUtility {
     }
 
     public boolean isRunnable(final long procId) {
-      for (Procedure proc: runnable) {
+      for (Procedure proc : runnable) {
         if (proc.getProcId() == procId) {
           return true;
         }
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureTree.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureTree.java
similarity index 93%
rename from hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureTree.java
rename to hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureTree.java
index 890d0e3..29d114a 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureTree.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureTree.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.procedure2.store.wal;
+package org.apache.hadoop.hbase.procedure2.store;
 
 import static org.junit.Assert.assertEquals;
 
@@ -41,11 +41,11 @@ import org.junit.experimental.categories.Category;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
 
 @Category({ MasterTests.class, SmallTests.class })
-public class TestWALProcedureTree {
+public class TestProcedureTree {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-    HBaseClassTestRule.forClass(TestWALProcedureTree.class);
+    HBaseClassTestRule.forClass(TestProcedureTree.class);
 
   public static final class TestProcedure extends Procedure<Void> {
 
@@ -123,7 +123,7 @@ public class TestWALProcedureTree {
     proc1.addStackIndex(1);
     TestProcedure proc2 = createProc(3, 2);
     proc2.addStackIndex(3);
-    WALProcedureTree tree = WALProcedureTree.build(toProtos(proc0, proc1, proc2));
+    ProcedureTree tree = ProcedureTree.build(toProtos(proc0, proc1, proc2));
     List<TestProcedure> validProcs = getProcs(tree.getValidProcs());
     assertEquals(0, validProcs.size());
     List<TestProcedure> corruptedProcs = getProcs(tree.getCorruptedProcs());
@@ -141,7 +141,7 @@ public class TestWALProcedureTree {
     proc1.addStackIndex(1);
     TestProcedure proc2 = createProc(3, 2);
     proc2.addStackIndex(1);
-    WALProcedureTree tree = WALProcedureTree.build(toProtos(proc0, proc1, proc2));
+    ProcedureTree tree = ProcedureTree.build(toProtos(proc0, proc1, proc2));
     List<TestProcedure> validProcs = getProcs(tree.getValidProcs());
     assertEquals(0, validProcs.size());
     List<TestProcedure> corruptedProcs = getProcs(tree.getCorruptedProcs());
@@ -161,7 +161,7 @@ public class TestWALProcedureTree {
     proc2.addStackIndex(0);
     TestProcedure proc3 = createProc(5, 4);
     proc3.addStackIndex(1);
-    WALProcedureTree tree = WALProcedureTree.build(toProtos(proc0, proc1, proc2, proc3));
+    ProcedureTree tree = ProcedureTree.build(toProtos(proc0, proc1, proc2, proc3));
     List<TestProcedure> validProcs = getProcs(tree.getValidProcs());
     assertEquals(3, validProcs.size());
     List<TestProcedure> corruptedProcs = getProcs(tree.getCorruptedProcs());
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java
index 2f37f0c..7ad26d7 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
+import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
 import org.apache.hadoop.hbase.procedure2.util.StringUtils;
 import org.apache.hadoop.hbase.util.AbstractHBaseTool;
 
@@ -246,7 +247,7 @@ public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool {
 
   private static class NoSyncWalProcedureStore extends WALProcedureStore {
     public NoSyncWalProcedureStore(final Configuration conf, final Path logDir) throws IOException {
-      super(conf, logDir, null, new WALProcedureStore.LeaseRecovery() {
+      super(conf, logDir, null, new LeaseRecovery() {
         @Override
         public void recoverFileLease(FileSystem fs, Path path) throws IOException {
           // no-op
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestBitSetNode.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestBitSetNode.java
similarity index 96%
rename from hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestBitSetNode.java
rename to hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestBitSetNode.java
index cfc3809..9d897cf 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestBitSetNode.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestBitSetNode.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.procedure2.store;
+package org.apache.hadoop.hbase.procedure2.store.wal;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -23,7 +23,7 @@ import static org.junit.Assert.assertTrue;
 
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.procedure2.Procedure;
-import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker.DeleteState;
+import org.apache.hadoop.hbase.procedure2.store.wal.ProcedureStoreTracker.DeleteState;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.junit.ClassRule;
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestProcedureStoreTracker.java
similarity index 99%
rename from hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
rename to hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestProcedureStoreTracker.java
index 24c4ad0..25678e3 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestProcedureStoreTracker.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.procedure2.store;
+package org.apache.hadoop.hbase.procedure2.store.wal;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
index 5c7f532..9049ffe 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
@@ -42,8 +42,8 @@ import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
 import org.apache.hadoop.hbase.procedure2.SequentialProcedure;
+import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
-import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.io.IOUtils;
@@ -627,7 +627,7 @@ public class TestWALProcedureStore {
 
     // simulate another active master removing the wals
     procStore = new WALProcedureStore(htu.getConfiguration(), logDir, null,
-      new WALProcedureStore.LeaseRecovery() {
+      new LeaseRecovery() {
         private int count = 0;
 
         @Override
@@ -795,7 +795,7 @@ public class TestWALProcedureStore {
   }
 
   private WALProcedureStore createWALProcedureStore(Configuration conf) throws IOException {
-    return new WALProcedureStore(conf, new WALProcedureStore.LeaseRecovery() {
+    return new WALProcedureStore(conf, new LeaseRecovery() {
       @Override
       public void recoverFileLease(FileSystem fs, Path path) throws IOException {
         // no-op
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 40b0566..880cf80 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -161,8 +161,9 @@ import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
 import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
 import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener;
-import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
+import org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore;
 import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
 import org.apache.hadoop.hbase.quotas.MasterQuotasObserver;
 import org.apache.hadoop.hbase.quotas.QuotaObserverChore;
@@ -331,6 +332,10 @@ public class HMaster extends HRegionServer implements MasterServices {
     "hbase.master.wait.on.service.seconds";
   public static final int DEFAULT_HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS = 5 * 60;
 
+  public static final String HBASE_MASTER_CLEANER_INTERVAL = "hbase.master.cleaner.interval";
+
+  public static final int DEFAULT_HBASE_MASTER_CLEANER_INTERVAL = 600 * 1000;
+
   // Metrics for the HMaster
   final MetricsMaster metricsMaster;
   // file system manager for the master FS operations
@@ -425,7 +430,7 @@ public class HMaster extends HRegionServer implements MasterServices {
   private SnapshotQuotaObserverChore snapshotQuotaChore;
 
   private ProcedureExecutor<MasterProcedureEnv> procedureExecutor;
-  private WALProcedureStore procedureStore;
+  private ProcedureStore procedureStore;
 
   // handle table states
   private TableStateManager tableStateManager;
@@ -909,10 +914,8 @@ public class HMaster extends HRegionServer implements MasterServices {
     this.masterActiveTime = System.currentTimeMillis();
     // TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring.
 
-    // Only initialize the MemStoreLAB when master carry table
-    if (LoadBalancer.isTablesOnMaster(conf)) {
-      initializeMemStoreChunkCreator();
-    }
+    // always initialize the MemStoreLAB as we use a region to store procedure now.
+    initializeMemStoreChunkCreator();
     this.fileSystemManager = new MasterFileSystem(conf);
     this.walManager = new MasterWalManager(this);
 
@@ -1422,18 +1425,19 @@ public class HMaster extends HRegionServer implements MasterServices {
     this.executorService.startExecutorService(ExecutorType.MASTER_SNAPSHOT_OPERATIONS, conf.getInt(
       SnapshotManager.SNAPSHOT_POOL_THREADS_KEY, SnapshotManager.SNAPSHOT_POOL_THREADS_DEFAULT));
 
-   // We depend on there being only one instance of this executor running
-   // at a time.  To do concurrency, would need fencing of enable/disable of
-   // tables.
-   // Any time changing this maxThreads to > 1, pls see the comment at
-   // AccessController#postCompletedCreateTableAction
-   this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
-   startProcedureExecutor();
+    // We depend on there being only one instance of this executor running
+    // at a time. To do concurrency, would need fencing of enable/disable of
+    // tables.
+    // Any time changing this maxThreads to > 1, pls see the comment at
+    // AccessController#postCompletedCreateTableAction
+    this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
+    startProcedureExecutor();
 
     // Create cleaner thread pool
     cleanerPool = new DirScanPool(conf);
     // Start log cleaner thread
-    int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 600 * 1000);
+    int cleanerInterval =
+      conf.getInt(HBASE_MASTER_CLEANER_INTERVAL, DEFAULT_HBASE_MASTER_CLEANER_INTERVAL);
     this.logCleaner = new LogCleaner(cleanerInterval, this, conf,
       getMasterWalManager().getFileSystem(), getMasterWalManager().getOldLogDir(), cleanerPool);
     getChoreService().scheduleChore(logCleaner);
@@ -1537,7 +1541,7 @@ public class HMaster extends HRegionServer implements MasterServices {
   private void createProcedureExecutor() throws IOException {
     MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
     procedureStore =
-      new WALProcedureStore(conf, new MasterProcedureEnv.WALStoreLeaseRecovery(this));
+      new RegionProcedureStore(this, new MasterProcedureEnv.FsUtilsLeaseRecovery(this));
     procedureStore.registerListener(new ProcedureStoreListener() {
 
       @Override
@@ -2707,10 +2711,10 @@ public class HMaster extends HRegionServer implements MasterServices {
   }
 
   public int getNumWALFiles() {
-    return procedureStore != null ? procedureStore.getActiveLogs().size() : 0;
+    return 0;
   }
 
-  public WALProcedureStore getWalProcedureStore() {
+  public ProcedureStore getProcedureStore() {
     return procedureStore;
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
index 4fcf7e0..a3772fd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
 import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
-import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
+import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
 import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
@@ -47,10 +47,10 @@ public class MasterProcedureEnv implements ConfigurationObserver {
   private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureEnv.class);
 
   @InterfaceAudience.Private
-  public static class WALStoreLeaseRecovery implements WALProcedureStore.LeaseRecovery {
+  public static class FsUtilsLeaseRecovery implements LeaseRecovery {
     private final MasterServices master;
 
-    public WALStoreLeaseRecovery(final MasterServices master) {
+    public FsUtilsLeaseRecovery(final MasterServices master) {
       this.master = master;
     }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java
index 1e488b6..49bf5c8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java
@@ -146,8 +146,14 @@ public final class MasterProcedureUtil {
   /**
    * Pattern used to validate a Procedure WAL file name see
    * {@link #validateProcedureWALFilename(String)} for description.
+   * @deprecated Since 2.3.0, will be removed in 4.0.0. We do not use this style of procedure wal
+   *             file name any more.
    */
-  private static final Pattern pattern = Pattern.compile(".*pv2-\\d{20}.log");
+  @Deprecated
+  private static final Pattern PATTERN = Pattern.compile(".*pv2-\\d{20}.log");
+
+  // Use the character $ to let the log cleaner know that this is not the normal wal file.
+  public static final String ARCHIVED_PROC_WAL_SUFFIX = "$masterproc$";
 
   /**
    * A Procedure WAL file name is of the format: pv-&lt;wal-id&gt;.log where wal-id is 20 digits.
@@ -155,7 +161,7 @@ public final class MasterProcedureUtil {
    * @return <tt>true</tt> if the filename matches a Procedure WAL, <tt>false</tt> otherwise
    */
   public static boolean validateProcedureWALFilename(String filename) {
-    return pattern.matcher(filename).matches();
+    return PATTERN.matcher(filename).matches() || filename.endsWith(ARCHIVED_PROC_WAL_SUFFIX);
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionFlusherAndCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionFlusherAndCompactor.java
new file mode 100644
index 0000000..fb24802
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionFlusherAndCompactor.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2.store.region;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * As long as there is no RegionServerServices for the procedure store region, we need implement the
+ * flush and compaction logic by our own.
+ * <p/>
+ * The flush logic is very simple, every time after calling a modification method in
+ * {@link RegionProcedureStore}, we will call the {@link #onUpdate()} method below, and in this
+ * method, we will check the memstore size and if it is above the flush size, we will call
+ * {@link HRegion#flush(boolean)} to force flush all stores.
+ * <p/>
+ * And for compaction, the logic is also very simple. After flush, we will check the store file
+ * count, if it is above the compactMin, we will do a major compaction.
+ */
+@InterfaceAudience.Private
+class RegionFlusherAndCompactor implements Closeable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RegionFlusherAndCompactor.class);
+
+  static final String FLUSH_SIZE_KEY = "hbase.procedure.store.region.flush.size";
+
+  private static final long DEFAULT_FLUSH_SIZE = 16L * 1024 * 1024;
+
+  static final String FLUSH_PER_CHANGES_KEY = "hbase.procedure.store.region.flush.per.changes";
+
+  private static final long DEFAULT_FLUSH_PER_CHANGES = 1_000_000;
+
+  static final String FLUSH_INTERVAL_MS_KEY = "hbase.procedure.store.region.flush.interval.ms";
+
+  // default to flush every 15 minutes, for safety
+  private static final long DEFAULT_FLUSH_INTERVAL_MS = TimeUnit.MINUTES.toMillis(15);
+
+  static final String COMPACT_MIN_KEY = "hbase.procedure.store.region.compact.min";
+
+  private static final int DEFAULT_COMPACT_MIN = 4;
+
+  private final Abortable abortable;
+
+  private final HRegion region;
+
+  // as we can only count this outside the region's write/flush process so it is not accurate, but
+  // it is enough.
+  private final AtomicLong changesAfterLastFlush = new AtomicLong(0);
+
+  private final long flushSize;
+
+  private final long flushPerChanges;
+
+  private final long flushIntervalMs;
+
+  private final int compactMin;
+
+  private final Thread flushThread;
+
+  private final Lock flushLock = new ReentrantLock();
+
+  private final Condition flushCond = flushLock.newCondition();
+
+  private boolean flushRequest = false;
+
+  private long lastFlushTime;
+
+  private final ExecutorService compactExecutor;
+
+  private final Lock compactLock = new ReentrantLock();
+
+  private boolean compactRequest = false;
+
+  private volatile boolean closed = false;
+
+  RegionFlusherAndCompactor(Configuration conf, Abortable abortable, HRegion region) {
+    this.abortable = abortable;
+    this.region = region;
+    flushSize = conf.getLong(FLUSH_SIZE_KEY, DEFAULT_FLUSH_SIZE);
+    flushPerChanges = conf.getLong(FLUSH_PER_CHANGES_KEY, DEFAULT_FLUSH_PER_CHANGES);
+    flushIntervalMs = conf.getLong(FLUSH_INTERVAL_MS_KEY, DEFAULT_FLUSH_INTERVAL_MS);
+    compactMin = conf.getInt(COMPACT_MIN_KEY, DEFAULT_COMPACT_MIN);
+    flushThread = new Thread(this::flushLoop, "Procedure-Region-Store-Flusher");
+    flushThread.setDaemon(true);
+    flushThread.start();
+    compactExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+      .setNameFormat("Procedure-Region-Store-Compactor").setDaemon(true).build());
+  }
+
+  // inject our flush related configurations
+  static void setupConf(Configuration conf) {
+    long flushSize = conf.getLong(FLUSH_SIZE_KEY, DEFAULT_FLUSH_SIZE);
+    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSize);
+    long flushPerChanges = conf.getLong(FLUSH_PER_CHANGES_KEY, DEFAULT_FLUSH_PER_CHANGES);
+    conf.setLong(HRegion.MEMSTORE_FLUSH_PER_CHANGES, flushPerChanges);
+    long flushIntervalMs = conf.getLong(FLUSH_INTERVAL_MS_KEY, DEFAULT_FLUSH_INTERVAL_MS);
+    conf.setLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, flushIntervalMs);
+  }
+
+  private void compact() {
+    try {
+      region.compact(true);
+    } catch (IOException e) {
+      LOG.error("Failed to compact procedure store region", e);
+    }
+    compactLock.lock();
+    try {
+      if (needCompaction()) {
+        compactExecutor.execute(this::compact);
+      } else {
+        compactRequest = false;
+      }
+    } finally {
+      compactLock.unlock();
+    }
+  }
+
+  private boolean needCompaction() {
+    return Iterables.getOnlyElement(region.getStores()).getStorefilesCount() >= compactMin;
+  }
+
+  private void flushLoop() {
+    lastFlushTime = EnvironmentEdgeManager.currentTime();
+    while (!closed) {
+      flushLock.lock();
+      try {
+        while (!flushRequest) {
+          long waitTimeMs = lastFlushTime + flushIntervalMs - EnvironmentEdgeManager.currentTime();
+          if (waitTimeMs <= 0) {
+            flushRequest = true;
+            break;
+          }
+          flushCond.await(waitTimeMs, TimeUnit.MILLISECONDS);
+          if (closed) {
+            return;
+          }
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        continue;
+      } finally {
+        flushLock.unlock();
+      }
+      assert flushRequest;
+      changesAfterLastFlush.set(0);
+      try {
+        region.flush(true);
+      } catch (IOException e) {
+        LOG.error(HBaseMarkers.FATAL, "Failed to flush procedure store region, aborting...", e);
+        abortable.abort("Failed to flush procedure store region", e);
+        return;
+      }
+      compactLock.lock();
+      try {
+        if (!compactRequest && needCompaction()) {
+          compactRequest = true;
+          compactExecutor.execute(this::compact);
+        }
+      } finally {
+        compactLock.unlock();
+      }
+      flushLock.lock();
+      try {
+        // reset the flushRequest flag
+        if (!shouldFlush(changesAfterLastFlush.get())) {
+          flushRequest = false;
+        }
+      } finally {
+        flushLock.unlock();
+      }
+    }
+  }
+
+  private boolean shouldFlush(long changes) {
+    return region.getMemStoreHeapSize() + region.getMemStoreOffHeapSize() >= flushSize ||
+      changes > flushPerChanges;
+  }
+
+  void onUpdate() {
+    long changes = changesAfterLastFlush.incrementAndGet();
+    if (shouldFlush(changes)) {
+      requestFlush();
+    }
+  }
+
+  void requestFlush() {
+    flushLock.lock();
+    try {
+      if (flushRequest) {
+        return;
+      }
+      flushRequest = true;
+      flushCond.signalAll();
+    } finally {
+      flushLock.unlock();
+    }
+  }
+
+  @Override
+  public void close() {
+    closed = true;
+    flushThread.interrupt();
+    compactExecutor.shutdown();
+  }
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java
new file mode 100644
index 0000000..d96b356
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java
@@ -0,0 +1,584 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2.store.region;
+
+import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY;
+import static org.apache.hadoop.hbase.HConstants.HREGION_LOGDIR_NAME;
+import static org.apache.hadoop.hbase.HConstants.NO_NONCE;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.commons.lang3.mutable.MutableLong;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
+import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureTree;
+import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
+
+/**
+ * A procedure store which uses a region to store all the procedures.
+ * <p/>
+ * FileSystem layout:
+ *
+ * <pre>
+ * hbase
+ *   |
+ *   --MasterProcs
+ *       |
+ *       --data
+ *       |  |
+ *       |  --/master/procedure/&lt;encoded-region-name&gt; <---- The region data
+ *       |      |
+ *       |      --replay <---- The edits to replay
+ *       |
+ *       --WALs
+ *          |
+ *          --&lt;master-server-name&gt; <---- The WAL dir for active master
+ *          |
+ *          --&lt;master-server-name&gt;-dead <---- The WAL dir dead master
+ * </pre>
+ *
+ * We use p:d column to store the serialized protobuf format procedure, and when deleting we
+ * will first fill the info:proc column with an empty byte array, and then actually delete them in
+ * the {@link #cleanup()} method. This is because that we need to retain the max procedure id, so we
+ * can not directly delete a procedure row as we do not know if it is the one with the max procedure
+ * id.
+ */
+@InterfaceAudience.Private
+public class RegionProcedureStore extends ProcedureStoreBase {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RegionProcedureStore.class);
+
+  static final String MAX_WALS_KEY = "hbase.procedure.store.region.maxwals";
+
+  private static final int DEFAULT_MAX_WALS = 10;
+
+  static final String USE_HSYNC_KEY = "hbase.procedure.store.region.wal.hsync";
+
+  static final String MASTER_PROCEDURE_DIR = "MasterProcs";
+
+  static final String LOGCLEANER_PLUGINS = "hbase.procedure.store.region.logcleaner.plugins";
+
+  private static final String DATA_DIR = "data";
+
+  private static final String REPLAY_EDITS_DIR = "replay";
+
+  private static final String DEAD_WAL_DIR_SUFFIX = "-dead";
+
+  private static final TableName TABLE_NAME = TableName.valueOf("master:procedure");
+
+  private static final byte[] FAMILY = Bytes.toBytes("p");
+
+  private static final byte[] PROC_QUALIFIER = Bytes.toBytes("d");
+
+  private static final int REGION_ID = 1;
+
+  private static final TableDescriptor TABLE_DESC = TableDescriptorBuilder.newBuilder(TABLE_NAME)
+    .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build();
+
+  private final Server server;
+
+  private final LeaseRecovery leaseRecovery;
+
+  private WALFactory walFactory;
+
+  @VisibleForTesting
+  HRegion region;
+
+  private RegionFlusherAndCompactor flusherAndCompactor;
+
+  @VisibleForTesting
+  RegionProcedureStoreWALRoller walRoller;
+
+  private int numThreads;
+
+  public RegionProcedureStore(Server server, LeaseRecovery leaseRecovery) {
+    this.server = server;
+    this.leaseRecovery = leaseRecovery;
+  }
+
+  @Override
+  public void start(int numThreads) throws IOException {
+    if (!setRunning(true)) {
+      return;
+    }
+    LOG.info("Starting the Region Procedure Store...");
+    this.numThreads = numThreads;
+  }
+
+  private void shutdownWAL() {
+    if (walFactory != null) {
+      try {
+        walFactory.shutdown();
+      } catch (IOException e) {
+        LOG.warn("Failed to shutdown WAL", e);
+      }
+    }
+  }
+
+  private void closeRegion(boolean abort) {
+    if (region != null) {
+      try {
+        region.close(abort);
+      } catch (IOException e) {
+        LOG.warn("Failed to close region", e);
+      }
+    }
+
+  }
+
+  @Override
+  public void stop(boolean abort) {
+    if (!setRunning(false)) {
+      return;
+    }
+    LOG.info("Stopping the Region Procedure Store, isAbort={}", abort);
+    if (flusherAndCompactor != null) {
+      flusherAndCompactor.close();
+    }
+    // if abort, we shutdown wal first to fail the ongoing updates to the region, and then close the
+    // region, otherwise there will be dead lock.
+    if (abort) {
+      shutdownWAL();
+      closeRegion(true);
+    } else {
+      closeRegion(false);
+      shutdownWAL();
+    }
+
+    if (walRoller != null) {
+      walRoller.close();
+    }
+  }
+
+  @Override
+  public int getNumThreads() {
+    return numThreads;
+  }
+
+  @Override
+  public int setRunningProcedureCount(int count) {
+    // useless for region based storage.
+    return count;
+  }
+
+  private WAL createWAL(FileSystem fs, Path rootDir, RegionInfo regionInfo) throws IOException {
+    String logName = AbstractFSWALProvider.getWALDirectoryName(server.getServerName().toString());
+    Path walDir = new Path(rootDir, logName);
+    LOG.debug("WALDir={}", walDir);
+    if (fs.exists(walDir)) {
+      throw new HBaseIOException(
+        "Master procedure store has already created directory at " + walDir);
+    }
+    if (!fs.mkdirs(walDir)) {
+      throw new IOException("Can not create master procedure wal directory " + walDir);
+    }
+    WAL wal = walFactory.getWAL(regionInfo);
+    walRoller.addWAL(wal);
+    return wal;
+  }
+
+  private HRegion bootstrap(Configuration conf, FileSystem fs, Path rootDir, Path dataDir)
+    throws IOException {
+    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(TABLE_NAME).setRegionId(REGION_ID).build();
+    Path tmpDataDir = new Path(dataDir.getParent(), dataDir.getName() + "-tmp");
+    if (fs.exists(tmpDataDir) && !fs.delete(tmpDataDir, true)) {
+      throw new IOException("Can not delete partial created proc region " + tmpDataDir);
+    }
+    Path tableDir = CommonFSUtils.getTableDir(tmpDataDir, TABLE_NAME);
+    HRegion.createHRegion(conf, regionInfo, fs, tableDir, TABLE_DESC).close();
+    if (!fs.rename(tmpDataDir, dataDir)) {
+      throw new IOException("Can not rename " + tmpDataDir + " to " + dataDir);
+    }
+    WAL wal = createWAL(fs, rootDir, regionInfo);
+    return HRegion.openHRegionFromTableDir(conf, fs, tableDir, regionInfo, TABLE_DESC, wal, null,
+      null);
+  }
+
+  private HRegion open(Configuration conf, FileSystem fs, Path rootDir, Path dataDir)
+    throws IOException {
+    String factoryId = server.getServerName().toString();
+    Path tableDir = CommonFSUtils.getTableDir(dataDir, TABLE_NAME);
+    Path regionDir =
+      fs.listStatus(tableDir, p -> RegionInfo.isEncodedRegionName(Bytes.toBytes(p.getName())))[0]
+        .getPath();
+    Path replayEditsDir = new Path(regionDir, REPLAY_EDITS_DIR);
+    if (!fs.exists(replayEditsDir) && !fs.mkdirs(replayEditsDir)) {
+      throw new IOException("Failed to create replay directory: " + replayEditsDir);
+    }
+    Path walsDir = new Path(rootDir, HREGION_LOGDIR_NAME);
+    for (FileStatus walDir : fs.listStatus(walsDir)) {
+      if (!walDir.isDirectory()) {
+        continue;
+      }
+      if (walDir.getPath().getName().startsWith(factoryId)) {
+        LOG.warn("This should not happen in real production as we have not created our WAL " +
+          "directory yet, ignore if you are running a procedure related UT");
+      }
+      Path deadWALDir;
+      if (!walDir.getPath().getName().endsWith(DEAD_WAL_DIR_SUFFIX)) {
+        deadWALDir =
+          new Path(walDir.getPath().getParent(), walDir.getPath().getName() + DEAD_WAL_DIR_SUFFIX);
+        if (!fs.rename(walDir.getPath(), deadWALDir)) {
+          throw new IOException("Can not rename " + walDir + " to " + deadWALDir +
+            " when recovering lease of proc store");
+        }
+        LOG.info("Renamed {} to {} as it is dead", walDir.getPath(), deadWALDir);
+      } else {
+        deadWALDir = walDir.getPath();
+        LOG.info("{} is already marked as dead", deadWALDir);
+      }
+      for (FileStatus walFile : fs.listStatus(deadWALDir)) {
+        Path replayEditsFile = new Path(replayEditsDir, walFile.getPath().getName());
+        leaseRecovery.recoverFileLease(fs, walFile.getPath());
+        if (!fs.rename(walFile.getPath(), replayEditsFile)) {
+          throw new IOException("Can not rename " + walFile.getPath() + " to " + replayEditsFile +
+            " when recovering lease of proc store");
+        }
+        LOG.info("Renamed {} to {}", walFile.getPath(), replayEditsFile);
+      }
+      LOG.info("Delete empty proc wal dir {}", deadWALDir);
+      fs.delete(deadWALDir, true);
+    }
+    RegionInfo regionInfo = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
+    WAL wal = createWAL(fs, rootDir, regionInfo);
+    conf.set(HRegion.SPECIAL_RECOVERED_EDITS_DIR,
+      replayEditsDir.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString());
+    return HRegion.openHRegionFromTableDir(conf, fs, tableDir, regionInfo, TABLE_DESC, wal, null,
+      null);
+  }
+
+  @SuppressWarnings("deprecation")
+  private void tryMigrate(FileSystem fs) throws IOException {
+    Configuration conf = server.getConfiguration();
+    Path procWALDir =
+      new Path(CommonFSUtils.getWALRootDir(conf), WALProcedureStore.MASTER_PROCEDURE_LOGDIR);
+    if (!fs.exists(procWALDir)) {
+      return;
+    }
+    LOG.info("The old procedure wal directory {} exists, start migrating", procWALDir);
+    WALProcedureStore store = new WALProcedureStore(conf, leaseRecovery);
+    store.start(numThreads);
+    store.recoverLease();
+    MutableLong maxProcIdSet = new MutableLong(-1);
+    MutableLong maxProcIdFromProcs = new MutableLong(-1);
+    store.load(new ProcedureLoader() {
+
+      @Override
+      public void setMaxProcId(long maxProcId) {
+        maxProcIdSet.setValue(maxProcId);
+      }
+
+      @Override
+      public void load(ProcedureIterator procIter) throws IOException {
+        long procCount = 0;
+        while (procIter.hasNext()) {
+          Procedure<?> proc = procIter.next();
+          update(proc);
+          procCount++;
+          if (proc.getProcId() > maxProcIdFromProcs.longValue()) {
+            maxProcIdFromProcs.setValue(proc.getProcId());
+          }
+        }
+        LOG.info("Migrated {} procedures", procCount);
+      }
+
+      @Override
+      public void handleCorrupted(ProcedureIterator procIter) throws IOException {
+        long corruptedCount = 0;
+        while (procIter.hasNext()) {
+          LOG.error("Corrupted procedure {}", procIter.next());
+          corruptedCount++;
+        }
+        if (corruptedCount > 0) {
+          throw new IOException("There are " + corruptedCount + " corrupted procedures when" +
+            " migrating from the old WAL based store to the new region based store, please" +
+            " fix them before upgrading again.");
+        }
+      }
+    });
+    LOG.info("The max pid is {}, and the max pid of all loaded procedures is {}",
+      maxProcIdSet.longValue(), maxProcIdFromProcs.longValue());
+    // Theoretically, the maxProcIdSet should be greater than or equal to maxProcIdFromProcs, but
+    // anyway, let's do a check here.
+    if (maxProcIdSet.longValue() > maxProcIdFromProcs.longValue()) {
+      if (maxProcIdSet.longValue() > 0) {
+        // let's add a fake row to retain the max proc id
+        region.put(new Put(Bytes.toBytes(maxProcIdSet.longValue())).addColumn(FAMILY,
+          PROC_QUALIFIER, EMPTY_BYTE_ARRAY));
+      }
+    } else if (maxProcIdSet.longValue() < maxProcIdFromProcs.longValue()) {
+      LOG.warn("The max pid is less than the max pid of all loaded procedures");
+    }
+    if (!fs.delete(procWALDir, true)) {
+      throw new IOException("Failed to delete the migrated proc wal directory " + procWALDir);
+    }
+    LOG.info("Migration finished");
+  }
+
+  @Override
+  public void recoverLease() throws IOException {
+    LOG.debug("Starting Region Procedure Store lease recovery...");
+    Configuration baseConf = server.getConfiguration();
+    FileSystem fs = CommonFSUtils.getWALFileSystem(baseConf);
+    Path globalWALRootDir = CommonFSUtils.getWALRootDir(baseConf);
+    Path rootDir = new Path(globalWALRootDir, MASTER_PROCEDURE_DIR);
+    // we will override some configurations so create a new one.
+    Configuration conf = new Configuration(baseConf);
+    CommonFSUtils.setRootDir(conf, rootDir);
+    CommonFSUtils.setWALRootDir(conf, rootDir);
+    RegionFlusherAndCompactor.setupConf(conf);
+
+    walRoller = RegionProcedureStoreWALRoller.create(conf, server, fs, rootDir, globalWALRootDir);
+    walRoller.start();
+    conf.setInt(AbstractFSWAL.MAX_LOGS, conf.getInt(MAX_WALS_KEY, DEFAULT_MAX_WALS));
+    if (conf.get(USE_HSYNC_KEY) != null) {
+      conf.set(HRegion.WAL_HSYNC_CONF_KEY, conf.get(USE_HSYNC_KEY));
+    }
+    walFactory = new WALFactory(conf, server.getServerName().toString());
+    Path dataDir = new Path(rootDir, DATA_DIR);
+    if (fs.exists(dataDir)) {
+      // load the existing region.
+      region = open(conf, fs, rootDir, dataDir);
+    } else {
+      // bootstrapping...
+      region = bootstrap(conf, fs, rootDir, dataDir);
+    }
+    flusherAndCompactor = new RegionFlusherAndCompactor(conf, server, region);
+    walRoller.setFlusherAndCompactor(flusherAndCompactor);
+    tryMigrate(fs);
+  }
+
+  @Override
+  public void load(ProcedureLoader loader) throws IOException {
+    List<ProcedureProtos.Procedure> procs = new ArrayList<>();
+    long maxProcId = 0;
+
+    try (RegionScanner scanner = region.getScanner(new Scan().addColumn(FAMILY, PROC_QUALIFIER))) {
+      List<Cell> cells = new ArrayList<>();
+      boolean moreRows;
+      do {
+        moreRows = scanner.next(cells);
+        if (cells.isEmpty()) {
+          continue;
+        }
+        Cell cell = cells.get(0);
+        cells.clear();
+        maxProcId = Math.max(maxProcId,
+          Bytes.toLong(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
+        if (cell.getValueLength() > 0) {
+          ProcedureProtos.Procedure proto = ProcedureProtos.Procedure.parser()
+            .parseFrom(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+          procs.add(proto);
+        }
+      } while (moreRows);
+    }
+    loader.setMaxProcId(maxProcId);
+    ProcedureTree tree = ProcedureTree.build(procs);
+    loader.load(tree.getValidProcs());
+    loader.handleCorrupted(tree.getCorruptedProcs());
+  }
+
+  private void serializePut(Procedure<?> proc, List<Mutation> mutations, List<byte[]> rowsToLock)
+    throws IOException {
+    ProcedureProtos.Procedure proto = ProcedureUtil.convertToProtoProcedure(proc);
+    byte[] row = Bytes.toBytes(proc.getProcId());
+    mutations.add(new Put(row).addColumn(FAMILY, PROC_QUALIFIER, proto.toByteArray()));
+    rowsToLock.add(row);
+  }
+
+  // As we need to keep the max procedure id, here we can not simply delete the procedure, just fill
+  // the proc column with an empty array.
+  private void serializeDelete(long procId, List<Mutation> mutations, List<byte[]> rowsToLock) {
+    byte[] row = Bytes.toBytes(procId);
+    mutations.add(new Put(row).addColumn(FAMILY, PROC_QUALIFIER, EMPTY_BYTE_ARRAY));
+    rowsToLock.add(row);
+  }
+
+  @Override
+  public void insert(Procedure<?> proc, Procedure<?>[] subProcs) {
+    if (subProcs == null || subProcs.length == 0) {
+      // same with update, just insert a single procedure
+      update(proc);
+      return;
+    }
+    List<Mutation> mutations = new ArrayList<>(subProcs.length + 1);
+    List<byte[]> rowsToLock = new ArrayList<>(subProcs.length + 1);
+    try {
+      serializePut(proc, mutations, rowsToLock);
+      for (Procedure<?> subProc : subProcs) {
+        serializePut(subProc, mutations, rowsToLock);
+      }
+      region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
+    } catch (IOException e) {
+      LOG.error(HBaseMarkers.FATAL, "Failed to insert proc {}, sub procs {}", proc,
+        Arrays.toString(subProcs), e);
+      throw new UncheckedIOException(e);
+    }
+    flusherAndCompactor.onUpdate();
+  }
+
+  @Override
+  public void insert(Procedure<?>[] procs) {
+    List<Mutation> mutations = new ArrayList<>(procs.length);
+    List<byte[]> rowsToLock = new ArrayList<>(procs.length);
+    try {
+      for (Procedure<?> proc : procs) {
+        serializePut(proc, mutations, rowsToLock);
+      }
+      region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
+    } catch (IOException e) {
+      LOG.error(HBaseMarkers.FATAL, "Failed to insert procs {}", Arrays.toString(procs), e);
+      throw new UncheckedIOException(e);
+    }
+    flusherAndCompactor.onUpdate();
+  }
+
+  @Override
+  public void update(Procedure<?> proc) {
+    try {
+      ProcedureProtos.Procedure proto = ProcedureUtil.convertToProtoProcedure(proc);
+      region.put(new Put(Bytes.toBytes(proc.getProcId())).addColumn(FAMILY, PROC_QUALIFIER,
+        proto.toByteArray()));
+    } catch (IOException e) {
+      LOG.error(HBaseMarkers.FATAL, "Failed to update proc {}", proc, e);
+      throw new UncheckedIOException(e);
+    }
+    flusherAndCompactor.onUpdate();
+  }
+
+  @Override
+  public void delete(long procId) {
+    try {
+      region
+        .put(new Put(Bytes.toBytes(procId)).addColumn(FAMILY, PROC_QUALIFIER, EMPTY_BYTE_ARRAY));
+    } catch (IOException e) {
+      LOG.error(HBaseMarkers.FATAL, "Failed to delete pid={}", procId, e);
+      throw new UncheckedIOException(e);
+    }
+    flusherAndCompactor.onUpdate();
+  }
+
+  @Override
+  public void delete(Procedure<?> parentProc, long[] subProcIds) {
+    List<Mutation> mutations = new ArrayList<>(subProcIds.length + 1);
+    List<byte[]> rowsToLock = new ArrayList<>(subProcIds.length + 1);
+    try {
+      serializePut(parentProc, mutations, rowsToLock);
+      for (long subProcId : subProcIds) {
+        serializeDelete(subProcId, mutations, rowsToLock);
+      }
+      region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
+    } catch (IOException e) {
+      LOG.error(HBaseMarkers.FATAL, "Failed to delete parent proc {}, sub pids={}", parentProc,
+        Arrays.toString(subProcIds), e);
+      throw new UncheckedIOException(e);
+    }
+    flusherAndCompactor.onUpdate();
+  }
+
+  @Override
+  public void delete(long[] procIds, int offset, int count) {
+    if (count == 0) {
+      return;
+    }
+    if (count == 1) {
+      delete(procIds[offset]);
+      return;
+    }
+    List<Mutation> mutations = new ArrayList<>(count);
+    List<byte[]> rowsToLock = new ArrayList<>(count);
+    for (int i = 0; i < count; i++) {
+      long procId = procIds[offset + i];
+      serializeDelete(procId, mutations, rowsToLock);
+    }
+    try {
+      region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
+    } catch (IOException e) {
+      LOG.error(HBaseMarkers.FATAL, "Failed to delete pids={}", Arrays.toString(procIds), e);
+      throw new UncheckedIOException(e);
+    }
+    flusherAndCompactor.onUpdate();
+  }
+
+  @Override
+  public void cleanup() {
+    // actually delete the procedures if it is not the one with the max procedure id.
+    List<Cell> cells = new ArrayList<Cell>();
+    try (RegionScanner scanner =
+      region.getScanner(new Scan().addColumn(FAMILY, PROC_QUALIFIER).setReversed(true))) {
+      // skip the row with max procedure id
+      boolean moreRows = scanner.next(cells);
+      if (cells.isEmpty()) {
+        return;
+      }
+      cells.clear();
+      while (moreRows) {
+        moreRows = scanner.next(cells);
+        if (cells.isEmpty()) {
+          continue;
+        }
+        Cell cell = cells.get(0);
+        cells.clear();
+        if (cell.getValueLength() == 0) {
+          region.delete(new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
+        }
+      }
+    } catch (IOException e) {
+      LOG.warn("Failed to clean up delete procedures", e);
+    }
+  }
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreWALRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreWALRoller.java
new file mode 100644
index 0000000..fc84c27
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreWALRoller.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2.store.region;
+
+import static org.apache.hadoop.hbase.HConstants.HREGION_OLDLOGDIR_NAME;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.AbstractWALRoller;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * As long as there is no RegionServerServices for the procedure store region, we need implement log
+ * roller logic by our own.
+ * <p/>
+ * We can reuse most of the code for normal wal roller, the only difference is that there is only
+ * one region, so in {@link #scheduleFlush(String)} method we can just schedule flush for the
+ * procedure store region.
+ */
+@InterfaceAudience.Private
+final class RegionProcedureStoreWALRoller extends AbstractWALRoller<Abortable> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RegionProcedureStoreWALRoller.class);
+
+  static final String ROLL_PERIOD_MS_KEY = "hbase.procedure.store.region.walroll.period.ms";
+
+  private static final long DEFAULT_ROLL_PERIOD_MS = TimeUnit.MINUTES.toMillis(15);
+
+  private volatile RegionFlusherAndCompactor flusherAndCompactor;
+
+  private final FileSystem fs;
+
+  private final Path walArchiveDir;
+
+  private final Path globalWALArchiveDir;
+
+  private RegionProcedureStoreWALRoller(Configuration conf, Abortable abortable, FileSystem fs,
+    Path walRootDir, Path globalWALRootDir) {
+    super("RegionProcedureStoreWALRoller", conf, abortable);
+    this.fs = fs;
+    this.walArchiveDir = new Path(walRootDir, HREGION_OLDLOGDIR_NAME);
+    this.globalWALArchiveDir = new Path(globalWALRootDir, HREGION_OLDLOGDIR_NAME);
+  }
+
+  @Override
+  protected void afterRoll(WAL wal) {
+    // move the archived WAL files to the global archive path
+    try {
+      if (!fs.exists(globalWALArchiveDir) && !fs.mkdirs(globalWALArchiveDir)) {
+        LOG.warn("Failed to create global archive dir {}", globalWALArchiveDir);
+        return;
+      }
+      FileStatus[] archivedWALFiles = fs.listStatus(walArchiveDir);
+      if (archivedWALFiles == null) {
+        return;
+      }
+      for (FileStatus status : archivedWALFiles) {
+        Path file = status.getPath();
+        Path newFile = new Path(globalWALArchiveDir,
+          file.getName() + MasterProcedureUtil.ARCHIVED_PROC_WAL_SUFFIX);
+        if (fs.rename(file, newFile)) {
+          LOG.info("Successfully moved {} to {}", file, newFile);
+        } else {
+          LOG.warn("Failed to move archived wal from {} to global place {}", file, newFile);
+        }
+      }
+    } catch (IOException e) {
+      LOG.warn("Failed to move archived wals from {} to global dir {}", walArchiveDir,
+        globalWALArchiveDir, e);
+    }
+  }
+
+  @Override
+  protected void scheduleFlush(String encodedRegionName) {
+    RegionFlusherAndCompactor flusher = this.flusherAndCompactor;
+    if (flusher != null) {
+      flusher.requestFlush();
+    }
+  }
+
+  void setFlusherAndCompactor(RegionFlusherAndCompactor flusherAndCompactor) {
+    this.flusherAndCompactor = flusherAndCompactor;
+  }
+
+  static RegionProcedureStoreWALRoller create(Configuration conf, Abortable abortable,
+    FileSystem fs, Path walRootDir, Path globalWALRootDir) {
+    // we can not run with wal disabled, so force set it to true.
+    conf.setBoolean(WALFactory.WAL_ENABLED, true);
+    // we do not need this feature, so force disable it.
+    conf.setBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, false);
+    conf.setLong(WAL_ROLL_PERIOD_KEY, conf.getLong(ROLL_PERIOD_MS_KEY, DEFAULT_ROLL_PERIOD_MS));
+    return new RegionProcedureStoreWALRoller(conf, abortable, fs, walRootDir, globalWALRootDir);
+  }
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index e7b3b23..067ce2b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -43,6 +43,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NavigableMap;
 import java.util.NavigableSet;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.RandomAccess;
 import java.util.Set;
@@ -71,6 +72,7 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -160,8 +162,6 @@ import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.ClassSize;
-import org.apache.hadoop.hbase.util.CompressionTest;
-import org.apache.hadoop.hbase.util.EncryptionTest;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HashedBytes;
@@ -236,12 +236,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   public static final String HBASE_MAX_CELL_SIZE_KEY = "hbase.server.keyvalue.maxsize";
   public static final int DEFAULT_MAX_CELL_SIZE = 10485760;
 
-  /**
-   * This is the global default value for durability. All tables/mutations not
-   * defining a durability or using USE_DEFAULT will default to this value.
-   */
-  private static final Durability DEFAULT_DURABILITY = Durability.SYNC_WAL;
-
   public static final String HBASE_REGIONSERVER_MINIBATCH_SIZE =
       "hbase.regionserver.minibatch.size";
   public static final int DEFAULT_HBASE_REGIONSERVER_MINIBATCH_SIZE = 20000;
@@ -249,6 +243,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   public static final String WAL_HSYNC_CONF_KEY = "hbase.wal.hsync";
   public static final boolean DEFAULT_WAL_HSYNC = false;
 
+  /**
+   * This is for for using HRegion as a local storage, where we may put the recovered edits in a
+   * special place. Once this is set, we will only replay the recovered edits under this directory
+   * and ignore the original replay directory configs.
+   */
+  public static final String SPECIAL_RECOVERED_EDITS_DIR =
+    "hbase.hregion.special.recovered.edits.dir";
+
   final AtomicBoolean closed = new AtomicBoolean(false);
 
   /* Closing can take some time; use the closing flag if there is stuff we don't
@@ -4556,6 +4558,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     return size.getHeapSize() + size.getOffHeapSize() > getMemStoreFlushSize();
   }
 
+  private void deleteRecoveredEdits(FileSystem fs, Iterable<Path> files) throws IOException {
+    for (Path file : files) {
+      if (!fs.delete(file, false)) {
+        LOG.error("Failed delete of {}", file);
+      } else {
+        LOG.debug("Deleted recovered.edits file={}", file);
+      }
+    }
+  }
+
   /**
    * Read the edits put under this region by wal splitting process.  Put
    * the recovered edits back up into this region.
@@ -4587,11 +4599,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * the maxSeqId for the store to be applied, else its skipped.
    * @return the sequence id of the last edit added to this region out of the
    * recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
-   * @throws IOException
    */
-  protected long replayRecoveredEditsIfAny(Map<byte[], Long> maxSeqIdInStores,
-      final CancelableProgressable reporter, final MonitoredTask status)
-      throws IOException {
+  @VisibleForTesting
+  long replayRecoveredEditsIfAny(Map<byte[], Long> maxSeqIdInStores,
+    final CancelableProgressable reporter, final MonitoredTask status) throws IOException {
     long minSeqIdForTheRegion = -1;
     for (Long maxSeqIdInStore : maxSeqIdInStores.values()) {
       if (maxSeqIdInStore < minSeqIdForTheRegion || minSeqIdForTheRegion == -1) {
@@ -4599,63 +4610,74 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
     }
     long seqId = minSeqIdForTheRegion;
-
-    FileSystem walFS = getWalFileSystem();
-    FileSystem rootFS = getFilesystem();
-    Path wrongRegionWALDir = FSUtils.getWrongWALRegionDir(conf, getRegionInfo().getTable(),
-      getRegionInfo().getEncodedName());
-    Path regionWALDir = getWALRegionDir();
-    Path regionDir = FSUtils.getRegionDirFromRootDir(FSUtils.getRootDir(conf), getRegionInfo());
-
-    // We made a mistake in HBASE-20734 so we need to do this dirty hack...
-    NavigableSet<Path> filesUnderWrongRegionWALDir =
-      WALSplitUtil.getSplitEditFilesSorted(walFS, wrongRegionWALDir);
-    seqId = Math.max(seqId, replayRecoveredEditsForPaths(minSeqIdForTheRegion, walFS,
-      filesUnderWrongRegionWALDir, reporter, regionDir));
-    // This is to ensure backwards compatability with HBASE-20723 where recovered edits can appear
-    // under the root dir even if walDir is set.
-    NavigableSet<Path> filesUnderRootDir = Collections.emptyNavigableSet();
-    if (!regionWALDir.equals(regionDir)) {
-      filesUnderRootDir = WALSplitUtil.getSplitEditFilesSorted(rootFS, regionDir);
-      seqId = Math.max(seqId, replayRecoveredEditsForPaths(minSeqIdForTheRegion, rootFS,
-        filesUnderRootDir, reporter, regionDir));
-    }
-
-    NavigableSet<Path> files = WALSplitUtil.getSplitEditFilesSorted(walFS, regionWALDir);
-    seqId = Math.max(seqId, replayRecoveredEditsForPaths(minSeqIdForTheRegion, walFS,
-        files, reporter, regionWALDir));
-
-    if (seqId > minSeqIdForTheRegion) {
-      // Then we added some edits to memory. Flush and cleanup split edit files.
-      internalFlushcache(null, seqId, stores.values(), status, false, FlushLifeCycleTracker.DUMMY);
-    }
-    // Now delete the content of recovered edits. We're done w/ them.
-    if (files.size() > 0 && this.conf.getBoolean("hbase.region.archive.recovered.edits", false)) {
-      // For debugging data loss issues!
-      // If this flag is set, make use of the hfile archiving by making recovered.edits a fake
-      // column family. Have to fake out file type too by casting our recovered.edits as storefiles
-      String fakeFamilyName = WALSplitUtil.getRegionDirRecoveredEditsDir(regionWALDir).getName();
-      Set<HStoreFile> fakeStoreFiles = new HashSet<>(files.size());
-      for (Path file : files) {
-        fakeStoreFiles.add(new HStoreFile(walFS, file, this.conf, null, null, true));
-      }
-      getRegionWALFileSystem().archiveRecoveredEdits(fakeFamilyName, fakeStoreFiles);
+    String specialRecoveredEditsDirStr = conf.get(SPECIAL_RECOVERED_EDITS_DIR);
+    if (org.apache.commons.lang3.StringUtils.isBlank(specialRecoveredEditsDirStr)) {
+      FileSystem walFS = getWalFileSystem();
+      FileSystem rootFS = getFilesystem();
+      Path wrongRegionWALDir = FSUtils.getWrongWALRegionDir(conf, getRegionInfo().getTable(),
+        getRegionInfo().getEncodedName());
+      Path regionWALDir = getWALRegionDir();
+      Path regionDir = FSUtils.getRegionDirFromRootDir(FSUtils.getRootDir(conf), getRegionInfo());
+
+      // We made a mistake in HBASE-20734 so we need to do this dirty hack...
+      NavigableSet<Path> filesUnderWrongRegionWALDir =
+        WALSplitUtil.getSplitEditFilesSorted(walFS, wrongRegionWALDir);
+      seqId = Math.max(seqId, replayRecoveredEditsForPaths(minSeqIdForTheRegion, walFS,
+        filesUnderWrongRegionWALDir, reporter, regionDir));
+      // This is to ensure backwards compatability with HBASE-20723 where recovered edits can appear
+      // under the root dir even if walDir is set.
+      NavigableSet<Path> filesUnderRootDir = Collections.emptyNavigableSet();
+      if (!regionWALDir.equals(regionDir)) {
+        filesUnderRootDir = WALSplitUtil.getSplitEditFilesSorted(rootFS, regionDir);
+        seqId = Math.max(seqId, replayRecoveredEditsForPaths(minSeqIdForTheRegion, rootFS,
+          filesUnderRootDir, reporter, regionDir));
+      }
+
+      NavigableSet<Path> files = WALSplitUtil.getSplitEditFilesSorted(walFS, regionWALDir);
+      seqId = Math.max(seqId,
+        replayRecoveredEditsForPaths(minSeqIdForTheRegion, walFS, files, reporter, regionWALDir));
+      if (seqId > minSeqIdForTheRegion) {
+        // Then we added some edits to memory. Flush and cleanup split edit files.
+        internalFlushcache(null, seqId, stores.values(), status, false,
+          FlushLifeCycleTracker.DUMMY);
+      }
+      // Now delete the content of recovered edits. We're done w/ them.
+      if (files.size() > 0 && this.conf.getBoolean("hbase.region.archive.recovered.edits", false)) {
+        // For debugging data loss issues!
+        // If this flag is set, make use of the hfile archiving by making recovered.edits a fake
+        // column family. Have to fake out file type too by casting our recovered.edits as
+        // storefiles
+        String fakeFamilyName = WALSplitUtil.getRegionDirRecoveredEditsDir(regionWALDir).getName();
+        Set<HStoreFile> fakeStoreFiles = new HashSet<>(files.size());
+        for (Path file : files) {
+          fakeStoreFiles.add(new HStoreFile(walFS, file, this.conf, null, null, true));
+        }
+        getRegionWALFileSystem().archiveRecoveredEdits(fakeFamilyName, fakeStoreFiles);
+      } else {
+        deleteRecoveredEdits(walFS, Iterables.concat(files, filesUnderWrongRegionWALDir));
+        deleteRecoveredEdits(rootFS, filesUnderRootDir);
+      }
     } else {
-      for (Path file : Iterables.concat(files, filesUnderWrongRegionWALDir)) {
-        if (!walFS.delete(file, false)) {
-          LOG.error("Failed delete of {}", file);
-        } else {
-          LOG.debug("Deleted recovered.edits file={}", file);
+      Path recoveredEditsDir = new Path(specialRecoveredEditsDirStr);
+      FileSystem fs = recoveredEditsDir.getFileSystem(conf);
+      FileStatus[] files = fs.listStatus(recoveredEditsDir);
+      LOG.debug("Found {} recovered edits file(s) under {}", files == null ? 0 : files.length,
+        recoveredEditsDir);
+      if (files != null) {
+        for (FileStatus file : files) {
+          seqId =
+            Math.max(seqId, replayRecoveredEdits(file.getPath(), maxSeqIdInStores, reporter, fs));
         }
       }
-      for (Path file : filesUnderRootDir) {
-        if (!rootFS.delete(file, false)) {
-          LOG.error("Failed delete of {}", file);
-        } else {
-          LOG.debug("Deleted recovered.edits file={}", file);
-        }
+      if (seqId > minSeqIdForTheRegion) {
+        // Then we added some edits to memory. Flush and cleanup split edit files.
+        internalFlushcache(null, seqId, stores.values(), status, false,
+          FlushLifeCycleTracker.DUMMY);
       }
+      deleteRecoveredEdits(fs,
+        Stream.of(files).map(FileStatus::getPath).collect(Collectors.toList()));
     }
+
     return seqId;
   }
 
@@ -4720,18 +4742,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     return seqid;
   }
 
-  /*
+  /**
    * @param edits File of recovered edits.
-   * @param maxSeqIdInStores Maximum sequenceid found in each store.  Edits in wal
-   * must be larger than this to be replayed for each store.
-   * @param reporter
-   * @return the sequence id of the last edit added to this region out of the
-   * recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
-   * @throws IOException
+   * @param maxSeqIdInStores Maximum sequenceid found in each store. Edits in wal must be larger
+   *          than this to be replayed for each store.
+   * @return the sequence id of the last edit added to this region out of the recovered edits log or
+   *         <code>minSeqId</code> if nothing added from editlogs.
    */
-  private long replayRecoveredEdits(final Path edits,
-      Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter, FileSystem fs)
-    throws IOException {
+  private long replayRecoveredEdits(final Path edits, Map<byte[], Long> maxSeqIdInStores,
+    final CancelableProgressable reporter, FileSystem fs) throws IOException {
     String msg = "Replaying edits from " + edits;
     LOG.info(msg);
     MonitoredTask status = TaskMonitor.get().createStatus(msg);
@@ -7100,15 +7119,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @param wal shared WAL
    * @param initialize - true to initialize the region
    * @return new HRegion
-   * @throws IOException
    */
   public static HRegion createHRegion(final RegionInfo info, final Path rootDir,
-        final Configuration conf, final TableDescriptor hTableDescriptor,
-        final WAL wal, final boolean initialize)
-  throws IOException {
-    LOG.info("creating " + info
-        + ", tableDescriptor=" + (hTableDescriptor == null? "null": hTableDescriptor) +
-        ", regionDir=" + rootDir);
+    final Configuration conf, final TableDescriptor hTableDescriptor, final WAL wal,
+    final boolean initialize) throws IOException {
+    LOG.info("creating " + info + ", tableDescriptor=" +
+      (hTableDescriptor == null ? "null" : hTableDescriptor) + ", regionDir=" + rootDir);
     createRegionDir(conf, info, rootDir);
     FileSystem fs = rootDir.getFileSystem(conf);
     Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
@@ -7120,6 +7136,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   }
 
   /**
+   * Create a region under the given table directory.
+   */
+  public static HRegion createHRegion(Configuration conf, RegionInfo regionInfo, FileSystem fs,
+    Path tableDir, TableDescriptor tableDesc) throws IOException {
+    LOG.info("Creating {}, tableDescriptor={}, under table dir {}", regionInfo, tableDesc,
+      tableDir);
+    HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, regionInfo);
+    HRegion region = HRegion.newHRegion(tableDir, null, fs, conf, regionInfo, tableDesc, null);
+    return region;
+  }
+
+  /**
    * Create the region directory in the filesystem.
    */
   public static HRegionFileSystem createRegionDir(Configuration configuration, RegionInfo ri,
@@ -7266,18 +7294,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @return new HRegion
    */
   public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
-      final Path rootDir, final RegionInfo info, final TableDescriptor htd, final WAL wal,
-      final RegionServerServices rsServices, final CancelableProgressable reporter)
-      throws IOException {
+    final Path rootDir, final RegionInfo info, final TableDescriptor htd, final WAL wal,
+    final RegionServerServices rsServices, final CancelableProgressable reporter)
+    throws IOException {
     Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
-    return openHRegion(conf, fs, rootDir, tableDir, info, htd, wal, rsServices, reporter);
+    return openHRegionFromTableDir(conf, fs, tableDir, info, htd, wal, rsServices, reporter);
   }
 
   /**
    * Open a Region.
    * @param conf The Configuration object to use.
    * @param fs Filesystem to use
-   * @param rootDir Root directory for HBase instance
    * @param info Info for region to be opened.
    * @param htd the table descriptor
    * @param wal WAL for region to use. This method will call
@@ -7288,15 +7315,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @param reporter An interface we can report progress against.
    * @return new HRegion
    */
-  public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
-      final Path rootDir, final Path tableDir, final RegionInfo info, final TableDescriptor htd,
-      final WAL wal, final RegionServerServices rsServices,
-      final CancelableProgressable reporter)
-      throws IOException {
-    if (info == null) throw new NullPointerException("Passed region info is null");
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Opening region: " + info);
-    }
+  public static HRegion openHRegionFromTableDir(final Configuration conf, final FileSystem fs,
+    final Path tableDir, final RegionInfo info, final TableDescriptor htd, final WAL wal,
+    final RegionServerServices rsServices, final CancelableProgressable reporter)
+    throws IOException {
+    Objects.requireNonNull(info, "RegionInfo cannot be null");
+    LOG.debug("Opening region: {}", info);
     HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, rsServices);
     return r.openHRegion(reporter);
   }
@@ -8819,4 +8843,4 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
     }
   }
-}
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
index 272925c..f5049c9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
@@ -18,23 +18,9 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.ConnectException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
-import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
-import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.HasThread;
+import java.util.Map;
+import org.apache.hadoop.hbase.wal.AbstractWALRoller;
 import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.ipc.RemoteException;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,166 +38,21 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
  */
 @InterfaceAudience.Private
 @VisibleForTesting
-public class LogRoller extends HasThread implements Closeable {
+public class LogRoller extends AbstractWALRoller<RegionServerServices> {
   private static final Logger LOG = LoggerFactory.getLogger(LogRoller.class);
-  private final ConcurrentMap<WAL, Boolean> walNeedsRoll = new ConcurrentHashMap<>();
-  protected final RegionServerServices services;
-  private volatile long lastRollTime = System.currentTimeMillis();
-  // Period to roll log.
-  private final long rollPeriod;
-  private final int threadWakeFrequency;
-  // The interval to check low replication on hlog's pipeline
-  private long checkLowReplicationInterval;
-
-  private volatile boolean running = true;
-
-  public void addWAL(WAL wal) {
-    // check without lock first
-    if (walNeedsRoll.containsKey(wal)) {
-      return;
-    }
-    // this is to avoid race between addWAL and requestRollAll.
-    synchronized (this) {
-      if (walNeedsRoll.putIfAbsent(wal, Boolean.FALSE) == null) {
-        wal.registerWALActionsListener(new WALActionsListener() {
-          @Override
-          public void logRollRequested(WALActionsListener.RollRequestReason reason) {
-            // TODO logs will contend with each other here, replace with e.g. DelayedQueue
-            synchronized (LogRoller.this) {
-              walNeedsRoll.put(wal, Boolean.TRUE);
-              LogRoller.this.notifyAll();
-            }
-          }
-        });
-      }
-    }
-  }
-
-  public void requestRollAll() {
-    synchronized (this) {
-      List<WAL> wals = new ArrayList<WAL>(walNeedsRoll.keySet());
-      for (WAL wal : wals) {
-        walNeedsRoll.put(wal, Boolean.TRUE);
-      }
-      notifyAll();
-    }
-  }
 
   public LogRoller(RegionServerServices services) {
-    super("LogRoller");
-    this.services = services;
-    this.rollPeriod = this.services.getConfiguration().
-      getLong("hbase.regionserver.logroll.period", 3600000);
-    this.threadWakeFrequency = this.services.getConfiguration().
-      getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
-    this.checkLowReplicationInterval = this.services.getConfiguration().getLong(
-        "hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000);
-  }
-
-  /**
-   * we need to check low replication in period, see HBASE-18132
-   */
-  private void checkLowReplication(long now) {
-    try {
-      for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
-        WAL wal = entry.getKey();
-        boolean needRollAlready = entry.getValue();
-        if (needRollAlready || !(wal instanceof AbstractFSWAL)) {
-          continue;
-        }
-        ((AbstractFSWAL<?>) wal).checkLogLowReplication(checkLowReplicationInterval);
-      }
-    } catch (Throwable e) {
-      LOG.warn("Failed checking low replication", e);
-    }
-  }
-
-  private void abort(String reason, Throwable cause) {
-    // close all WALs before calling abort on RS.
-    // This is because AsyncFSWAL replies on us for rolling a new writer to make progress, and if we
-    // failed, AsyncFSWAL may be stuck, so we need to close it to let the upper layer know that it
-    // is already broken.
-    for (WAL wal : walNeedsRoll.keySet()) {
-      // shutdown rather than close here since we are going to abort the RS and the wals need to be
-      // split when recovery
-      try {
-        wal.shutdown();
-      } catch (IOException e) {
-        LOG.warn("Failed to shutdown wal", e);
-      }
-    }
-    this.services.abort(reason, cause);
+    super("LogRoller", services.getConfiguration(), services);
   }
 
-  @Override
-  public void run() {
-    while (running) {
-      boolean periodic = false;
-      long now = System.currentTimeMillis();
-      checkLowReplication(now);
-      periodic = (now - this.lastRollTime) > this.rollPeriod;
-      if (periodic) {
-        // Time for periodic roll, fall through
-        LOG.debug("WAL roll period {} ms elapsed", this.rollPeriod);
-      } else {
-        synchronized (this) {
-          if (walNeedsRoll.values().stream().anyMatch(Boolean::booleanValue)) {
-            // WAL roll requested, fall through
-            LOG.debug("WAL roll requested");
-          } else {
-            try {
-              wait(this.threadWakeFrequency);
-            } catch (InterruptedException e) {
-              // restore the interrupt state
-              Thread.currentThread().interrupt();
-            }
-            // goto the beginning to check whether again whether we should fall through to roll
-            // several WALs, and also check whether we should quit.
-            continue;
-          }
-        }
-      }
-      try {
-        this.lastRollTime = System.currentTimeMillis();
-        for (Iterator<Entry<WAL, Boolean>> iter = walNeedsRoll.entrySet().iterator(); iter
-            .hasNext();) {
-          Entry<WAL, Boolean> entry = iter.next();
-          WAL wal = entry.getKey();
-          // reset the flag in front to avoid missing roll request before we return from rollWriter.
-          walNeedsRoll.put(wal, Boolean.FALSE);
-          // Force the roll if the logroll.period is elapsed or if a roll was requested.
-          // The returned value is an array of actual region names.
-          byte[][] regionsToFlush = wal.rollWriter(periodic || entry.getValue().booleanValue());
-          if (regionsToFlush != null) {
-            for (byte[] r : regionsToFlush) {
-              scheduleFlush(Bytes.toString(r));
-            }
-          }
-        }
-      } catch (FailedLogCloseException | ConnectException e) {
-        abort("Failed log close in log roller", e);
-      } catch (IOException ex) {
-        // Abort if we get here.  We probably won't recover an IOE. HBASE-1132
-        abort("IOE in log roller",
-          ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex);
-      } catch (Exception ex) {
-        LOG.error("Log rolling failed", ex);
-        abort("Log rolling failed", ex);
-      }
-    }
-    LOG.info("LogRoller exiting.");
-  }
-
-  /**
-   * @param encodedRegionName Encoded name of region to flush.
-   */
-  private void scheduleFlush(String encodedRegionName) {
-    HRegion r = (HRegion) this.services.getRegion(encodedRegionName);
+  protected void scheduleFlush(String encodedRegionName) {
+    RegionServerServices services = this.abortable;
+    HRegion r = (HRegion) services.getRegion(encodedRegionName);
     if (r == null) {
       LOG.warn("Failed to schedule flush of {}, because it is not online on us", encodedRegionName);
       return;
     }
-    FlushRequester requester = this.services.getFlushRequester();
+    FlushRequester requester = services.getFlushRequester();
     if (requester == null) {
       LOG.warn("Failed to schedule flush of {}, region={}, because FlushRequester is null",
         encodedRegionName, r);
@@ -221,18 +62,8 @@ public class LogRoller extends HasThread implements Closeable {
     requester.requestFlush(r, true, FlushLifeCycleTracker.DUMMY);
   }
 
-  /**
-   * For testing only
-   * @return true if all WAL roll finished
-   */
   @VisibleForTesting
-  public boolean walRollFinished() {
-    return walNeedsRoll.values().stream().allMatch(needRoll -> !needRoll);
-  }
-
-  @Override
-  public void close() {
-    running = false;
-    interrupt();
+  Map<WAL, Boolean> getWalNeedsRoll() {
+    return this.walNeedsRoll;
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index 6e8443f..422c217 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -132,6 +132,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
   protected static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.wal.sync.timeout";
   protected static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min
 
+  public static final String MAX_LOGS = "hbase.regionserver.maxlogs";
+
   /**
    * file system instance
    */
@@ -434,8 +436,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
     this.blocksize = WALUtil.getWALBlockSize(this.conf, this.fs, this.walDir);
     float multiplier = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.5f);
     this.logrollsize = (long)(this.blocksize * multiplier);
-    this.maxLogs = conf.getInt("hbase.regionserver.maxlogs",
-      Math.max(32, calculateMaxLogFiles(conf, logrollsize)));
+    this.maxLogs = conf.getInt(MAX_LOGS, Math.max(32, calculateMaxLogFiles(conf, logrollsize)));
 
     LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize=" +
       StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix=" +
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java
similarity index 76%
copy from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
copy to hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java
index 272925c..cd4dc52 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.regionserver;
+package org.apache.hadoop.hbase.wal;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -27,35 +26,39 @@ import java.util.List;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
 import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.HasThread;
-import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
 /**
  * Runs periodically to determine if the WAL should be rolled.
- *
- * NOTE: This class extends Thread rather than Chore because the sleep time
- * can be interrupted when there is something to do, rather than the Chore
- * sleep time which is invariant.
- *
+ * <p/>
+ * NOTE: This class extends Thread rather than Chore because the sleep time can be interrupted when
+ * there is something to do, rather than the Chore sleep time which is invariant.
+ * <p/>
+ * The {@link #scheduleFlush(String)} is abstract here, as sometimes we hold a region without a
+ * region server but we still want to roll its WAL.
+ * <p/>
  * TODO: change to a pool of threads
  */
 @InterfaceAudience.Private
-@VisibleForTesting
-public class LogRoller extends HasThread implements Closeable {
-  private static final Logger LOG = LoggerFactory.getLogger(LogRoller.class);
-  private final ConcurrentMap<WAL, Boolean> walNeedsRoll = new ConcurrentHashMap<>();
-  protected final RegionServerServices services;
+public abstract class AbstractWALRoller<T extends Abortable> extends HasThread
+  implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractWALRoller.class);
+
+  protected static final String WAL_ROLL_PERIOD_KEY = "hbase.regionserver.logroll.period";
+
+  protected final ConcurrentMap<WAL, Boolean> walNeedsRoll = new ConcurrentHashMap<>();
+  protected final T abortable;
   private volatile long lastRollTime = System.currentTimeMillis();
   // Period to roll log.
   private final long rollPeriod;
@@ -77,9 +80,9 @@ public class LogRoller extends HasThread implements Closeable {
           @Override
           public void logRollRequested(WALActionsListener.RollRequestReason reason) {
             // TODO logs will contend with each other here, replace with e.g. DelayedQueue
-            synchronized (LogRoller.this) {
+            synchronized (AbstractWALRoller.this) {
               walNeedsRoll.put(wal, Boolean.TRUE);
-              LogRoller.this.notifyAll();
+              AbstractWALRoller.this.notifyAll();
             }
           }
         });
@@ -97,15 +100,13 @@ public class LogRoller extends HasThread implements Closeable {
     }
   }
 
-  public LogRoller(RegionServerServices services) {
-    super("LogRoller");
-    this.services = services;
-    this.rollPeriod = this.services.getConfiguration().
-      getLong("hbase.regionserver.logroll.period", 3600000);
-    this.threadWakeFrequency = this.services.getConfiguration().
-      getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
-    this.checkLowReplicationInterval = this.services.getConfiguration().getLong(
-        "hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000);
+  protected AbstractWALRoller(String name, Configuration conf, T abortable) {
+    super(name);
+    this.abortable = abortable;
+    this.rollPeriod = conf.getLong(WAL_ROLL_PERIOD_KEY, 3600000);
+    this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
+    this.checkLowReplicationInterval =
+      conf.getLong("hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000);
   }
 
   /**
@@ -140,7 +141,7 @@ public class LogRoller extends HasThread implements Closeable {
         LOG.warn("Failed to shutdown wal", e);
       }
     }
-    this.services.abort(reason, cause);
+    abortable.abort(reason, cause);
   }
 
   @Override
@@ -174,7 +175,7 @@ public class LogRoller extends HasThread implements Closeable {
       try {
         this.lastRollTime = System.currentTimeMillis();
         for (Iterator<Entry<WAL, Boolean>> iter = walNeedsRoll.entrySet().iterator(); iter
-            .hasNext();) {
+          .hasNext();) {
           Entry<WAL, Boolean> entry = iter.next();
           WAL wal = entry.getKey();
           // reset the flag in front to avoid missing roll request before we return from rollWriter.
@@ -187,11 +188,12 @@ public class LogRoller extends HasThread implements Closeable {
               scheduleFlush(Bytes.toString(r));
             }
           }
+          afterRoll(wal);
         }
       } catch (FailedLogCloseException | ConnectException e) {
         abort("Failed log close in log roller", e);
       } catch (IOException ex) {
-        // Abort if we get here.  We probably won't recover an IOE. HBASE-1132
+        // Abort if we get here. We probably won't recover an IOE. HBASE-1132
         abort("IOE in log roller",
           ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex);
       } catch (Exception ex) {
@@ -203,31 +205,35 @@ public class LogRoller extends HasThread implements Closeable {
   }
 
   /**
+   * Called after we finish rolling the give {@code wal}.
+   */
+  protected void afterRoll(WAL wal) {
+  }
+
+  /**
    * @param encodedRegionName Encoded name of region to flush.
    */
-  private void scheduleFlush(String encodedRegionName) {
-    HRegion r = (HRegion) this.services.getRegion(encodedRegionName);
-    if (r == null) {
-      LOG.warn("Failed to schedule flush of {}, because it is not online on us", encodedRegionName);
-      return;
-    }
-    FlushRequester requester = this.services.getFlushRequester();
-    if (requester == null) {
-      LOG.warn("Failed to schedule flush of {}, region={}, because FlushRequester is null",
-        encodedRegionName, r);
-      return;
-    }
-    // force flushing all stores to clean old logs
-    requester.requestFlush(r, true, FlushLifeCycleTracker.DUMMY);
+  protected abstract void scheduleFlush(String encodedRegionName);
+
+  private boolean isWaiting() {
+    Thread.State state = getThread().getState();
+    return state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING;
   }
 
   /**
-   * For testing only
    * @return true if all WAL roll finished
    */
-  @VisibleForTesting
   public boolean walRollFinished() {
-    return walNeedsRoll.values().stream().allMatch(needRoll -> !needRoll);
+    return walNeedsRoll.values().stream().allMatch(needRoll -> !needRoll) && isWaiting();
+  }
+
+  /**
+   * Wait until all wals have been rolled after calling {@link #requestRollAll()}.
+   */
+  public void waitUntilWalRollFinished() throws InterruptedException {
+    while (!walRollFinished()) {
+      Thread.sleep(100);
+    }
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
index dab65f3..30bb77e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
@@ -83,6 +83,8 @@ public class WALFactory {
 
   public static final String META_WAL_PROVIDER = "hbase.wal.meta_provider";
 
+  public static final String WAL_ENABLED = "hbase.regionserver.hlog.enabled";
+
   final String factoryId;
   private final WALProvider provider;
   // The meta updates are written to a different wal. If this
@@ -194,7 +196,7 @@ public class WALFactory {
     this.conf = conf;
     this.factoryId = factoryId;
     // end required early initialization
-    if (conf.getBoolean("hbase.regionserver.hlog.enabled", true)) {
+    if (conf.getBoolean(WAL_ENABLED, true)) {
       provider = getProvider(WAL_PROVIDER, DEFAULT_WAL_PROVIDER, null);
     } else {
       // special handling of existing configuration behavior.
diff --git a/hbase-server/src/main/resources/hbase-webapps/master/procedures.jsp b/hbase-server/src/main/resources/hbase-webapps/master/procedures.jsp
index ea252cf..5000482 100644
--- a/hbase-server/src/main/resources/hbase-webapps/master/procedures.jsp
+++ b/hbase-server/src/main/resources/hbase-webapps/master/procedures.jsp
@@ -30,21 +30,12 @@
   import="org.apache.hadoop.hbase.procedure2.LockedResource"
   import="org.apache.hadoop.hbase.procedure2.Procedure"
   import="org.apache.hadoop.hbase.procedure2.ProcedureExecutor"
-  import="org.apache.hadoop.hbase.procedure2.store.wal.ProcedureWALFile"
-  import="org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore"
   import="org.apache.hadoop.hbase.procedure2.util.StringUtils"
   import="org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix"
 %>
 <%
   HMaster master = (HMaster) getServletContext().getAttribute(HMaster.MASTER);
   ProcedureExecutor<MasterProcedureEnv> procExecutor = master.getMasterProcedureExecutor();
-  WALProcedureStore walStore = master.getWalProcedureStore();
-
-  ArrayList<WALProcedureStore.SyncMetrics> syncMetricsBuff = walStore.getSyncMetrics();
-  long millisToNextRoll = walStore.getMillisToNextPeriodicRoll();
-  long millisFromLastRoll = walStore.getMillisFromLastRoll();
-  ArrayList<ProcedureWALFile> procedureWALFiles = walStore.getActiveLogs();
-  Set<ProcedureWALFile> corruptedWALFiles = walStore.getCorruptedLogs();
   List<Procedure<MasterProcedureEnv>> procedures = procExecutor.getProcedures();
   Collections.sort(procedures, new Comparator<Procedure>() {
     @Override
@@ -159,109 +150,4 @@
   <% } %>
 </div>
 <br />
-<div class="container-fluid content">
-  <div class="row">
-    <div class="page-header">
-      <h2>Procedure WAL State</h2>
-    </div>
-  </div>
-  <div class="tabbable">
-    <ul class="nav nav-pills">
-      <li class="active">
-        <a href="#tab_WALFiles" data-toggle="tab">WAL files</a>
-      </li>
-      <li class="">
-        <a href="#tab_WALFilesCorrupted" data-toggle="tab">Corrupted WAL files</a>
-      </li>
-      <li class="">
-        <a href="#tab_WALRollTime" data-toggle="tab">WAL roll time</a>
-      </li>
-      <li class="">
-        <a href="#tab_SyncStats" data-toggle="tab">Sync stats</a>
-      </li>
-    </ul>
-    <div class="tab-content" style="padding-bottom: 9px; border-bottom: 1px solid #ddd;">
-      <div class="tab-pane active" id="tab_WALFiles">
-        <% if (procedureWALFiles != null && procedureWALFiles.size() > 0) { %>
-          <table class="table table-striped">
-            <tr>
-              <th>LogID</th>
-              <th>Size</th>
-              <th>Timestamp</th>
-              <th>Path</th>
-            </tr>
-            <% for (int i = procedureWALFiles.size() - 1; i >= 0; --i) { %>
-            <%    ProcedureWALFile pwf = procedureWALFiles.get(i); %>
-            <tr>
-              <td> <%= pwf.getLogId() %></td>
-              <td> <%= TraditionalBinaryPrefix.long2String(pwf.getSize(), "B", 1) %> </td>
-              <td> <%= new Date(pwf.getTimestamp()) %> </td>
-              <td> <%= escapeXml(pwf.toString()) %> </td>
-            </tr>
-            <% } %>
-          </table>
-        <% } else {%>
-          <p> No WAL files</p>
-        <% } %>
-      </div>
-      <div class="tab-pane" id="tab_WALFilesCorrupted">
-      <% if (corruptedWALFiles != null && corruptedWALFiles.size() > 0) { %>
-        <table class="table table-striped">
-          <tr>
-            <th>LogID</th>
-            <th>Size</th>
-            <th>Timestamp</th>
-            <th>Path</th>
-          </tr>
-          <% for (ProcedureWALFile cwf:corruptedWALFiles) { %>
-          <tr>
-            <td> <%= cwf.getLogId() %></td>
-            <td> <%= TraditionalBinaryPrefix.long2String(cwf.getSize(), "B", 1) %> </td>
-            <td> <%= new Date(cwf.getTimestamp()) %> </td>
-            <td> <%= escapeXml(cwf.toString()) %> </td>
-          </tr>
-          <% } %>
-          </table>
-      <% } else {%>
-        <p> No corrupted WAL files</p>
-      <% } %>
-      </div>
-      <div class="tab-pane" id="tab_WALRollTime">
-        <table class="table table-striped">
-          <tr>
-            <th> Milliseconds to next roll</th>
-            <th> Milliseconds from last roll</th>
-          </tr>
-          <tr>
-            <td> <%=StringUtils.humanTimeDiff(millisToNextRoll)  %></td>
-            <td> <%=StringUtils.humanTimeDiff(millisFromLastRoll) %></td>
-          </tr>
-        </table>
-      </div>
-      <div class="tab-pane" id="tab_SyncStats">
-        <table class="table table-striped">
-          <tr>
-            <th> Time</th>
-            <th> Sync Wait</th>
-            <th> Last num of synced entries</th>
-            <th> Total Synced</th>
-            <th> Synced per second</th>
-          </tr>
-          <% for (int i = syncMetricsBuff.size() - 1; i >= 0; --i) { %>
-          <%    WALProcedureStore.SyncMetrics syncMetrics = syncMetricsBuff.get(i); %>
-          <tr>
-            <td> <%= new Date(syncMetrics.getTimestamp()) %></td>
-            <td> <%= StringUtils.humanTimeDiff(syncMetrics.getSyncWaitMs()) %></td>
-            <td> <%= syncMetrics.getSyncedEntries() %></td>
-            <td> <%= TraditionalBinaryPrefix.long2String(syncMetrics.getTotalSyncedBytes(), "B", 1) %></td>
-            <td> <%= TraditionalBinaryPrefix.long2String((long)syncMetrics.getSyncedPerSec(), "B", 1) %></td>
-          </tr>
-          <%} %>
-        </table>
-        </div>
-      </div>
-  </div>
-</div>
-<br />
-
 <jsp:include page="footer.jsp" />
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestLoadProcedureError.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestLoadProcedureError.java
index 0a57dba..9dbc587 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestLoadProcedureError.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestLoadProcedureError.java
@@ -126,7 +126,7 @@ public class TestLoadProcedureError {
     ARRIVE.await();
     FAIL_LOAD = true;
     // do not persist the store tracker
-    UTIL.getMiniHBaseCluster().getMaster().getWalProcedureStore().stop(true);
+    UTIL.getMiniHBaseCluster().getMaster().getProcedureStore().stop(true);
     UTIL.getMiniHBaseCluster().getMaster().abort("for testing");
     waitNoMaster();
     // restart twice, and should fail twice, as we will throw an exception in the afterReplay above
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterMetricsWrapper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterMetricsWrapper.java
index 395095e..10d56b8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterMetricsWrapper.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterMetricsWrapper.java
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.hbase.master;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
 
 import java.util.AbstractMap.SimpleImmutableEntry;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -89,7 +89,9 @@ public class TestMasterMetricsWrapper {
     }
     assertEquals(regionServerCount - 1, info.getNumRegionServers());
     assertEquals(1, info.getNumDeadRegionServers());
-    assertEquals(1, info.getNumWALFiles());
+    // now we do not expose this information as WALProcedureStore is not the only ProcedureStore
+    // implementation any more.
+    assertEquals(0, info.getNumWALFiles());
   }
 
   @Test
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureWalLease.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureWalLease.java
deleted file mode 100644
index 37bc6de..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureWalLease.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master.procedure;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.concurrent.CountDownLatch;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.StartMiniClusterOption;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.master.HMaster;
-import org.apache.hadoop.hbase.procedure2.Procedure;
-import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
-import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
-import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
-import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.testclassification.MasterTests;
-import org.apache.hadoop.hbase.util.ModifyRegionUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.mockito.Mockito;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Category({MasterTests.class, LargeTests.class})
-@Ignore
-public class TestMasterProcedureWalLease {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestMasterProcedureWalLease.class);
-
-  private static final Logger LOG = LoggerFactory.getLogger(TestMasterProcedureWalLease.class);
-
-  @Rule
-  public TestName name = new TestName();
-
-  protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
-
-  private static void setupConf(Configuration conf) {
-    // don't waste time retrying with the roll, the test is already slow enough.
-    conf.setInt(WALProcedureStore.MAX_RETRIES_BEFORE_ROLL_CONF_KEY, 1);
-    conf.setInt(WALProcedureStore.WAIT_BEFORE_ROLL_CONF_KEY, 0);
-    conf.setInt(WALProcedureStore.ROLL_RETRIES_CONF_KEY, 1);
-    conf.setInt(WALProcedureStore.MAX_SYNC_FAILURE_ROLL_CONF_KEY, 1);
-  }
-
-  @Before
-  public void setup() throws Exception {
-    setupConf(UTIL.getConfiguration());
-    StartMiniClusterOption option = StartMiniClusterOption.builder()
-        .numMasters(2).numRegionServers(3).numDataNodes(3).build();
-    UTIL.startMiniCluster(option);
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    try {
-      UTIL.shutdownMiniCluster();
-    } catch (Exception e) {
-      LOG.warn("failure shutting down cluster", e);
-    }
-  }
-
-  @Test
-  public void testWalRecoverLease() throws Exception {
-    final ProcedureStore masterStore = getMasterProcedureExecutor().getStore();
-    assertTrue("expected WALStore for this test", masterStore instanceof WALProcedureStore);
-
-    HMaster firstMaster = UTIL.getHBaseCluster().getMaster();
-    // Abort Latch for the master store
-    final CountDownLatch masterStoreAbort = new CountDownLatch(1);
-    masterStore.registerListener(new ProcedureStore.ProcedureStoreListener() {
-      @Override
-      public void postSync() {}
-
-      @Override
-      public void abortProcess() {
-        LOG.debug("Abort store of Master");
-        masterStoreAbort.countDown();
-      }
-    });
-
-    // startup a fake master the new WAL store will take the lease
-    // and the active master should abort.
-    HMaster backupMaster3 = Mockito.mock(HMaster.class);
-    Mockito.doReturn(firstMaster.getConfiguration()).when(backupMaster3).getConfiguration();
-    Mockito.doReturn(true).when(backupMaster3).isActiveMaster();
-    final WALProcedureStore backupStore3 = new WALProcedureStore(firstMaster.getConfiguration(),
-        ((WALProcedureStore)masterStore).getWALDir(),
-        null,
-        new MasterProcedureEnv.WALStoreLeaseRecovery(backupMaster3));
-    // Abort Latch for the test store
-    final CountDownLatch backupStore3Abort = new CountDownLatch(1);
-    backupStore3.registerListener(new ProcedureStore.ProcedureStoreListener() {
-      @Override
-      public void postSync() {}
-
-      @Override
-      public void abortProcess() {
-        LOG.debug("Abort store of backupMaster3");
-        backupStore3Abort.countDown();
-        backupStore3.stop(true);
-      }
-    });
-    backupStore3.start(1);
-    backupStore3.recoverLease();
-
-    // Try to trigger a command on the master (WAL lease expired on the active one)
-    TableDescriptor htd = MasterProcedureTestingUtility.createHTD(TableName.valueOf(name.getMethodName()), "f");
-    RegionInfo[] regions = ModifyRegionUtils.createRegionInfos(htd, null);
-    LOG.debug("submit proc");
-    try {
-      getMasterProcedureExecutor().submitProcedure(
-          new CreateTableProcedure(getMasterProcedureExecutor().getEnvironment(), htd, regions));
-      fail("expected RuntimeException 'sync aborted'");
-    } catch (RuntimeException e) {
-      LOG.info("got " + e.getMessage());
-    }
-    LOG.debug("wait master store abort");
-    masterStoreAbort.await();
-
-    // Now the real backup master should start up
-    LOG.debug("wait backup master to startup");
-    MasterProcedureTestingUtility.waitBackupMaster(UTIL, firstMaster);
-    assertEquals(true, firstMaster.isStopped());
-
-    // wait the store in here to abort (the test will fail due to timeout if it doesn't)
-    LOG.debug("wait the store to abort");
-    backupStore3.getStoreTracker().setDeleted(1, false);
-    try {
-      backupStore3.delete(1);
-      fail("expected RuntimeException 'sync aborted'");
-    } catch (RuntimeException e) {
-      LOG.info("got " + e.getMessage());
-    }
-    backupStore3Abort.await();
-  }
-
-  /**
-   * Tests proper fencing in case the current WAL store is fenced
-   */
-  @Test
-  public void testWALfencingWithoutWALRolling() throws IOException {
-    testWALfencing(false);
-  }
-
-  /**
-   * Tests proper fencing in case the current WAL store does not receive writes until after the
-   * new WAL does a couple of WAL rolls.
-   */
-  @Test
-  public void testWALfencingWithWALRolling() throws IOException {
-    testWALfencing(true);
-  }
-
-  public void testWALfencing(boolean walRolls) throws IOException {
-    final ProcedureStore procStore = getMasterProcedureExecutor().getStore();
-    assertTrue("expected WALStore for this test", procStore instanceof WALProcedureStore);
-
-    HMaster firstMaster = UTIL.getHBaseCluster().getMaster();
-
-    // cause WAL rolling after a delete in WAL:
-    firstMaster.getConfiguration().setLong(WALProcedureStore.ROLL_THRESHOLD_CONF_KEY, 1);
-
-    HMaster backupMaster3 = Mockito.mock(HMaster.class);
-    Mockito.doReturn(firstMaster.getConfiguration()).when(backupMaster3).getConfiguration();
-    Mockito.doReturn(true).when(backupMaster3).isActiveMaster();
-    final WALProcedureStore procStore2 = new WALProcedureStore(firstMaster.getConfiguration(),
-        ((WALProcedureStore)procStore).getWALDir(),
-        null,
-        new MasterProcedureEnv.WALStoreLeaseRecovery(backupMaster3));
-
-    // start a second store which should fence the first one out
-    LOG.info("Starting new WALProcedureStore");
-    procStore2.start(1);
-    procStore2.recoverLease();
-
-    // before writing back to the WAL store, optionally do a couple of WAL rolls (which causes
-    // to delete the old WAL files).
-    if (walRolls) {
-      LOG.info("Inserting into second WALProcedureStore, causing WAL rolls");
-      for (int i = 0; i < 512; i++) {
-        // insert something to the second store then delete it, causing a WAL roll(s)
-        Procedure proc2 = new TestProcedure(i);
-        procStore2.insert(proc2, null);
-        procStore2.delete(proc2.getProcId()); // delete the procedure so that the WAL is removed later
-      }
-    }
-
-    // Now, insert something to the first store, should fail.
-    // If the store does a WAL roll and continue with another logId without checking higher logIds
-    // it will incorrectly succeed.
-    LOG.info("Inserting into first WALProcedureStore");
-    try {
-      procStore.insert(new TestProcedure(11), null);
-      fail("Inserting into Procedure Store should have failed");
-    } catch (Exception ex) {
-      LOG.info("Received expected exception", ex);
-    }
-  }
-
-  // ==========================================================================
-  //  Helpers
-  // ==========================================================================
-  private ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
-    return UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor();
-  }
-}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestHelper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestHelper.java
new file mode 100644
index 0000000..d550e7f
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestHelper.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2.store.region;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader;
+
+final class RegionProcedureStoreTestHelper {
+
+  private RegionProcedureStoreTestHelper() {
+  }
+
+  static RegionProcedureStore createStore(Configuration conf, ProcedureLoader loader)
+    throws IOException {
+    Server server = mock(Server.class);
+    when(server.getConfiguration()).thenReturn(conf);
+    when(server.getServerName())
+      .thenReturn(ServerName.valueOf("localhost", 12345, System.currentTimeMillis()));
+    RegionProcedureStore store = new RegionProcedureStore(server, new LeaseRecovery() {
+
+      @Override
+      public void recoverFileLease(FileSystem fs, Path path) throws IOException {
+      }
+    });
+    store.start(1);
+    store.recoverLease();
+    store.load(loader);
+    return store;
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestProcedure.java
new file mode 100644
index 0000000..f81d193
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestProcedure.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2.store.region;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.Int64Value;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
+
+public class RegionProcedureStoreTestProcedure extends NoopProcedure<Void> {
+  private static long SEQ_ID = 0;
+
+  public RegionProcedureStoreTestProcedure() {
+    setProcId(++SEQ_ID);
+  }
+
+  @Override
+  protected Procedure<Void>[] execute(Void env) {
+    return null;
+  }
+
+  @Override
+  protected void rollback(Void env) {
+  }
+
+  @Override
+  protected boolean abort(Void env) {
+    return false;
+  }
+
+  @Override
+  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    long procId = getProcId();
+    if (procId % 2 == 0) {
+      Int64Value.Builder builder = Int64Value.newBuilder().setValue(procId);
+      serializer.serialize(builder.build());
+    }
+  }
+
+  @Override
+  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    long procId = getProcId();
+    if (procId % 2 == 0) {
+      Int64Value value = serializer.deserialize(Int64Value.class);
+      assertEquals(procId, value.getValue());
+    }
+  }
+
+  public void setParent(Procedure<?> proc) {
+    super.setParentProcId(proc.getProcId());
+  }
+
+  public void finish() {
+    setState(ProcedureState.SUCCESS);
+  }
+}
\ No newline at end of file
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java
new file mode 100644
index 0000000..b3daa89
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2.store.region;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
+import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ MasterTests.class, MediumTests.class })
+public class TestRegionProcedureStore {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRegionProcedureStore.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestRegionProcedureStore.class);
+
+  private HBaseCommonTestingUtility htu;
+
+  private RegionProcedureStore store;
+
+  @Before
+  public void setUp() throws IOException {
+    htu = new HBaseCommonTestingUtility();
+    htu.getConfiguration().setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
+    Path testDir = htu.getDataTestDir();
+    CommonFSUtils.setWALRootDir(htu.getConfiguration(), testDir);
+    store = RegionProcedureStoreTestHelper.createStore(htu.getConfiguration(), new LoadCounter());
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    store.stop(true);
+    htu.cleanupTestDir();
+  }
+
+  private void verifyProcIdsOnRestart(final Set<Long> procIds) throws Exception {
+    LOG.debug("expected: " + procIds);
+    LoadCounter loader = new LoadCounter();
+    ProcedureTestingUtility.storeRestart(store, true, loader);
+    assertEquals(procIds.size(), loader.getLoadedCount());
+    assertEquals(0, loader.getCorruptedCount());
+  }
+
+  @Test
+  public void testLoad() throws Exception {
+    Set<Long> procIds = new HashSet<>();
+
+    // Insert something in the log
+    RegionProcedureStoreTestProcedure proc1 = new RegionProcedureStoreTestProcedure();
+    procIds.add(proc1.getProcId());
+    store.insert(proc1, null);
+
+    RegionProcedureStoreTestProcedure proc2 = new RegionProcedureStoreTestProcedure();
+    RegionProcedureStoreTestProcedure proc3 = new RegionProcedureStoreTestProcedure();
+    proc3.setParent(proc2);
+    RegionProcedureStoreTestProcedure proc4 = new RegionProcedureStoreTestProcedure();
+    proc4.setParent(proc2);
+
+    procIds.add(proc2.getProcId());
+    procIds.add(proc3.getProcId());
+    procIds.add(proc4.getProcId());
+    store.insert(proc2, new Procedure[] { proc3, proc4 });
+
+    // Verify that everything is there
+    verifyProcIdsOnRestart(procIds);
+
+    // Update and delete something
+    proc1.finish();
+    store.update(proc1);
+    proc4.finish();
+    store.update(proc4);
+    store.delete(proc4.getProcId());
+    procIds.remove(proc4.getProcId());
+
+    // Verify that everything is there
+    verifyProcIdsOnRestart(procIds);
+  }
+
+  @Test
+  public void testCleanup() throws Exception {
+    RegionProcedureStoreTestProcedure proc1 = new RegionProcedureStoreTestProcedure();
+    store.insert(proc1, null);
+    RegionProcedureStoreTestProcedure proc2 = new RegionProcedureStoreTestProcedure();
+    store.insert(proc2, null);
+    RegionProcedureStoreTestProcedure proc3 = new RegionProcedureStoreTestProcedure();
+    store.insert(proc3, null);
+    LoadCounter loader = new LoadCounter();
+    store.load(loader);
+    assertEquals(proc3.getProcId(), loader.getMaxProcId());
+    assertEquals(3, loader.getRunnableCount());
+
+    store.delete(proc3.getProcId());
+    store.delete(proc2.getProcId());
+    loader = new LoadCounter();
+    store.load(loader);
+    assertEquals(proc3.getProcId(), loader.getMaxProcId());
+    assertEquals(1, loader.getRunnableCount());
+
+    // the row should still be there
+    assertTrue(store.region
+      .get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists());
+    assertTrue(store.region
+      .get(new Get(Bytes.toBytes(proc2.getProcId())).setCheckExistenceOnly(true)).getExists());
+
+    // proc2 will be deleted after cleanup, but proc3 should still be there as it holds the max proc
+    // id
+    store.cleanup();
+    assertTrue(store.region
+      .get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists());
+    assertFalse(store.region
+      .get(new Get(Bytes.toBytes(proc2.getProcId())).setCheckExistenceOnly(true)).getExists());
+
+    RegionProcedureStoreTestProcedure proc4 = new RegionProcedureStoreTestProcedure();
+    store.insert(proc4, null);
+    store.cleanup();
+    // proc3 should also be deleted as now proc4 holds the max proc id
+    assertFalse(store.region
+      .get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists());
+  }
+}
\ No newline at end of file
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreMigration.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreMigration.java
new file mode 100644
index 0000000..475ae59
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreMigration.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2.store.region;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import org.apache.commons.lang3.mutable.MutableLong;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
+import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader;
+import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
+import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@SuppressWarnings("deprecation")
+@Category({ MasterTests.class, MediumTests.class })
+public class TestRegionProcedureStoreMigration {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRegionProcedureStoreMigration.class);
+
+  private HBaseCommonTestingUtility htu;
+
+  private RegionProcedureStore store;
+
+  private WALProcedureStore walStore;
+
+  @Before
+  public void setUp() throws IOException {
+    htu = new HBaseCommonTestingUtility();
+    Configuration conf = htu.getConfiguration();
+    conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
+    Path testDir = htu.getDataTestDir();
+    CommonFSUtils.setWALRootDir(conf, testDir);
+    walStore = new WALProcedureStore(conf, new LeaseRecovery() {
+
+      @Override
+      public void recoverFileLease(FileSystem fs, Path path) throws IOException {
+      }
+    });
+    walStore.start(1);
+    walStore.recoverLease();
+    walStore.load(new LoadCounter());
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (store != null) {
+      store.stop(true);
+    }
+    walStore.stop(true);
+    htu.cleanupTestDir();
+  }
+
+  @Test
+  public void test() throws IOException {
+    List<RegionProcedureStoreTestProcedure> procs = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      RegionProcedureStoreTestProcedure proc = new RegionProcedureStoreTestProcedure();
+      walStore.insert(proc, null);
+      procs.add(proc);
+    }
+    for (int i = 5; i < 10; i++) {
+      walStore.delete(procs.get(i).getProcId());
+    }
+    walStore.stop(true);
+    SortedSet<RegionProcedureStoreTestProcedure> loadedProcs =
+      new TreeSet<>((p1, p2) -> Long.compare(p1.getProcId(), p2.getProcId()));
+    MutableLong maxProcIdSet = new MutableLong(0);
+    store =
+      RegionProcedureStoreTestHelper.createStore(htu.getConfiguration(), new ProcedureLoader() {
+
+        @Override
+        public void setMaxProcId(long maxProcId) {
+          maxProcIdSet.setValue(maxProcId);
+        }
+
+        @Override
+        public void load(ProcedureIterator procIter) throws IOException {
+          while (procIter.hasNext()) {
+            RegionProcedureStoreTestProcedure proc =
+              (RegionProcedureStoreTestProcedure) procIter.next();
+            loadedProcs.add(proc);
+          }
+        }
+
+        @Override
+        public void handleCorrupted(ProcedureIterator procIter) throws IOException {
+          if (procIter.hasNext()) {
+            fail("Found corrupted procedures");
+          }
+        }
+      });
+    assertEquals(10, maxProcIdSet.longValue());
+    assertEquals(5, loadedProcs.size());
+    int procId = 1;
+    for (RegionProcedureStoreTestProcedure proc : loadedProcs) {
+      assertEquals(procId, proc.getProcId());
+      procId++;
+    }
+    Path testDir = htu.getDataTestDir();
+    FileSystem fs = testDir.getFileSystem(htu.getConfiguration());
+    Path oldProcWALDir = new Path(testDir, WALProcedureStore.MASTER_PROCEDURE_LOGDIR);
+    // make sure the old proc wal directory has been deleted.
+    assertFalse(fs.exists(oldProcWALDir));
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreWALCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreWALCleaner.java
new file mode 100644
index 0000000..826c763
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreWALCleaner.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2.store.region;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
+import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
+import org.apache.hadoop.hbase.master.cleaner.TimeToLiveProcedureWALCleaner;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
+import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MasterTests.class, MediumTests.class })
+public class TestRegionProcedureStoreWALCleaner {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRegionProcedureStoreWALCleaner.class);
+
+  private HBaseCommonTestingUtility htu;
+
+  private FileSystem fs;
+
+  private RegionProcedureStore store;
+
+  private ChoreService choreService;
+
+  private DirScanPool dirScanPool;
+
+  private LogCleaner logCleaner;
+
+  private Path globalWALArchiveDir;
+
+  @Before
+  public void setUp() throws IOException {
+    htu = new HBaseCommonTestingUtility();
+    Configuration conf = htu.getConfiguration();
+    conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
+    Path testDir = htu.getDataTestDir();
+    fs = testDir.getFileSystem(conf);
+    CommonFSUtils.setWALRootDir(conf, testDir);
+    globalWALArchiveDir = new Path(testDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    choreService = new ChoreService("Region-Procedure-Store");
+    dirScanPool = new DirScanPool(conf);
+    conf.setLong(TimeToLiveProcedureWALCleaner.TTL_CONF_KEY, 5000);
+    conf.setInt(HMaster.HBASE_MASTER_CLEANER_INTERVAL, 1000);
+    logCleaner = new LogCleaner(1000, new Stoppable() {
+
+      private volatile boolean stopped = false;
+
+      @Override
+      public void stop(String why) {
+        stopped = true;
+      }
+
+      @Override
+      public boolean isStopped() {
+        return stopped;
+      }
+    }, conf, fs, globalWALArchiveDir, dirScanPool);
+    choreService.scheduleChore(logCleaner);
+    store = RegionProcedureStoreTestHelper.createStore(conf, new LoadCounter());
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    store.stop(true);
+    logCleaner.cancel();
+    dirScanPool.shutdownNow();
+    choreService.shutdown();
+    htu.cleanupTestDir();
+  }
+
+  @Test
+  public void test() throws IOException, InterruptedException {
+    RegionProcedureStoreTestProcedure proc = new RegionProcedureStoreTestProcedure();
+    store.insert(proc, null);
+    store.region.flush(true);
+    // no archived wal files yet
+    assertFalse(fs.exists(globalWALArchiveDir));
+    store.walRoller.requestRollAll();
+    store.walRoller.waitUntilWalRollFinished();
+    // should have one
+    FileStatus[] files = fs.listStatus(globalWALArchiveDir);
+    assertEquals(1, files.length);
+    Thread.sleep(2000);
+    // should still be there
+    assertTrue(fs.exists(files[0].getPath()));
+    Thread.sleep(6000);
+    // should have been cleaned
+    assertEquals(0, fs.listStatus(globalWALArchiveDir).length);
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerCrashDisableWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerCrashDisableWAL.java
index c4571ac..208fa98 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerCrashDisableWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerCrashDisableWAL.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.wal.WALFactory;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -59,7 +60,7 @@ public class TestRegionServerCrashDisableWAL {
   @BeforeClass
   public static void setUp() throws Exception {
     UTIL.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
-    UTIL.getConfiguration().setBoolean("hbase.regionserver.hlog.enabled", false);
+    UTIL.getConfiguration().setBoolean(WALFactory.WAL_ENABLED, false);
     UTIL.startMiniCluster(2);
     UTIL.createTable(TABLE_NAME, CF);
     UTIL.waitTableAvailable(TABLE_NAME);