You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2020/05/23 08:44:49 UTC

[hbase] branch branch-2.3 updated: HBASE-24408 Introduce a general 'local region' to store data on master (#1753)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
     new 9f69ad4  HBASE-24408 Introduce a general 'local region' to store data on master (#1753)
9f69ad4 is described below

commit 9f69ad474655a70c7d3989e4d6bbfc550b347a0a
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Sat May 23 15:59:51 2020 +0800

    HBASE-24408 Introduce a general 'local region' to store data on master (#1753)
    
    Signed-off-by: stack <st...@apache.org>
---
 hbase-common/src/main/resources/hbase-default.xml  |  22 +-
 .../hbase/procedure2/store/ProcedureStore.java     |   5 +
 .../java/org/apache/hadoop/hbase/io/FileLink.java  |   5 +-
 .../java/org/apache/hadoop/hbase/io/HFileLink.java |   4 +-
 .../org/apache/hadoop/hbase/master/HMaster.java    |  21 +-
 ...Cleaner.java => BaseTimeToLiveFileCleaner.java} |  72 +++--
 .../hadoop/hbase/master/cleaner/HFileCleaner.java  |   8 +-
 .../hadoop/hbase/master/cleaner/LogCleaner.java    |   6 +-
 .../master/cleaner/TimeToLiveHFileCleaner.java     |  37 +--
 .../hbase/master/cleaner/TimeToLiveLogCleaner.java |  59 +---
 .../TimeToLiveMasterLocalStoreHFileCleaner.java    |  48 +++
 .../TimeToLiveMasterLocalStoreWALCleaner.java      |  47 +++
 .../cleaner/TimeToLiveProcedureWALCleaner.java     |  56 +---
 .../master/procedure/MasterProcedureUtil.java      |   5 +-
 .../hadoop/hbase/master/store/LocalRegion.java     | 325 +++++++++++++++++++++
 .../store/LocalRegionFlusherAndCompactor.java}     |  92 +++---
 .../hbase/master/store/LocalRegionParams.java      | 172 +++++++++++
 .../hbase/master/store/LocalRegionUtils.java       |  56 ++++
 .../store/LocalRegionWALRoller.java}               |  64 ++--
 .../hadoop/hbase/master/store/LocalStore.java      | 151 ++++++++++
 .../hbase/master/store/UpdateLocalRegion.java      |  29 ++
 .../store/region/HFileProcedurePrettyPrinter.java  |   8 +-
 .../store/region/RegionProcedureStore.java         | 295 ++-----------------
 .../store/region/WALProcedurePrettyPrinter.java    |   6 +-
 .../apache/hadoop/hbase/util/HFileArchiveUtil.java |  40 +--
 .../hbase/master/store/LocalRegionTestBase.java    | 109 +++++++
 .../master/store/TestLocalRegionCompaction.java    | 144 +++++++++
 .../store/TestLocalRegionFlush.java}               |  29 +-
 .../store/TestLocalRegionOnTwoFileSystems.java     | 216 ++++++++++++++
 .../store/TestLocalRegionWALCleaner.java}          |  70 ++---
 .../RegionProcedureStorePerformanceEvaluation.java |  20 +-
 .../store/region/RegionProcedureStoreTestBase.java |  35 +--
 .../region/RegionProcedureStoreTestHelper.java     |  14 +-
 .../region/TestHFileProcedurePrettyPrinter.java    |  10 +-
 .../store/region/TestRegionProcedureStore.java     |  12 +-
 .../region/TestRegionProcedureStoreCompaction.java | 102 -------
 .../region/TestRegionProcedureStoreMigration.java  |  61 ++--
 .../region/TestWALProcedurePrettyPrinter.java      |   9 +-
 38 files changed, 1652 insertions(+), 812 deletions(-)

diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index 76fc7e7..8f93ae0 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -125,7 +125,7 @@ possible configurations would overwhelm and obscure the important.
   </property>
   <property>
     <name>hbase.master.logcleaner.plugins</name>
-    <value>org.apache.hadoop.hbase.master.cleaner.TimeToLiveLogCleaner,org.apache.hadoop.hbase.master.cleaner.TimeToLiveProcedureWALCleaner</value>
+    <value>org.apache.hadoop.hbase.master.cleaner.TimeToLiveLogCleaner,org.apache.hadoop.hbase.master.cleaner.TimeToLiveProcedureWALCleaner,org.apache.hadoop.hbase.master.cleaner.TimeToLiveMasterLocalStoreWALCleaner</value>
     <description>A comma-separated list of BaseLogCleanerDelegate invoked by
     the LogsCleaner service. These WAL cleaners are called in order,
     so put the cleaner that prunes the most files in front. To
@@ -140,15 +140,8 @@ possible configurations would overwhelm and obscure the important.
     after which it will be cleaned by a Master thread. The value is in milliseconds.</description>
   </property>
   <property>
-    <name>hbase.master.procedurewalcleaner.ttl</name>
-    <value>604800000</value>
-    <description>How long a Procedure WAL will remain in the
-    archive directory, after which it will be cleaned
-    by a Master thread. The value is in milliseconds.</description>
-  </property>
-  <property>
     <name>hbase.master.hfilecleaner.plugins</name>
-    <value>org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner</value>
+    <value>org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner,org.apache.hadoop.hbase.master.cleaner.TimeToLiveMasterLocalStoreHFileCleaner</value>
     <description>A comma-separated list of BaseHFileCleanerDelegate invoked by
     the HFileCleaner service. These HFiles cleaners are called in order,
     so put the cleaner that prunes the most files in front. To
@@ -158,17 +151,6 @@ possible configurations would overwhelm and obscure the important.
     hbase-site.xml.</description>
   </property>
   <property>
-    <name>hbase.procedure.store.region.hfilecleaner.plugins</name>
-    <value>org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner</value>
-    <description>A comma-separated list of BaseHFileCleanerDelegate invoked by
-    the RegionProcedureStore HFileCleaner service. These HFiles cleaners are
-    called in order, so put the cleaner that prunes the most files in front. To
-    implement your own BaseHFileCleanerDelegate, just put it in HBase's classpath
-    and add the fully qualified class name here. Always add the above
-    default hfile cleaners in the list as they will be overwritten in
-    hbase-site.xml.</description>
-  </property>
-  <property>
     <name>hbase.master.infoserver.redirect</name>
     <value>true</value>
     <description>Whether or not the Master listens to the Master web
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
index 4965e17..c1eaa73 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
@@ -186,7 +186,12 @@ public interface ProcedureStore {
 
   /**
    * Acquire the lease for the procedure store.
+   * @deprecated since 2.3.0, will be removed in 4.0.0 along with
+   *             {@link org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore}. As now we
+   *             will store the procedure data in a master local region, and master itself will deal
+   *             with the lease recovery of the region.
    */
+  @Deprecated
   void recoverLease() throws IOException;
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java
index 1e7e93b..ba84606 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java
@@ -521,12 +521,13 @@ public class FileLink {
 
   /**
    * Checks if the specified directory path is a back reference links folder.
-   *
    * @param dirPath Directory path to verify
    * @return True if the specified directory is a link references folder
    */
   public static boolean isBackReferencesDir(final Path dirPath) {
-    if (dirPath == null) return false;
+    if (dirPath == null) {
+      return false;
+    }
     return dirPath.getName().startsWith(BACK_REFERENCES_DIRECTORY_PREFIX);
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HFileLink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HFileLink.java
index cc723ee..e2c80b2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HFileLink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HFileLink.java
@@ -209,7 +209,9 @@ public class HFileLink extends FileLink {
    */
   public static boolean isHFileLink(String fileName) {
     Matcher m = LINK_NAME_PATTERN.matcher(fileName);
-    if (!m.matches()) return false;
+    if (!m.matches()) {
+      return false;
+    }
     return m.groupCount() > 2 && m.group(4) != null && m.group(3) != null && m.group(2) != null;
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 26f0092..58dfb8a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -145,6 +145,7 @@ import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
 import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure;
 import org.apache.hadoop.hbase.master.slowlog.SlowLogMasterService;
 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
+import org.apache.hadoop.hbase.master.store.LocalStore;
 import org.apache.hadoop.hbase.master.zksyncer.MasterAddressSyncer;
 import org.apache.hadoop.hbase.master.zksyncer.MetaLocationSyncer;
 import org.apache.hadoop.hbase.mob.MobConstants;
@@ -434,6 +435,9 @@ public class HMaster extends HRegionServer implements MasterServices {
   private ProcedureExecutor<MasterProcedureEnv> procedureExecutor;
   private ProcedureStore procedureStore;
 
+  // the master local storage to store procedure data, etc.
+  private LocalStore localStore;
+
   // handle table states
   private TableStateManager tableStateManager;
 
@@ -887,7 +891,8 @@ public class HMaster extends HRegionServer implements MasterServices {
     this.masterActiveTime = System.currentTimeMillis();
     // TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring.
 
-    // always initialize the MemStoreLAB as we use a region to store procedure now.
+    // always initialize the MemStoreLAB as we use a region to store data in master now, see
+    // localStore.
     initializeMemStoreChunkCreator();
     this.fileSystemManager = new MasterFileSystem(conf);
     this.walManager = new MasterWalManager(this);
@@ -930,6 +935,9 @@ public class HMaster extends HRegionServer implements MasterServices {
       DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
       this.splitWALManager = new SplitWALManager(this);
     }
+
+    // initialize local store
+    localStore = LocalStore.create(this);
     createProcedureExecutor();
     Map<Class<?>, List<Procedure<MasterProcedureEnv>>> procsByType =
       procedureExecutor.getActiveProceduresNoCopy().stream()
@@ -1412,6 +1420,8 @@ public class HMaster extends HRegionServer implements MasterServices {
     this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
     startProcedureExecutor();
 
+    // Create cleaner thread pool
+    cleanerPool = new DirScanPool(conf);
     // Start log cleaner thread
     int cleanerInterval =
       conf.getInt(HBASE_MASTER_CLEANER_INTERVAL, DEFAULT_HBASE_MASTER_CLEANER_INTERVAL);
@@ -1501,6 +1511,9 @@ public class HMaster extends HRegionServer implements MasterServices {
 
     stopProcedureExecutor();
 
+    if (localStore != null) {
+      localStore.close(isAborted());
+    }
     if (this.walManager != null) {
       this.walManager.stop();
     }
@@ -1517,10 +1530,8 @@ public class HMaster extends HRegionServer implements MasterServices {
 
   private void createProcedureExecutor() throws IOException {
     MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
-    // Create cleaner thread pool
-    cleanerPool = new DirScanPool(conf);
-    procedureStore = new RegionProcedureStore(this, cleanerPool,
-      new MasterProcedureEnv.FsUtilsLeaseRecovery(this));
+    procedureStore =
+      new RegionProcedureStore(this, localStore, new MasterProcedureEnv.FsUtilsLeaseRecovery(this));
     procedureStore.registerListener(new ProcedureStoreListener() {
 
       @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseTimeToLiveFileCleaner.java
similarity index 55%
copy from hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveLogCleaner.java
copy to hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseTimeToLiveFileCleaner.java
index 670bd88..28f95a8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveLogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseTimeToLiveFileCleaner.java
@@ -17,57 +17,63 @@
  */
 package org.apache.hadoop.hbase.master.cleaner;
 
-import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * Log cleaner that uses the timestamp of the wal to determine if it should
- * be deleted. By default they are allowed to live for 10 minutes.
+ * Base class for time to live file cleaner.
  */
-@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-public class TimeToLiveLogCleaner extends BaseLogCleanerDelegate {
-  private static final Logger LOG = LoggerFactory.getLogger(TimeToLiveLogCleaner.class.getName());
-  public static final String TTL_CONF_KEY = "hbase.master.logcleaner.ttl";
-  // default ttl = 10 minutes
-  public static final long DEFAULT_TTL = 600_000L;
+@InterfaceAudience.Private
+public abstract class BaseTimeToLiveFileCleaner extends BaseLogCleanerDelegate {
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(BaseTimeToLiveFileCleaner.class.getName());
+
+  private static final DateTimeFormatter FORMATTER =
+    DateTimeFormatter.ISO_DATE_TIME.withZone(ZoneOffset.systemDefault());
+
   // Configured time a log can be kept after it was closed
-  private long ttl;
-  private boolean stopped = false;
+  private long ttlMs;
+
+  private volatile boolean stopped = false;
 
   @Override
-  public boolean isFileDeletable(FileStatus fStat) {
+  public final void setConf(Configuration conf) {
+    super.setConf(conf);
+    this.ttlMs = getTtlMs(conf);
+  }
+
+  @Override
+  public boolean isFileDeletable(FileStatus status) {
     // Files are validated for the second time here,
     // if it causes a bottleneck this logic needs refactored
-    if (!AbstractFSWALProvider.validateWALFilename(fStat.getPath().getName())) {
+    if (!valiateFilename(status.getPath())) {
       return true;
     }
-
     long currentTime = EnvironmentEdgeManager.currentTime();
-    long time = fStat.getModificationTime();
+    long time = status.getModificationTime();
     long life = currentTime - time;
 
     if (LOG.isTraceEnabled()) {
-      LOG.trace("Log life:" + life + ", ttl:" + ttl + ", current:" + currentTime + ", from: "
-          + time);
+      LOG.trace("File life:{}ms, ttl:{}ms, current:{}, from{}", life, ttlMs,
+        FORMATTER.format(Instant.ofEpochMilli(currentTime)),
+        FORMATTER.format(Instant.ofEpochMilli(time)));
     }
     if (life < 0) {
-      LOG.warn("Found a log (" + fStat.getPath() + ") newer than current time (" + currentTime
-          + " < " + time + "), probably a clock skew");
+      LOG.warn("Found a file ({}) newer than current time ({} < {}), probably a clock skew",
+        status.getPath(), FORMATTER.format(Instant.ofEpochMilli(currentTime)),
+        FORMATTER.format(Instant.ofEpochMilli(time)));
       return false;
     }
-    return life > ttl;
-  }
-
-  @Override
-  public void setConf(Configuration conf) {
-    super.setConf(conf);
-    this.ttl = conf.getLong(TTL_CONF_KEY, DEFAULT_TTL);
+    return life > ttlMs;
   }
 
   @Override
@@ -79,4 +85,8 @@ public class TimeToLiveLogCleaner extends BaseLogCleanerDelegate {
   public boolean isStopped() {
     return this.stopped;
   }
-}
+
+  protected abstract long getTtlMs(Configuration conf);
+
+  protected abstract boolean valiateFilename(Path file);
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
index 0ddc882..eb5f1c1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.master.store.LocalStore;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.util.StealJobQueue;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -158,10 +159,9 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate>
 
   @Override
   protected boolean validate(Path file) {
-    if (HFileLink.isBackReferencesDir(file) || HFileLink.isBackReferencesDir(file.getParent())) {
-      return true;
-    }
-    return StoreFileInfo.validateStoreFileName(file.getName());
+    return HFileLink.isBackReferencesDir(file) || HFileLink.isBackReferencesDir(file.getParent()) ||
+      StoreFileInfo.validateStoreFileName(file.getName()) ||
+      file.getName().endsWith(LocalStore.ARCHIVED_HFILE_SUFFIX);
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
index 38e0e7b..c0e071e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
+import org.apache.hadoop.hbase.master.store.LocalStore;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -86,8 +87,9 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate>
 
   @Override
   protected boolean validate(Path file) {
-    return AbstractFSWALProvider.validateWALFilename(file.getName())
-        || MasterProcedureUtil.validateProcedureWALFilename(file.getName());
+    return AbstractFSWALProvider.validateWALFilename(file.getName()) ||
+      MasterProcedureUtil.validateProcedureWALFilename(file.getName()) ||
+      file.getName().endsWith(LocalStore.ARCHIVED_WAL_SUFFIX);
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveHFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveHFileCleaner.java
index e789752..4424eb8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveHFileCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveHFileCleaner.java
@@ -17,48 +17,33 @@
  */
 package org.apache.hadoop.hbase.master.cleaner;
 
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * HFile cleaner that uses the timestamp of the hfile to determine if it should be deleted. By
  * default they are allowed to live for {@value #DEFAULT_TTL}
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-public class TimeToLiveHFileCleaner extends BaseHFileCleanerDelegate {
+public class TimeToLiveHFileCleaner extends BaseTimeToLiveFileCleaner {
 
-  private static final Logger LOG = LoggerFactory.getLogger(TimeToLiveHFileCleaner.class.getName());
   public static final String TTL_CONF_KEY = "hbase.master.hfilecleaner.ttl";
+
   // default ttl = 5 minutes
   public static final long DEFAULT_TTL = 60000 * 5;
-  // Configured time a hfile can be kept after it was moved to the archive
-  private long ttl;
 
   @Override
-  public void setConf(Configuration conf) {
-    this.ttl = conf.getLong(TTL_CONF_KEY, DEFAULT_TTL);
-    super.setConf(conf);
+  protected long getTtlMs(Configuration conf) {
+    return conf.getLong(TTL_CONF_KEY, DEFAULT_TTL);
   }
 
   @Override
-  public boolean isFileDeletable(FileStatus fStat) {
-    long currentTime = EnvironmentEdgeManager.currentTime();
-    long time = fStat.getModificationTime();
-    long life = currentTime - time;
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("HFile life:" + life + ", ttl:" + ttl + ", current:" + currentTime + ", from: "
-          + time);
-    }
-    if (life < 0) {
-      LOG.warn("Found a hfile (" + fStat.getPath() + ") newer than current time (" + currentTime
-          + " < " + time + "), probably a clock skew");
-      return false;
-    }
-    return life > ttl;
+  protected boolean valiateFilename(Path file) {
+    return HFileLink.isBackReferencesDir(file) || HFileLink.isBackReferencesDir(file.getParent()) ||
+      StoreFileInfo.validateStoreFileName(file.getName());
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveLogCleaner.java
index 670bd88..75fe857 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveLogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveLogCleaner.java
@@ -17,66 +17,31 @@
  */
 package org.apache.hadoop.hbase.master.cleaner;
 
-import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
- * Log cleaner that uses the timestamp of the wal to determine if it should
- * be deleted. By default they are allowed to live for 10 minutes.
+ * Log cleaner that uses the timestamp of the wal to determine if it should be deleted. By default
+ * they are allowed to live for 10 minutes.
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-public class TimeToLiveLogCleaner extends BaseLogCleanerDelegate {
-  private static final Logger LOG = LoggerFactory.getLogger(TimeToLiveLogCleaner.class.getName());
+public class TimeToLiveLogCleaner extends BaseTimeToLiveFileCleaner {
+
   public static final String TTL_CONF_KEY = "hbase.master.logcleaner.ttl";
+
   // default ttl = 10 minutes
   public static final long DEFAULT_TTL = 600_000L;
-  // Configured time a log can be kept after it was closed
-  private long ttl;
-  private boolean stopped = false;
-
-  @Override
-  public boolean isFileDeletable(FileStatus fStat) {
-    // Files are validated for the second time here,
-    // if it causes a bottleneck this logic needs refactored
-    if (!AbstractFSWALProvider.validateWALFilename(fStat.getPath().getName())) {
-      return true;
-    }
-
-    long currentTime = EnvironmentEdgeManager.currentTime();
-    long time = fStat.getModificationTime();
-    long life = currentTime - time;
-
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Log life:" + life + ", ttl:" + ttl + ", current:" + currentTime + ", from: "
-          + time);
-    }
-    if (life < 0) {
-      LOG.warn("Found a log (" + fStat.getPath() + ") newer than current time (" + currentTime
-          + " < " + time + "), probably a clock skew");
-      return false;
-    }
-    return life > ttl;
-  }
-
-  @Override
-  public void setConf(Configuration conf) {
-    super.setConf(conf);
-    this.ttl = conf.getLong(TTL_CONF_KEY, DEFAULT_TTL);
-  }
 
   @Override
-  public void stop(String why) {
-    this.stopped = true;
+  protected long getTtlMs(Configuration conf) {
+    return conf.getLong(TTL_CONF_KEY, DEFAULT_TTL);
   }
 
   @Override
-  public boolean isStopped() {
-    return this.stopped;
+  protected boolean valiateFilename(Path file) {
+    return AbstractFSWALProvider.validateWALFilename(file.getName());
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveMasterLocalStoreHFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveMasterLocalStoreHFileCleaner.java
new file mode 100644
index 0000000..843361c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveMasterLocalStoreHFileCleaner.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.cleaner;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.master.store.LocalStore;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Master local storage HFile cleaner that uses the timestamp of the HFile to determine if it should
+ * be deleted. By default they are allowed to live for {@value #DEFAULT_TTL}
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class TimeToLiveMasterLocalStoreHFileCleaner extends BaseTimeToLiveFileCleaner {
+
+  public static final String TTL_CONF_KEY = "hbase.master.local.store.hfilecleaner.ttl";
+
+  // default ttl = 7 days
+  public static final long DEFAULT_TTL = 604_800_000L;
+
+  @Override
+  protected long getTtlMs(Configuration conf) {
+    return conf.getLong(TTL_CONF_KEY, DEFAULT_TTL);
+  }
+
+  @Override
+  protected boolean valiateFilename(Path file) {
+    return file.getName().endsWith(LocalStore.ARCHIVED_HFILE_SUFFIX);
+  }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveMasterLocalStoreWALCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveMasterLocalStoreWALCleaner.java
new file mode 100644
index 0000000..e7f6147
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveMasterLocalStoreWALCleaner.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.cleaner;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.master.store.LocalStore;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Master local storage WAL cleaner that uses the timestamp of the WAL file to determine if it
+ * should be deleted. By default they are allowed to live for {@value #DEFAULT_TTL}
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class TimeToLiveMasterLocalStoreWALCleaner extends BaseTimeToLiveFileCleaner {
+
+  public static final String TTL_CONF_KEY = "hbase.master.local.store.walcleaner.ttl";
+
+  // default ttl = 7 days
+  public static final long DEFAULT_TTL = 604_800_000L;
+
+  @Override
+  protected long getTtlMs(Configuration conf) {
+    return conf.getLong(TTL_CONF_KEY, DEFAULT_TTL);
+  }
+
+  @Override
+  protected boolean valiateFilename(Path file) {
+    return file.getName().endsWith(LocalStore.ARCHIVED_WAL_SUFFIX);
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveProcedureWALCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveProcedureWALCleaner.java
index 5535a4b..ece9f77 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveProcedureWALCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveProcedureWALCleaner.java
@@ -18,65 +18,33 @@
 package org.apache.hadoop.hbase.master.cleaner;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Procedure WAL cleaner that uses the timestamp of the Procedure WAL to determine if it should be
- * deleted. By default they are allowed to live for {@value #DEFAULT_TTL}
+ * deleted. By default they are allowed to live for {@value #DEFAULT_TTL}.
+ * @deprecated Since 2.3.0, will be removed in 4.0.0. We will not use the procedure wal to store
+ *             procedure any more.
  */
+@Deprecated
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-public class TimeToLiveProcedureWALCleaner extends BaseLogCleanerDelegate {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(TimeToLiveProcedureWALCleaner.class.getName());
+public class TimeToLiveProcedureWALCleaner extends BaseTimeToLiveFileCleaner {
+
   public static final String TTL_CONF_KEY = "hbase.master.procedurewalcleaner.ttl";
+
   // default ttl = 7 days
   public static final long DEFAULT_TTL = 604_800_000L;
-  // Configured time a procedure log can be kept after it was moved to the archive
-  private long ttl;
-  private boolean stopped = false;
-
-  @Override
-  public void setConf(Configuration conf) {
-    this.ttl = conf.getLong(TTL_CONF_KEY, DEFAULT_TTL);
-    super.setConf(conf);
-  }
-
-  @Override
-  public boolean isFileDeletable(FileStatus fStat) {
-    // Files are validated for the second time here,
-    // if it causes a bottleneck this logic needs refactored
-    if (!MasterProcedureUtil.validateProcedureWALFilename(fStat.getPath().getName())) {
-      return true;
-    }
-
-    long currentTime = EnvironmentEdgeManager.currentTime();
-    long time = fStat.getModificationTime();
-    long life = currentTime - time;
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Procedure log life:" + life + ", ttl:" + ttl + ", current:" + currentTime +
-          ", from: " + time);
-    }
-    if (life < 0) {
-      LOG.warn("Found a procedure log (" + fStat.getPath() + ") newer than current time ("
-          + currentTime + " < " + time + "), probably a clock skew");
-      return false;
-    }
-    return life > ttl;
-  }
 
   @Override
-  public void stop(String why) {
-    this.stopped = true;
+  protected long getTtlMs(Configuration conf) {
+    return conf.getLong(TTL_CONF_KEY, DEFAULT_TTL);
   }
 
   @Override
-  public boolean isStopped() {
-    return this.stopped;
+  protected boolean valiateFilename(Path file) {
+    return MasterProcedureUtil.validateProcedureWALFilename(file.getName());
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java
index 49bf5c8..23d6ecb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java
@@ -152,16 +152,13 @@ public final class MasterProcedureUtil {
   @Deprecated
   private static final Pattern PATTERN = Pattern.compile(".*pv2-\\d{20}.log");
 
-  // Use the character $ to let the log cleaner know that this is not the normal wal file.
-  public static final String ARCHIVED_PROC_WAL_SUFFIX = "$masterproc$";
-
   /**
    * A Procedure WAL file name is of the format: pv-&lt;wal-id&gt;.log where wal-id is 20 digits.
    * @param filename name of the file to validate
    * @return <tt>true</tt> if the filename matches a Procedure WAL, <tt>false</tt> otherwise
    */
   public static boolean validateProcedureWALFilename(String filename) {
-    return PATTERN.matcher(filename).matches() || filename.endsWith(ARCHIVED_PROC_WAL_SUFFIX);
+    return PATTERN.matcher(filename).matches();
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/store/LocalRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/store/LocalRegion.java
new file mode 100644
index 0000000..506de30
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/store/LocalRegion.java
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.store;
+
+import static org.apache.hadoop.hbase.HConstants.HREGION_LOGDIR_NAME;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.math.IntMath;
+
+/**
+ * A region that stores data in a separated directory.
+ * <p/>
+ * FileSystem layout:
+ *
+ * <pre>
+ * hbase
+ *   |
+ *   --&lt;region dir&gt;
+ *       |
+ *       --data
+ *       |  |
+ *       |  --/&lt;ns&gt/&lt;table&gt/&lt;encoded-region-name&gt; <---- The region data
+ *       |      |
+ *       |      --replay <---- The edits to replay
+ *       |
+ *       --WALs
+ *          |
+ *          --&lt;master-server-name&gt; <---- The WAL dir for active master
+ *          |
+ *          --&lt;master-server-name&gt;-dead <---- The WAL dir for dead master
+ * </pre>
+ *
+ * Notice that, you can use different root file system and WAL file system. Then the above directory
+ * will be on two file systems, the root file system will have the data directory while the WAL
+ * filesystem will have the WALs directory. The archived HFile will be moved to the global HFile
+ * archived directory with the {@link LocalRegionParams#archivedWalSuffix()} suffix. The archived
+ * WAL will be moved to the global WAL archived directory with the
+ * {@link LocalRegionParams#archivedHFileSuffix()} suffix.
+ */
+@InterfaceAudience.Private
+public final class LocalRegion {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LocalRegion.class);
+
+  private static final String REPLAY_EDITS_DIR = "recovered.wals";
+
+  private static final String DEAD_WAL_DIR_SUFFIX = "-dead";
+
+  private static final int REGION_ID = 1;
+
+  private final WALFactory walFactory;
+
+  @VisibleForTesting
+  final HRegion region;
+
+  @VisibleForTesting
+  final LocalRegionFlusherAndCompactor flusherAndCompactor;
+
+  private LocalRegionWALRoller walRoller;
+
+  private LocalRegion(HRegion region, WALFactory walFactory,
+    LocalRegionFlusherAndCompactor flusherAndCompactor, LocalRegionWALRoller walRoller) {
+    this.region = region;
+    this.walFactory = walFactory;
+    this.flusherAndCompactor = flusherAndCompactor;
+    this.walRoller = walRoller;
+  }
+
+  private void closeRegion(boolean abort) {
+    try {
+      region.close(abort);
+    } catch (IOException e) {
+      LOG.warn("Failed to close region", e);
+    }
+  }
+
+  private void shutdownWAL() {
+    try {
+      walFactory.shutdown();
+    } catch (IOException e) {
+      LOG.warn("Failed to shutdown WAL", e);
+    }
+  }
+
+  public void update(UpdateLocalRegion action) throws IOException {
+    action.update(region);
+    flusherAndCompactor.onUpdate();
+  }
+
+  public Result get(Get get) throws IOException {
+    return region.get(get);
+  }
+
+  public RegionScanner getScanner(Scan scan) throws IOException {
+    return region.getScanner(scan);
+  }
+
+  @VisibleForTesting
+  FlushResult flush(boolean force) throws IOException {
+    return region.flush(force);
+  }
+
+  @VisibleForTesting
+  void requestRollAll() {
+    walRoller.requestRollAll();
+  }
+
+  @VisibleForTesting
+  void waitUntilWalRollFinished() throws InterruptedException {
+    walRoller.waitUntilWalRollFinished();
+  }
+
+  public void close(boolean abort) {
+    LOG.info("Closing local region {}, isAbort={}", region.getRegionInfo(), abort);
+    if (flusherAndCompactor != null) {
+      flusherAndCompactor.close();
+    }
+    // if abort, we shutdown wal first to fail the ongoing updates to the region, and then close the
+    // region, otherwise there will be dead lock.
+    if (abort) {
+      shutdownWAL();
+      closeRegion(true);
+    } else {
+      closeRegion(false);
+      shutdownWAL();
+    }
+
+    if (walRoller != null) {
+      walRoller.close();
+    }
+  }
+
+  private static WAL createWAL(WALFactory walFactory, LocalRegionWALRoller walRoller,
+    String serverName, FileSystem walFs, Path walRootDir, RegionInfo regionInfo)
+    throws IOException {
+    String logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
+    Path walDir = new Path(walRootDir, logName);
+    LOG.debug("WALDir={}", walDir);
+    if (walFs.exists(walDir)) {
+      throw new HBaseIOException(
+        "Already created wal directory at " + walDir + " for local region " + regionInfo);
+    }
+    if (!walFs.mkdirs(walDir)) {
+      throw new IOException(
+        "Can not create wal directory " + walDir + " for local region " + regionInfo);
+    }
+    WAL wal = walFactory.getWAL(regionInfo);
+    walRoller.addWAL(wal);
+    return wal;
+  }
+
+  private static HRegion bootstrap(Configuration conf, TableDescriptor td, FileSystem fs,
+    Path rootDir, FileSystem walFs, Path walRootDir, WALFactory walFactory,
+    LocalRegionWALRoller walRoller, String serverName) throws IOException {
+    TableName tn = td.getTableName();
+    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tn).setRegionId(REGION_ID).build();
+    Path tmpTableDir = CommonFSUtils.getTableDir(rootDir,
+      TableName.valueOf(tn.getNamespaceAsString(), tn.getQualifierAsString() + "-tmp"));
+    if (fs.exists(tmpTableDir) && !fs.delete(tmpTableDir, true)) {
+      throw new IOException("Can not delete partial created proc region " + tmpTableDir);
+    }
+    HRegion.createHRegion(conf, regionInfo, fs, tmpTableDir, td).close();
+    Path tableDir = CommonFSUtils.getTableDir(rootDir, tn);
+    if (!fs.rename(tmpTableDir, tableDir)) {
+      throw new IOException("Can not rename " + tmpTableDir + " to " + tableDir);
+    }
+    WAL wal = createWAL(walFactory, walRoller, serverName, walFs, walRootDir, regionInfo);
+    return HRegion.openHRegionFromTableDir(conf, fs, tableDir, regionInfo, td, wal, null, null);
+  }
+
+  private static HRegion open(Configuration conf, TableDescriptor td, FileSystem fs, Path rootDir,
+    FileSystem walFs, Path walRootDir, WALFactory walFactory, LocalRegionWALRoller walRoller,
+    String serverName) throws IOException {
+    Path tableDir = CommonFSUtils.getTableDir(rootDir, td.getTableName());
+    Path regionDir =
+      fs.listStatus(tableDir, p -> RegionInfo.isEncodedRegionName(Bytes.toBytes(p.getName())))[0]
+        .getPath();
+    RegionInfo regionInfo = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
+
+    Path walRegionDir = FSUtils.getRegionDirFromRootDir(walRootDir, regionInfo);
+    Path replayEditsDir = new Path(walRegionDir, REPLAY_EDITS_DIR);
+    if (!walFs.exists(replayEditsDir) && !walFs.mkdirs(replayEditsDir)) {
+      throw new IOException("Failed to create replay directory: " + replayEditsDir);
+    }
+    Path walsDir = new Path(walRootDir, HREGION_LOGDIR_NAME);
+    for (FileStatus walDir : walFs.listStatus(walsDir)) {
+      if (!walDir.isDirectory()) {
+        continue;
+      }
+      if (walDir.getPath().getName().startsWith(serverName)) {
+        LOG.warn("This should not happen in real production as we have not created our WAL " +
+          "directory yet, ignore if you are running a local region related UT");
+      }
+      Path deadWALDir;
+      if (!walDir.getPath().getName().endsWith(DEAD_WAL_DIR_SUFFIX)) {
+        deadWALDir =
+          new Path(walDir.getPath().getParent(), walDir.getPath().getName() + DEAD_WAL_DIR_SUFFIX);
+        if (!walFs.rename(walDir.getPath(), deadWALDir)) {
+          throw new IOException("Can not rename " + walDir + " to " + deadWALDir +
+            " when recovering lease of proc store");
+        }
+        LOG.info("Renamed {} to {} as it is dead", walDir.getPath(), deadWALDir);
+      } else {
+        deadWALDir = walDir.getPath();
+        LOG.info("{} is already marked as dead", deadWALDir);
+      }
+      for (FileStatus walFile : walFs.listStatus(deadWALDir)) {
+        Path replayEditsFile = new Path(replayEditsDir, walFile.getPath().getName());
+        RecoverLeaseFSUtils.recoverFileLease(walFs, walFile.getPath(), conf);
+        if (!walFs.rename(walFile.getPath(), replayEditsFile)) {
+          throw new IOException("Can not rename " + walFile.getPath() + " to " + replayEditsFile +
+            " when recovering lease for local region");
+        }
+        LOG.info("Renamed {} to {}", walFile.getPath(), replayEditsFile);
+      }
+      LOG.info("Delete empty local region wal dir {}", deadWALDir);
+      walFs.delete(deadWALDir, true);
+    }
+
+    WAL wal = createWAL(walFactory, walRoller, serverName, walFs, walRootDir, regionInfo);
+    conf.set(HRegion.SPECIAL_RECOVERED_EDITS_DIR,
+      replayEditsDir.makeQualified(walFs.getUri(), walFs.getWorkingDirectory()).toString());
+    return HRegion.openHRegionFromTableDir(conf, fs, tableDir, regionInfo, td, wal, null, null);
+  }
+
+  public static LocalRegion create(LocalRegionParams params) throws IOException {
+    TableDescriptor td = params.tableDescriptor();
+    LOG.info("Create or load local region for table " + td);
+    Server server = params.server();
+    Configuration baseConf = server.getConfiguration();
+    FileSystem fs = CommonFSUtils.getRootDirFileSystem(baseConf);
+    FileSystem walFs = CommonFSUtils.getWALFileSystem(baseConf);
+    Path globalRootDir = CommonFSUtils.getRootDir(baseConf);
+    Path globalWALRootDir = CommonFSUtils.getWALRootDir(baseConf);
+    Path rootDir = new Path(globalRootDir, params.regionDirName());
+    Path walRootDir = new Path(globalWALRootDir, params.regionDirName());
+    // we will override some configurations so create a new one.
+    Configuration conf = new Configuration(baseConf);
+    CommonFSUtils.setRootDir(conf, rootDir);
+    CommonFSUtils.setWALRootDir(conf, walRootDir);
+    LocalRegionFlusherAndCompactor.setupConf(conf, params.flushSize(), params.flushPerChanges(),
+      params.flushIntervalMs());
+    conf.setInt(AbstractFSWAL.MAX_LOGS, params.maxWals());
+    if (params.useHsync() != null) {
+      conf.setBoolean(HRegion.WAL_HSYNC_CONF_KEY, params.useHsync());
+    }
+    conf.setInt(AbstractFSWAL.RING_BUFFER_SLOT_COUNT,
+      IntMath.ceilingPowerOfTwo(params.ringBufferSlotCount()));
+
+    LocalRegionWALRoller walRoller = LocalRegionWALRoller.create(td.getTableName() + "-WAL-Roller",
+      conf, server, walFs, walRootDir, globalWALRootDir, params.archivedWalSuffix(),
+      params.rollPeriodMs(), params.flushSize());
+    walRoller.start();
+
+    WALFactory walFactory = new WALFactory(conf, server.getServerName().toString());
+    Path tableDir = CommonFSUtils.getTableDir(rootDir, td.getTableName());
+    HRegion region;
+    if (fs.exists(tableDir)) {
+      // load the existing region.
+      region = open(conf, td, fs, rootDir, walFs, walRootDir, walFactory, walRoller,
+        server.getServerName().toString());
+    } else {
+      // bootstrapping...
+      region = bootstrap(conf, td, fs, rootDir, walFs, walRootDir, walFactory, walRoller,
+        server.getServerName().toString());
+    }
+    Path globalArchiveDir = HFileArchiveUtil.getArchivePath(baseConf);
+    LocalRegionFlusherAndCompactor flusherAndCompactor = new LocalRegionFlusherAndCompactor(conf,
+      server, region, params.flushSize(), params.flushPerChanges(), params.flushIntervalMs(),
+      params.compactMin(), globalArchiveDir, params.archivedHFileSuffix());
+    walRoller.setFlusherAndCompactor(flusherAndCompactor);
+    Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
+    if (!fs.mkdirs(archiveDir)) {
+      LOG.warn("Failed to create archive directory {}. Usually this should not happen but it will" +
+        " be created again when we actually archive the hfiles later, so continue", archiveDir);
+    }
+    return new LocalRegion(region, walFactory, flusherAndCompactor, walRoller);
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionFlusherAndCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/store/LocalRegionFlusherAndCompactor.java
similarity index 71%
rename from hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionFlusherAndCompactor.java
rename to hbase-server/src/main/java/org/apache/hadoop/hbase/master/store/LocalRegionFlusherAndCompactor.java
index 57e62dd..b177a59 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionFlusherAndCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/store/LocalRegionFlusherAndCompactor.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.procedure2.store.region;
+package org.apache.hadoop.hbase.master.store;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -27,22 +27,26 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
- * As long as there is no RegionServerServices for the procedure store region, we need implement the
- * flush and compaction logic by our own.
+ * As long as there is no RegionServerServices for a 'local' region, we need implement the flush and
+ * compaction logic by our own.
  * <p/>
  * The flush logic is very simple, every time after calling a modification method in
  * {@link RegionProcedureStore}, we will call the {@link #onUpdate()} method below, and in this
@@ -53,26 +57,11 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
  * count, if it is above the compactMin, we will do a major compaction.
  */
 @InterfaceAudience.Private
-class RegionFlusherAndCompactor implements Closeable {
+class LocalRegionFlusherAndCompactor implements Closeable {
 
-  private static final Logger LOG = LoggerFactory.getLogger(RegionFlusherAndCompactor.class);
+  private static final Logger LOG = LoggerFactory.getLogger(LocalRegionFlusherAndCompactor.class);
 
-  static final String FLUSH_SIZE_KEY = "hbase.procedure.store.region.flush.size";
-
-  static final long DEFAULT_FLUSH_SIZE = TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE;
-
-  static final String FLUSH_PER_CHANGES_KEY = "hbase.procedure.store.region.flush.per.changes";
-
-  private static final long DEFAULT_FLUSH_PER_CHANGES = 1_000_000;
-
-  static final String FLUSH_INTERVAL_MS_KEY = "hbase.procedure.store.region.flush.interval.ms";
-
-  // default to flush every 15 minutes, for safety
-  private static final long DEFAULT_FLUSH_INTERVAL_MS = TimeUnit.MINUTES.toMillis(15);
-
-  static final String COMPACT_MIN_KEY = "hbase.procedure.store.region.compact.min";
-
-  private static final int DEFAULT_COMPACT_MIN = 4;
+  private final Configuration conf;
 
   private final Abortable abortable;
 
@@ -90,6 +79,10 @@ class RegionFlusherAndCompactor implements Closeable {
 
   private final int compactMin;
 
+  private final Path globalArchivePath;
+
+  private final String archivedHFileSuffix;
+
   private final Thread flushThread;
 
   private final Lock flushLock = new ReentrantLock();
@@ -108,38 +101,60 @@ class RegionFlusherAndCompactor implements Closeable {
 
   private volatile boolean closed = false;
 
-  RegionFlusherAndCompactor(Configuration conf, Abortable abortable, HRegion region) {
+  LocalRegionFlusherAndCompactor(Configuration conf, Abortable abortable, HRegion region,
+    long flushSize, long flushPerChanges, long flushIntervalMs, int compactMin,
+    Path globalArchivePath, String archivedHFileSuffix) {
+    this.conf = conf;
     this.abortable = abortable;
     this.region = region;
-    flushSize = conf.getLong(FLUSH_SIZE_KEY, DEFAULT_FLUSH_SIZE);
-    flushPerChanges = conf.getLong(FLUSH_PER_CHANGES_KEY, DEFAULT_FLUSH_PER_CHANGES);
-    flushIntervalMs = conf.getLong(FLUSH_INTERVAL_MS_KEY, DEFAULT_FLUSH_INTERVAL_MS);
-    compactMin = conf.getInt(COMPACT_MIN_KEY, DEFAULT_COMPACT_MIN);
-    flushThread = new Thread(this::flushLoop, "Procedure-Region-Store-Flusher");
+    this.flushSize = flushSize;
+    this.flushPerChanges = flushPerChanges;
+    this.flushIntervalMs = flushIntervalMs;
+    this.compactMin = compactMin;
+    this.globalArchivePath = globalArchivePath;
+    this.archivedHFileSuffix = archivedHFileSuffix;
+    flushThread = new Thread(this::flushLoop, region.getRegionInfo().getTable() + "-Flusher");
     flushThread.setDaemon(true);
     flushThread.start();
     compactExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
-      .setNameFormat("Procedure-Region-Store-Compactor").setDaemon(true).build());
+      .setNameFormat(region.getRegionInfo().getTable() + "-Store-Compactor").setDaemon(true)
+      .build());
     LOG.info("Constructor flushSize={}, flushPerChanges={}, flushIntervalMs={}, compactMin={}",
       flushSize, flushPerChanges, flushIntervalMs, compactMin);
   }
 
   // inject our flush related configurations
-  static void setupConf(Configuration conf) {
-    long flushSize = conf.getLong(FLUSH_SIZE_KEY, DEFAULT_FLUSH_SIZE);
+  static void setupConf(Configuration conf, long flushSize, long flushPerChanges,
+    long flushIntervalMs) {
     conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSize);
-    long flushPerChanges = conf.getLong(FLUSH_PER_CHANGES_KEY, DEFAULT_FLUSH_PER_CHANGES);
     conf.setLong(HRegion.MEMSTORE_FLUSH_PER_CHANGES, flushPerChanges);
-    long flushIntervalMs = conf.getLong(FLUSH_INTERVAL_MS_KEY, DEFAULT_FLUSH_INTERVAL_MS);
     conf.setLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, flushIntervalMs);
     LOG.info("Injected flushSize={}, flushPerChanges={}, flushIntervalMs={}", flushSize,
       flushPerChanges, flushIntervalMs);
   }
 
+  private void moveHFileToGlobalArchiveDir() throws IOException {
+    FileSystem fs = region.getRegionFileSystem().getFileSystem();
+    for (HStore store : region.getStores()) {
+      store.closeAndArchiveCompactedFiles();
+      Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, region.getRegionInfo(),
+        store.getColumnFamilyDescriptor().getName());
+      Path globalStoreArchiveDir = HFileArchiveUtil.getStoreArchivePathForArchivePath(
+        globalArchivePath, region.getRegionInfo(), store.getColumnFamilyDescriptor().getName());
+      try {
+        LocalRegionUtils.moveFilesUnderDir(fs, storeArchiveDir, globalStoreArchiveDir,
+          archivedHFileSuffix);
+      } catch (IOException e) {
+        LOG.warn("Failed to move archived hfiles from {} to global dir {}", storeArchiveDir,
+          globalStoreArchiveDir, e);
+      }
+    }
+  }
+
   private void compact() {
     try {
       region.compact(true);
-      Iterables.getOnlyElement(region.getStores()).closeAndArchiveCompactedFiles();
+      moveHFileToGlobalArchiveDir();
     } catch (IOException e) {
       LOG.error("Failed to compact procedure store region", e);
     }
@@ -156,7 +171,12 @@ class RegionFlusherAndCompactor implements Closeable {
   }
 
   private boolean needCompaction() {
-    return Iterables.getOnlyElement(region.getStores()).getStorefilesCount() >= compactMin;
+    for (Store store : region.getStores()) {
+      if (store.getStorefilesCount() >= compactMin) {
+        return true;
+      }
+    }
+    return false;
   }
 
   private void flushLoop() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/store/LocalRegionParams.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/store/LocalRegionParams.java
new file mode 100644
index 0000000..85bcaae
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/store/LocalRegionParams.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.store;
+
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * The parameters for constructing {@link LocalRegion}.
+ */
+@InterfaceAudience.Private
+public class LocalRegionParams {
+
+  private Server server;
+
+  private String regionDirName;
+
+  private TableDescriptor tableDescriptor;
+
+  private Long flushSize;
+
+  private Long flushPerChanges;
+
+  private Long flushIntervalMs;
+
+  private Integer compactMin;
+
+  private Integer maxWals;
+
+  private Boolean useHsync;
+
+  private Integer ringBufferSlotCount;
+
+  private Long rollPeriodMs;
+
+  private String archivedWalSuffix;
+
+  private String archivedHFileSuffix;
+
+  public LocalRegionParams server(Server server) {
+    this.server = server;
+    return this;
+  }
+
+  public LocalRegionParams regionDirName(String regionDirName) {
+    this.regionDirName = regionDirName;
+    return this;
+  }
+
+  public LocalRegionParams tableDescriptor(TableDescriptor tableDescriptor) {
+    this.tableDescriptor = tableDescriptor;
+    return this;
+  }
+
+  public LocalRegionParams flushSize(long flushSize) {
+    this.flushSize = flushSize;
+    return this;
+  }
+
+  public LocalRegionParams flushPerChanges(long flushPerChanges) {
+    this.flushPerChanges = flushPerChanges;
+    return this;
+  }
+
+  public LocalRegionParams flushIntervalMs(long flushIntervalMs) {
+    this.flushIntervalMs = flushIntervalMs;
+    return this;
+  }
+
+  public LocalRegionParams compactMin(int compactMin) {
+    this.compactMin = compactMin;
+    return this;
+  }
+
+  public LocalRegionParams maxWals(int maxWals) {
+    this.maxWals = maxWals;
+    return this;
+  }
+
+  public LocalRegionParams useHsync(boolean useHsync) {
+    this.useHsync = useHsync;
+    return this;
+  }
+
+  public LocalRegionParams ringBufferSlotCount(int ringBufferSlotCount) {
+    this.ringBufferSlotCount = ringBufferSlotCount;
+    return this;
+  }
+
+  public LocalRegionParams rollPeriodMs(long rollPeriodMs) {
+    this.rollPeriodMs = rollPeriodMs;
+    return this;
+  }
+
+  public LocalRegionParams archivedWalSuffix(String archivedWalSuffix) {
+    this.archivedWalSuffix = archivedWalSuffix;
+    return this;
+  }
+
+  public LocalRegionParams archivedHFileSuffix(String archivedHFileSuffix) {
+    this.archivedHFileSuffix = archivedHFileSuffix;
+    return this;
+  }
+
+  public Server server() {
+    return server;
+  }
+
+  public String regionDirName() {
+    return regionDirName;
+  }
+
+  public TableDescriptor tableDescriptor() {
+    return tableDescriptor;
+  }
+
+  public long flushSize() {
+    return flushSize;
+  }
+
+  public long flushPerChanges() {
+    return flushPerChanges;
+  }
+
+  public long flushIntervalMs() {
+    return flushIntervalMs;
+  }
+
+  public int compactMin() {
+    return compactMin;
+  }
+
+  public int maxWals() {
+    return maxWals;
+  }
+
+  public Boolean useHsync() {
+    return useHsync;
+  }
+
+  public int ringBufferSlotCount() {
+    return ringBufferSlotCount;
+  }
+
+  public long rollPeriodMs() {
+    return rollPeriodMs;
+  }
+
+  public String archivedWalSuffix() {
+    return archivedWalSuffix;
+  }
+
+  public String archivedHFileSuffix() {
+    return archivedHFileSuffix;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/store/LocalRegionUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/store/LocalRegionUtils.java
new file mode 100644
index 0000000..a538db6
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/store/LocalRegionUtils.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.store;
+
+import java.io.IOException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+final class LocalRegionUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LocalRegionUtils.class);
+
+  private LocalRegionUtils() {
+  }
+
+  static void moveFilesUnderDir(FileSystem fs, Path src, Path dst, String suffix)
+    throws IOException {
+    if (!fs.exists(dst) && !fs.mkdirs(dst)) {
+      LOG.warn("Failed to create dir {}", dst);
+      return;
+    }
+    FileStatus[] archivedWALFiles = fs.listStatus(src);
+    if (archivedWALFiles == null) {
+      return;
+    }
+    for (FileStatus status : archivedWALFiles) {
+      Path file = status.getPath();
+      Path newFile = new Path(dst, file.getName() + suffix);
+      if (fs.rename(file, newFile)) {
+        LOG.info("Moved {} to {}", file, newFile);
+      } else {
+        LOG.warn("Failed to move from {} to {}", file, newFile);
+      }
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreWALRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/store/LocalRegionWALRoller.java
similarity index 56%
rename from hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreWALRoller.java
rename to hbase-server/src/main/java/org/apache/hadoop/hbase/master/store/LocalRegionWALRoller.java
index d24924f..2880a38 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreWALRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/store/LocalRegionWALRoller.java
@@ -15,18 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.procedure2.store.region;
+package org.apache.hadoop.hbase.master.store;
 
 import static org.apache.hadoop.hbase.HConstants.HREGION_OLDLOGDIR_NAME;
 
 import java.io.IOException;
-import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
 import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
@@ -38,23 +35,19 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * As long as there is no RegionServerServices for the procedure store region, we need implement log
- * roller logic by our own.
+ * As long as there is no RegionServerServices for a local region, we need implement log roller
+ * logic by our own.
  * <p/>
  * We can reuse most of the code for normal wal roller, the only difference is that there is only
  * one region, so in {@link #scheduleFlush(String)} method we can just schedule flush for the
  * procedure store region.
  */
 @InterfaceAudience.Private
-final class RegionProcedureStoreWALRoller extends AbstractWALRoller<Abortable> {
+public final class LocalRegionWALRoller extends AbstractWALRoller<Abortable> {
 
-  private static final Logger LOG = LoggerFactory.getLogger(RegionProcedureStoreWALRoller.class);
+  private static final Logger LOG = LoggerFactory.getLogger(LocalRegionWALRoller.class);
 
-  static final String ROLL_PERIOD_MS_KEY = "hbase.procedure.store.region.walroll.period.ms";
-
-  private static final long DEFAULT_ROLL_PERIOD_MS = TimeUnit.MINUTES.toMillis(15);
-
-  private volatile RegionFlusherAndCompactor flusherAndCompactor;
+  private volatile LocalRegionFlusherAndCompactor flusherAndCompactor;
 
   private final FileSystem fs;
 
@@ -62,36 +55,22 @@ final class RegionProcedureStoreWALRoller extends AbstractWALRoller<Abortable> {
 
   private final Path globalWALArchiveDir;
 
-  private RegionProcedureStoreWALRoller(Configuration conf, Abortable abortable, FileSystem fs,
-    Path walRootDir, Path globalWALRootDir) {
-    super("RegionProcedureStoreWALRoller", conf, abortable);
+  private final String archivedWALSuffix;
+
+  private LocalRegionWALRoller(String name, Configuration conf, Abortable abortable, FileSystem fs,
+    Path walRootDir, Path globalWALRootDir, String archivedWALSuffix) {
+    super(name, conf, abortable);
     this.fs = fs;
     this.walArchiveDir = new Path(walRootDir, HREGION_OLDLOGDIR_NAME);
     this.globalWALArchiveDir = new Path(globalWALRootDir, HREGION_OLDLOGDIR_NAME);
+    this.archivedWALSuffix = archivedWALSuffix;
   }
 
   @Override
   protected void afterRoll(WAL wal) {
     // move the archived WAL files to the global archive path
     try {
-      if (!fs.exists(globalWALArchiveDir) && !fs.mkdirs(globalWALArchiveDir)) {
-        LOG.warn("Failed to create global archive dir {}", globalWALArchiveDir);
-        return;
-      }
-      FileStatus[] archivedWALFiles = fs.listStatus(walArchiveDir);
-      if (archivedWALFiles == null) {
-        return;
-      }
-      for (FileStatus status : archivedWALFiles) {
-        Path file = status.getPath();
-        Path newFile = new Path(globalWALArchiveDir,
-          file.getName() + MasterProcedureUtil.ARCHIVED_PROC_WAL_SUFFIX);
-        if (fs.rename(file, newFile)) {
-          LOG.info("Moved {} to {}", file, newFile);
-        } else {
-          LOG.warn("Failed to move archived wal from {} to global place {}", file, newFile);
-        }
-      }
+      LocalRegionUtils.moveFilesUnderDir(fs, walArchiveDir, globalWALArchiveDir, archivedWALSuffix);
     } catch (IOException e) {
       LOG.warn("Failed to move archived wals from {} to global dir {}", walArchiveDir,
         globalWALArchiveDir, e);
@@ -100,28 +79,29 @@ final class RegionProcedureStoreWALRoller extends AbstractWALRoller<Abortable> {
 
   @Override
   protected void scheduleFlush(String encodedRegionName) {
-    RegionFlusherAndCompactor flusher = this.flusherAndCompactor;
+    LocalRegionFlusherAndCompactor flusher = this.flusherAndCompactor;
     if (flusher != null) {
       flusher.requestFlush();
     }
   }
 
-  void setFlusherAndCompactor(RegionFlusherAndCompactor flusherAndCompactor) {
+  void setFlusherAndCompactor(LocalRegionFlusherAndCompactor flusherAndCompactor) {
     this.flusherAndCompactor = flusherAndCompactor;
   }
 
-  static RegionProcedureStoreWALRoller create(Configuration conf, Abortable abortable,
-    FileSystem fs, Path walRootDir, Path globalWALRootDir) {
+  static LocalRegionWALRoller create(String name, Configuration conf, Abortable abortable,
+    FileSystem fs, Path walRootDir, Path globalWALRootDir, String archivedWALSuffix,
+    long rollPeriodMs, long flushSize) {
     // we can not run with wal disabled, so force set it to true.
     conf.setBoolean(WALFactory.WAL_ENABLED, true);
     // we do not need this feature, so force disable it.
     conf.setBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, false);
-    conf.setLong(WAL_ROLL_PERIOD_KEY, conf.getLong(ROLL_PERIOD_MS_KEY, DEFAULT_ROLL_PERIOD_MS));
-    long flushSize = conf.getLong(RegionFlusherAndCompactor.FLUSH_SIZE_KEY,
-      RegionFlusherAndCompactor.DEFAULT_FLUSH_SIZE);
+    conf.setLong(WAL_ROLL_PERIOD_KEY, rollPeriodMs);
     // make the roll size the same with the flush size, as we only have one region here
     conf.setLong(WALUtil.WAL_BLOCK_SIZE, flushSize * 2);
     conf.setFloat(AbstractFSWAL.WAL_ROLL_MULTIPLIER, 0.5f);
-    return new RegionProcedureStoreWALRoller(conf, abortable, fs, walRootDir, globalWALRootDir);
+    return new LocalRegionWALRoller(name, conf, abortable, fs, walRootDir, globalWALRootDir,
+      archivedWALSuffix);
   }
+
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/store/LocalStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/store/LocalStore.java
new file mode 100644
index 0000000..d2c6884
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/store/LocalStore.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.store;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Used for storing data at master side. The data will be stored in a {@link LocalRegion}.
+ */
+@InterfaceAudience.Private
+public final class LocalStore {
+
+  // Use the character $ to let the log cleaner know that this is not the normal wal file.
+  public static final String ARCHIVED_WAL_SUFFIX = "$masterlocalwal$";
+
+  // this is a bit trick that in StoreFileInfo.validateStoreFileName, we just test if the file name
+  // contains '-' to determine if it is a valid store file, so here we have to add '-'in the file
+  // name to avoid being processed by normal TimeToLiveHFileCleaner.
+  public static final String ARCHIVED_HFILE_SUFFIX = "$-masterlocalhfile-$";
+
+  private static final String MAX_WALS_KEY = "hbase.master.store.region.maxwals";
+
+  private static final int DEFAULT_MAX_WALS = 10;
+
+  public static final String USE_HSYNC_KEY = "hbase.master.store.region.wal.hsync";
+
+  public static final String MASTER_STORE_DIR = "MasterData";
+
+  private static final String FLUSH_SIZE_KEY = "hbase.master.store.region.flush.size";
+
+  private static final long DEFAULT_FLUSH_SIZE = TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE;
+
+  private static final String FLUSH_PER_CHANGES_KEY = "hbase.master.store.region.flush.per.changes";
+
+  private static final long DEFAULT_FLUSH_PER_CHANGES = 1_000_000;
+
+  private static final String FLUSH_INTERVAL_MS_KEY = "hbase.master.store.region.flush.interval.ms";
+
+  // default to flush every 15 minutes, for safety
+  private static final long DEFAULT_FLUSH_INTERVAL_MS = TimeUnit.MINUTES.toMillis(15);
+
+  private static final String COMPACT_MIN_KEY = "hbase.master.store.region.compact.min";
+
+  private static final int DEFAULT_COMPACT_MIN = 4;
+
+  private static final String ROLL_PERIOD_MS_KEY = "hbase.master.store.region.walroll.period.ms";
+
+  private static final long DEFAULT_ROLL_PERIOD_MS = TimeUnit.MINUTES.toMillis(15);
+
+  private static final String RING_BUFFER_SLOT_COUNT = "hbase.master.store.ringbuffer.slot.count";
+
+  private static final int DEFAULT_RING_BUFFER_SLOT_COUNT = 128;
+
+  public static final TableName TABLE_NAME = TableName.valueOf("master:store");
+
+  public static final byte[] PROC_FAMILY = Bytes.toBytes("proc");
+
+  private static final TableDescriptor TABLE_DESC = TableDescriptorBuilder.newBuilder(TABLE_NAME)
+    .setColumnFamily(ColumnFamilyDescriptorBuilder.of(PROC_FAMILY)).build();
+
+  private final LocalRegion region;
+
+  private LocalStore(LocalRegion region) {
+    this.region = region;
+  }
+
+  public void update(UpdateLocalRegion action) throws IOException {
+    region.update(action);
+  }
+
+  public Result get(Get get) throws IOException {
+    return region.get(get);
+  }
+
+  public RegionScanner getScanner(Scan scan) throws IOException {
+    return region.getScanner(scan);
+  }
+
+  public void close(boolean abort) {
+    region.close(abort);
+  }
+
+  @VisibleForTesting
+  public FlushResult flush(boolean force) throws IOException {
+    return region.flush(force);
+  }
+
+  @VisibleForTesting
+  public void requestRollAll() {
+    region.requestRollAll();
+  }
+
+  @VisibleForTesting
+  public void waitUntilWalRollFinished() throws InterruptedException {
+    region.waitUntilWalRollFinished();
+  }
+
+  public static LocalStore create(Server server) throws IOException {
+    LocalRegionParams params = new LocalRegionParams().server(server)
+      .regionDirName(MASTER_STORE_DIR).tableDescriptor(TABLE_DESC);
+    Configuration conf = server.getConfiguration();
+    long flushSize = conf.getLong(FLUSH_SIZE_KEY, DEFAULT_FLUSH_SIZE);
+    long flushPerChanges = conf.getLong(FLUSH_PER_CHANGES_KEY, DEFAULT_FLUSH_PER_CHANGES);
+    long flushIntervalMs = conf.getLong(FLUSH_INTERVAL_MS_KEY, DEFAULT_FLUSH_INTERVAL_MS);
+    int compactMin = conf.getInt(COMPACT_MIN_KEY, DEFAULT_COMPACT_MIN);
+    params.flushSize(flushSize).flushPerChanges(flushPerChanges).flushIntervalMs(flushIntervalMs)
+      .compactMin(compactMin);
+    int maxWals = conf.getInt(MAX_WALS_KEY, DEFAULT_MAX_WALS);
+    params.maxWals(maxWals);
+    if (conf.get(USE_HSYNC_KEY) != null) {
+      params.useHsync(conf.getBoolean(USE_HSYNC_KEY, false));
+    }
+    params.ringBufferSlotCount(conf.getInt(RING_BUFFER_SLOT_COUNT, DEFAULT_RING_BUFFER_SLOT_COUNT));
+    long rollPeriodMs = conf.getLong(ROLL_PERIOD_MS_KEY, DEFAULT_ROLL_PERIOD_MS);
+    params.rollPeriodMs(rollPeriodMs).archivedWalSuffix(ARCHIVED_WAL_SUFFIX)
+      .archivedHFileSuffix(ARCHIVED_HFILE_SUFFIX);
+    LocalRegion region = LocalRegion.create(params);
+    return new LocalStore(region);
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/store/UpdateLocalRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/store/UpdateLocalRegion.java
new file mode 100644
index 0000000..bfd279c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/store/UpdateLocalRegion.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.store;
+
+import java.io.IOException;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+@FunctionalInterface
+public interface UpdateLocalRegion {
+
+  void update(HRegion region) throws IOException;
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/HFileProcedurePrettyPrinter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/HFileProcedurePrettyPrinter.java
index 110547f..4aac5da 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/HFileProcedurePrettyPrinter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/HFileProcedurePrettyPrinter.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.master.store.LocalStore;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
 import org.apache.hadoop.hbase.util.AbstractHBaseTool;
@@ -76,16 +77,15 @@ public class HFileProcedurePrettyPrinter extends AbstractHBaseTool {
     addOptWithArg("w", "seekToPid", "Seek to this procedure id and print this procedure only");
     OptionGroup files = new OptionGroup();
     files.addOption(new Option("f", "file", true,
-      "File to scan. Pass full-path; e.g. hdfs://a:9000/MasterProcs/master/procedure/p/xxx"));
+      "File to scan. Pass full-path; e.g. hdfs://a:9000/MasterProcs/master/local/proc/xxx"));
     files.addOption(new Option("a", "all", false, "Scan the whole procedure region."));
     files.setRequired(true);
     options.addOptionGroup(files);
   }
 
   private void addAllHFiles() throws IOException {
-    Path masterProcDir =
-      new Path(CommonFSUtils.getWALRootDir(conf), RegionProcedureStore.MASTER_PROCEDURE_DIR);
-    Path tableDir = CommonFSUtils.getTableDir(masterProcDir, RegionProcedureStore.TABLE_NAME);
+    Path masterProcDir = new Path(CommonFSUtils.getRootDir(conf), LocalStore.MASTER_STORE_DIR);
+    Path tableDir = CommonFSUtils.getTableDir(masterProcDir, LocalStore.TABLE_NAME);
     FileSystem fs = tableDir.getFileSystem(conf);
     Path regionDir =
       fs.listStatus(tableDir, p -> RegionInfo.isEncodedRegionName(Bytes.toBytes(p.getName())))[0]
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java
index 3cdd817..45f411b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java
@@ -18,8 +18,8 @@
 package org.apache.hadoop.hbase.procedure2.store.region;
 
 import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY;
-import static org.apache.hadoop.hbase.HConstants.HREGION_LOGDIR_NAME;
 import static org.apache.hadoop.hbase.HConstants.NO_NONCE;
+import static org.apache.hadoop.hbase.master.store.LocalStore.PROC_FAMILY;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
@@ -30,86 +30,48 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-
 import org.apache.commons.lang3.mutable.MutableLong;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.ipc.RpcCall;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
-import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.assignment.AssignProcedure;
 import org.apache.hadoop.hbase.master.assignment.MoveRegionProcedure;
 import org.apache.hadoop.hbase.master.assignment.UnassignProcedure;
-import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
-import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.master.procedure.RecoverMetaProcedure;
 import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
+import org.apache.hadoop.hbase.master.store.LocalStore;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
 import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureTree;
 import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
-import org.apache.hadoop.hbase.util.HFileArchiveUtil;
-import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
-import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
-import org.apache.hbase.thirdparty.com.google.common.math.IntMath;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
 
 /**
- * A procedure store which uses a region to store all the procedures.
+ * A procedure store which uses the master local store to store all the procedures.
  * <p/>
- * FileSystem layout:
- *
- * <pre>
- * hbase
- *   |
- *   --MasterProcs
- *       |
- *       --data
- *       |  |
- *       |  --/master/procedure/&lt;encoded-region-name&gt; <---- The region data
- *       |      |
- *       |      --replay <---- The edits to replay
- *       |
- *       --WALs
- *          |
- *          --&lt;master-server-name&gt; <---- The WAL dir for active master
- *          |
- *          --&lt;master-server-name&gt;-dead <---- The WAL dir dead master
- * </pre>
- *
- * We use p:d column to store the serialized protobuf format procedure, and when deleting we will
+ * We use proc:d column to store the serialized protobuf format procedure, and when deleting we will
  * first fill the info:proc column with an empty byte array, and then actually delete them in the
  * {@link #cleanup()} method. This is because that we need to retain the max procedure id, so we can
  * not directly delete a procedure row as we do not know if it is the one with the max procedure id.
@@ -119,58 +81,20 @@ public class RegionProcedureStore extends ProcedureStoreBase {
 
   private static final Logger LOG = LoggerFactory.getLogger(RegionProcedureStore.class);
 
-  static final String MAX_WALS_KEY = "hbase.procedure.store.region.maxwals";
-
-  private static final int DEFAULT_MAX_WALS = 10;
-
-  static final String USE_HSYNC_KEY = "hbase.procedure.store.region.wal.hsync";
-
-  static final String MASTER_PROCEDURE_DIR = "MasterProcs";
-
-  static final String HFILECLEANER_PLUGINS = "hbase.procedure.store.region.hfilecleaner.plugins";
-
-  private static final String REPLAY_EDITS_DIR = "recovered.wals";
-
-  private static final String DEAD_WAL_DIR_SUFFIX = "-dead";
-
-  static final TableName TABLE_NAME = TableName.valueOf("master:procedure");
-
-  static final byte[] FAMILY = Bytes.toBytes("p");
-
   static final byte[] PROC_QUALIFIER = Bytes.toBytes("d");
 
-  private static final int REGION_ID = 1;
-
-  private static final TableDescriptor TABLE_DESC = TableDescriptorBuilder.newBuilder(TABLE_NAME)
-    .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build();
-
   private final Server server;
 
-  private final DirScanPool cleanerPool;
-
   private final LeaseRecovery leaseRecovery;
 
-  // Used to delete the compacted hfiles. Since we put all data on WAL filesystem, it is not
-  // possible to move the compacted hfiles to the global hfile archive directory, we have to do it
-  // by ourselves.
-  private HFileCleaner cleaner;
-
-  private WALFactory walFactory;
-
-  @VisibleForTesting
-  HRegion region;
-
-  @VisibleForTesting
-  RegionFlusherAndCompactor flusherAndCompactor;
-
   @VisibleForTesting
-  RegionProcedureStoreWALRoller walRoller;
+  final LocalStore localStore;
 
   private int numThreads;
 
-  public RegionProcedureStore(Server server, DirScanPool cleanerPool, LeaseRecovery leaseRecovery) {
+  public RegionProcedureStore(Server server, LocalStore localStore, LeaseRecovery leaseRecovery) {
     this.server = server;
-    this.cleanerPool = cleanerPool;
+    this.localStore = localStore;
     this.leaseRecovery = leaseRecovery;
   }
 
@@ -183,52 +107,12 @@ public class RegionProcedureStore extends ProcedureStoreBase {
     this.numThreads = numThreads;
   }
 
-  private void shutdownWAL() {
-    if (walFactory != null) {
-      try {
-        walFactory.shutdown();
-      } catch (IOException e) {
-        LOG.warn("Failed to shutdown WAL", e);
-      }
-    }
-  }
-
-  private void closeRegion(boolean abort) {
-    if (region != null) {
-      try {
-        region.close(abort);
-      } catch (IOException e) {
-        LOG.warn("Failed to close region", e);
-      }
-    }
-
-  }
-
   @Override
   public void stop(boolean abort) {
     if (!setRunning(false)) {
       return;
     }
     LOG.info("Stopping the Region Procedure Store, isAbort={}", abort);
-    if (cleaner != null) {
-      cleaner.cancel(abort);
-    }
-    if (flusherAndCompactor != null) {
-      flusherAndCompactor.close();
-    }
-    // if abort, we shutdown wal first to fail the ongoing updates to the region, and then close the
-    // region, otherwise there will be dead lock.
-    if (abort) {
-      shutdownWAL();
-      closeRegion(true);
-    } else {
-      closeRegion(false);
-      shutdownWAL();
-    }
-
-    if (walRoller != null) {
-      walRoller.close();
-    }
   }
 
   @Override
@@ -242,91 +126,6 @@ public class RegionProcedureStore extends ProcedureStoreBase {
     return count;
   }
 
-  private WAL createWAL(FileSystem fs, Path rootDir, RegionInfo regionInfo) throws IOException {
-    String logName = AbstractFSWALProvider.getWALDirectoryName(server.getServerName().toString());
-    Path walDir = new Path(rootDir, logName);
-    LOG.debug("WALDir={}", walDir);
-    if (fs.exists(walDir)) {
-      throw new HBaseIOException(
-        "Master procedure store has already created directory at " + walDir);
-    }
-    if (!fs.mkdirs(walDir)) {
-      throw new IOException("Can not create master procedure wal directory " + walDir);
-    }
-    WAL wal = walFactory.getWAL(regionInfo);
-    walRoller.addWAL(wal);
-    return wal;
-  }
-
-  private HRegion bootstrap(Configuration conf, FileSystem fs, Path rootDir) throws IOException {
-    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(TABLE_NAME).setRegionId(REGION_ID).build();
-    Path tmpTableDir = CommonFSUtils.getTableDir(rootDir, TableName
-      .valueOf(TABLE_NAME.getNamespaceAsString(), TABLE_NAME.getQualifierAsString() + "-tmp"));
-    if (fs.exists(tmpTableDir) && !fs.delete(tmpTableDir, true)) {
-      throw new IOException("Can not delete partial created proc region " + tmpTableDir);
-    }
-    HRegion.createHRegion(conf, regionInfo, fs, tmpTableDir, TABLE_DESC).close();
-    Path tableDir = CommonFSUtils.getTableDir(rootDir, TABLE_NAME);
-    if (!fs.rename(tmpTableDir, tableDir)) {
-      throw new IOException("Can not rename " + tmpTableDir + " to " + tableDir);
-    }
-    WAL wal = createWAL(fs, rootDir, regionInfo);
-    return HRegion.openHRegionFromTableDir(conf, fs, tableDir, regionInfo, TABLE_DESC, wal, null,
-      null);
-  }
-
-  private HRegion open(Configuration conf, FileSystem fs, Path rootDir) throws IOException {
-    String factoryId = server.getServerName().toString();
-    Path tableDir = CommonFSUtils.getTableDir(rootDir, TABLE_NAME);
-    Path regionDir =
-      fs.listStatus(tableDir, p -> RegionInfo.isEncodedRegionName(Bytes.toBytes(p.getName())))[0]
-        .getPath();
-    Path replayEditsDir = new Path(regionDir, REPLAY_EDITS_DIR);
-    if (!fs.exists(replayEditsDir) && !fs.mkdirs(replayEditsDir)) {
-      throw new IOException("Failed to create replay directory: " + replayEditsDir);
-    }
-    Path walsDir = new Path(rootDir, HREGION_LOGDIR_NAME);
-    for (FileStatus walDir : fs.listStatus(walsDir)) {
-      if (!walDir.isDirectory()) {
-        continue;
-      }
-      if (walDir.getPath().getName().startsWith(factoryId)) {
-        LOG.warn("This should not happen in real production as we have not created our WAL " +
-          "directory yet, ignore if you are running a procedure related UT");
-      }
-      Path deadWALDir;
-      if (!walDir.getPath().getName().endsWith(DEAD_WAL_DIR_SUFFIX)) {
-        deadWALDir =
-          new Path(walDir.getPath().getParent(), walDir.getPath().getName() + DEAD_WAL_DIR_SUFFIX);
-        if (!fs.rename(walDir.getPath(), deadWALDir)) {
-          throw new IOException("Can not rename " + walDir + " to " + deadWALDir +
-            " when recovering lease of proc store");
-        }
-        LOG.info("Renamed {} to {} as it is dead", walDir.getPath(), deadWALDir);
-      } else {
-        deadWALDir = walDir.getPath();
-        LOG.info("{} is already marked as dead", deadWALDir);
-      }
-      for (FileStatus walFile : fs.listStatus(deadWALDir)) {
-        Path replayEditsFile = new Path(replayEditsDir, walFile.getPath().getName());
-        leaseRecovery.recoverFileLease(fs, walFile.getPath());
-        if (!fs.rename(walFile.getPath(), replayEditsFile)) {
-          throw new IOException("Can not rename " + walFile.getPath() + " to " + replayEditsFile +
-            " when recovering lease of proc store");
-        }
-        LOG.info("Renamed {} to {}", walFile.getPath(), replayEditsFile);
-      }
-      LOG.info("Delete empty proc wal dir {}", deadWALDir);
-      fs.delete(deadWALDir, true);
-    }
-    RegionInfo regionInfo = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
-    WAL wal = createWAL(fs, rootDir, regionInfo);
-    conf.set(HRegion.SPECIAL_RECOVERED_EDITS_DIR,
-      replayEditsDir.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString());
-    return HRegion.openHRegionFromTableDir(conf, fs, tableDir, regionInfo, TABLE_DESC, wal, null,
-      null);
-  }
-
   @SuppressWarnings("deprecation")
   private static final ImmutableSet<Class<?>> UNSUPPORTED_PROCEDURES =
     ImmutableSet.of(RecoverMetaProcedure.class, AssignProcedure.class, UnassignProcedure.class,
@@ -437,8 +236,8 @@ public class RegionProcedureStore extends ProcedureStoreBase {
     if (maxProcIdSet.longValue() > maxProcIdFromProcs.longValue()) {
       if (maxProcIdSet.longValue() > 0) {
         // let's add a fake row to retain the max proc id
-        region.put(new Put(Bytes.toBytes(maxProcIdSet.longValue())).addColumn(FAMILY,
-          PROC_QUALIFIER, EMPTY_BYTE_ARRAY));
+        localStore.update(r -> r.put(new Put(Bytes.toBytes(maxProcIdSet.longValue()))
+          .addColumn(PROC_FAMILY, PROC_QUALIFIER, EMPTY_BYTE_ARRAY)));
       }
     } else if (maxProcIdSet.longValue() < maxProcIdFromProcs.longValue()) {
       LOG.warn("The WALProcedureStore max pid is less than the max pid of all loaded procedures");
@@ -453,46 +252,8 @@ public class RegionProcedureStore extends ProcedureStoreBase {
 
   @Override
   public void recoverLease() throws IOException {
-    LOG.debug("Starting Region Procedure Store lease recovery...");
-    Configuration baseConf = server.getConfiguration();
-    FileSystem fs = CommonFSUtils.getWALFileSystem(baseConf);
-    Path globalWALRootDir = CommonFSUtils.getWALRootDir(baseConf);
-    Path rootDir = new Path(globalWALRootDir, MASTER_PROCEDURE_DIR);
-    // we will override some configurations so create a new one.
-    Configuration conf = new Configuration(baseConf);
-    CommonFSUtils.setRootDir(conf, rootDir);
-    CommonFSUtils.setWALRootDir(conf, rootDir);
-    RegionFlusherAndCompactor.setupConf(conf);
-    conf.setInt(AbstractFSWAL.MAX_LOGS, conf.getInt(MAX_WALS_KEY, DEFAULT_MAX_WALS));
-    if (conf.get(USE_HSYNC_KEY) != null) {
-      conf.set(HRegion.WAL_HSYNC_CONF_KEY, conf.get(USE_HSYNC_KEY));
-    }
-    conf.setInt(AbstractFSWAL.RING_BUFFER_SLOT_COUNT, IntMath.ceilingPowerOfTwo(16 * numThreads));
-
-    walRoller = RegionProcedureStoreWALRoller.create(conf, server, fs, rootDir, globalWALRootDir);
-    walRoller.start();
-
-    walFactory = new WALFactory(conf, server.getServerName().toString());
-    Path tableDir = CommonFSUtils.getTableDir(rootDir, TABLE_NAME);
-    if (fs.exists(tableDir)) {
-      // load the existing region.
-      region = open(conf, fs, rootDir);
-    } else {
-      // bootstrapping...
-      region = bootstrap(conf, fs, rootDir);
-    }
-    flusherAndCompactor = new RegionFlusherAndCompactor(conf, server, region);
-    walRoller.setFlusherAndCompactor(flusherAndCompactor);
-    int cleanerInterval = conf.getInt(HMaster.HBASE_MASTER_CLEANER_INTERVAL,
-      HMaster.DEFAULT_HBASE_MASTER_CLEANER_INTERVAL);
-    Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
-    if (!fs.mkdirs(archiveDir)) {
-      LOG.warn("Failed to create archive directory {}. Usually this should not happen but it will" +
-        " be created again when we actually archive the hfiles later, so continue", archiveDir);
-    }
-    cleaner = new HFileCleaner("RegionProcedureStoreHFileCleaner", cleanerInterval, server, conf,
-      fs, archiveDir, HFILECLEANER_PLUGINS, cleanerPool, Collections.emptyMap());
-    server.getChoreService().scheduleChore(cleaner);
+    LOG.info("Starting Region Procedure Store lease recovery...");
+    FileSystem fs = CommonFSUtils.getWALFileSystem(server.getConfiguration());
     tryMigrate(fs);
   }
 
@@ -501,7 +262,8 @@ public class RegionProcedureStore extends ProcedureStoreBase {
     List<ProcedureProtos.Procedure> procs = new ArrayList<>();
     long maxProcId = 0;
 
-    try (RegionScanner scanner = region.getScanner(new Scan().addColumn(FAMILY, PROC_QUALIFIER))) {
+    try (RegionScanner scanner =
+      localStore.getScanner(new Scan().addColumn(PROC_FAMILY, PROC_QUALIFIER))) {
       List<Cell> cells = new ArrayList<>();
       boolean moreRows;
       do {
@@ -530,7 +292,7 @@ public class RegionProcedureStore extends ProcedureStoreBase {
     throws IOException {
     ProcedureProtos.Procedure proto = ProcedureUtil.convertToProtoProcedure(proc);
     byte[] row = Bytes.toBytes(proc.getProcId());
-    mutations.add(new Put(row).addColumn(FAMILY, PROC_QUALIFIER, proto.toByteArray()));
+    mutations.add(new Put(row).addColumn(PROC_FAMILY, PROC_QUALIFIER, proto.toByteArray()));
     rowsToLock.add(row);
   }
 
@@ -538,7 +300,7 @@ public class RegionProcedureStore extends ProcedureStoreBase {
   // the proc column with an empty array.
   private void serializeDelete(long procId, List<Mutation> mutations, List<byte[]> rowsToLock) {
     byte[] row = Bytes.toBytes(procId);
-    mutations.add(new Put(row).addColumn(FAMILY, PROC_QUALIFIER, EMPTY_BYTE_ARRAY));
+    mutations.add(new Put(row).addColumn(PROC_FAMILY, PROC_QUALIFIER, EMPTY_BYTE_ARRAY));
     rowsToLock.add(row);
   }
 
@@ -571,14 +333,13 @@ public class RegionProcedureStore extends ProcedureStoreBase {
         for (Procedure<?> subProc : subProcs) {
           serializePut(subProc, mutations, rowsToLock);
         }
-        region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
+        localStore.update(r -> r.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE));
       } catch (IOException e) {
         LOG.error(HBaseMarkers.FATAL, "Failed to insert proc {}, sub procs {}", proc,
           Arrays.toString(subProcs), e);
         throw new UncheckedIOException(e);
       }
     });
-    flusherAndCompactor.onUpdate();
   }
 
   @Override
@@ -590,13 +351,12 @@ public class RegionProcedureStore extends ProcedureStoreBase {
         for (Procedure<?> proc : procs) {
           serializePut(proc, mutations, rowsToLock);
         }
-        region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
+        localStore.update(r -> r.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE));
       } catch (IOException e) {
         LOG.error(HBaseMarkers.FATAL, "Failed to insert procs {}", Arrays.toString(procs), e);
         throw new UncheckedIOException(e);
       }
     });
-    flusherAndCompactor.onUpdate();
   }
 
   @Override
@@ -604,26 +364,24 @@ public class RegionProcedureStore extends ProcedureStoreBase {
     runWithoutRpcCall(() -> {
       try {
         ProcedureProtos.Procedure proto = ProcedureUtil.convertToProtoProcedure(proc);
-        region.put(new Put(Bytes.toBytes(proc.getProcId())).addColumn(FAMILY, PROC_QUALIFIER,
-          proto.toByteArray()));
+        localStore.update(r -> r.put(new Put(Bytes.toBytes(proc.getProcId())).addColumn(PROC_FAMILY,
+          PROC_QUALIFIER, proto.toByteArray())));
       } catch (IOException e) {
         LOG.error(HBaseMarkers.FATAL, "Failed to update proc {}", proc, e);
         throw new UncheckedIOException(e);
       }
     });
-    flusherAndCompactor.onUpdate();
   }
 
   @Override
   public void delete(long procId) {
     try {
-      region
-        .put(new Put(Bytes.toBytes(procId)).addColumn(FAMILY, PROC_QUALIFIER, EMPTY_BYTE_ARRAY));
+      localStore.update(r -> r.put(
+        new Put(Bytes.toBytes(procId)).addColumn(PROC_FAMILY, PROC_QUALIFIER, EMPTY_BYTE_ARRAY)));
     } catch (IOException e) {
       LOG.error(HBaseMarkers.FATAL, "Failed to delete pid={}", procId, e);
       throw new UncheckedIOException(e);
     }
-    flusherAndCompactor.onUpdate();
   }
 
   @Override
@@ -635,13 +393,12 @@ public class RegionProcedureStore extends ProcedureStoreBase {
       for (long subProcId : subProcIds) {
         serializeDelete(subProcId, mutations, rowsToLock);
       }
-      region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
+      localStore.update(r -> r.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE));
     } catch (IOException e) {
       LOG.error(HBaseMarkers.FATAL, "Failed to delete parent proc {}, sub pids={}", parentProc,
         Arrays.toString(subProcIds), e);
       throw new UncheckedIOException(e);
     }
-    flusherAndCompactor.onUpdate();
   }
 
   @Override
@@ -660,12 +417,11 @@ public class RegionProcedureStore extends ProcedureStoreBase {
       serializeDelete(procId, mutations, rowsToLock);
     }
     try {
-      region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
+      localStore.update(r -> r.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE));
     } catch (IOException e) {
       LOG.error(HBaseMarkers.FATAL, "Failed to delete pids={}", Arrays.toString(procIds), e);
       throw new UncheckedIOException(e);
     }
-    flusherAndCompactor.onUpdate();
   }
 
   @Override
@@ -673,7 +429,7 @@ public class RegionProcedureStore extends ProcedureStoreBase {
     // actually delete the procedures if it is not the one with the max procedure id.
     List<Cell> cells = new ArrayList<Cell>();
     try (RegionScanner scanner =
-      region.getScanner(new Scan().addColumn(FAMILY, PROC_QUALIFIER).setReversed(true))) {
+      localStore.getScanner(new Scan().addColumn(PROC_FAMILY, PROC_QUALIFIER).setReversed(true))) {
       // skip the row with max procedure id
       boolean moreRows = scanner.next(cells);
       if (cells.isEmpty()) {
@@ -688,7 +444,8 @@ public class RegionProcedureStore extends ProcedureStoreBase {
         Cell cell = cells.get(0);
         cells.clear();
         if (cell.getValueLength() == 0) {
-          region.delete(new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
+          localStore.update(r -> r
+            .delete(new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())));
         }
       }
     } catch (IOException e) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/WALProcedurePrettyPrinter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/WALProcedurePrettyPrinter.java
index 1e5c142..b36e1b5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/WALProcedurePrettyPrinter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/WALProcedurePrettyPrinter.java
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.hbase.procedure2.store.region;
 
-import static org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore.FAMILY;
+import static org.apache.hadoop.hbase.master.store.LocalStore.PROC_FAMILY;
 
 import java.io.PrintStream;
 import java.time.Instant;
@@ -102,8 +102,8 @@ public class WALProcedurePrettyPrinter extends AbstractHBaseTool {
           String.format(KEY_TMPL, sequenceId, FORMATTER.format(Instant.ofEpochMilli(writeTime))));
         for (Cell cell : edit.getCells()) {
           Map<String, Object> op = WALPrettyPrinter.toStringMap(cell);
-          if (!Bytes.equals(FAMILY, 0, FAMILY.length, cell.getFamilyArray(), cell.getFamilyOffset(),
-            cell.getFamilyLength())) {
+          if (!Bytes.equals(PROC_FAMILY, 0, PROC_FAMILY.length, cell.getFamilyArray(),
+            cell.getFamilyOffset(), cell.getFamilyLength())) {
             // We could have cells other than procedure edits, for example, a flush marker
             WALPrettyPrinter.printCell(out, op, false);
             continue;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java
index 684d90d..450dd6f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java
@@ -42,12 +42,11 @@ public final class HFileArchiveUtil {
    * @param tableName table name under which the store currently lives
    * @param regionName region encoded name under which the store currently lives
    * @param familyName name of the family in the store
-   * @return {@link Path} to the directory to archive the given store or
-   *         <tt>null</tt> if it should not be archived
+   * @return {@link Path} to the directory to archive the given store or <tt>null</tt> if it should
+   *         not be archived
    */
-  public static Path getStoreArchivePath(final Configuration conf,
-                                         final TableName tableName,
-      final String regionName, final String familyName) throws IOException {
+  public static Path getStoreArchivePath(final Configuration conf, final TableName tableName,
+    final String regionName, final String familyName) throws IOException {
     Path tableArchiveDir = getTableArchivePath(conf, tableName);
     return HStore.getStoreHomedir(tableArchiveDir, regionName, Bytes.toBytes(familyName));
   }
@@ -61,10 +60,8 @@ public final class HFileArchiveUtil {
    * @return {@link Path} to the directory to archive the given store or <tt>null</tt> if it should
    *         not be archived
    */
-  public static Path getStoreArchivePath(Configuration conf,
-                                         RegionInfo region,
-                                         Path tabledir,
-      byte[] family) throws IOException {
+  public static Path getStoreArchivePath(Configuration conf, RegionInfo region, Path tabledir,
+    byte[] family) throws IOException {
     return getStoreArchivePath(conf, region, family);
   }
 
@@ -84,22 +81,27 @@ public final class HFileArchiveUtil {
   }
 
   /**
-   * Gets the archive directory under specified root dir. One scenario where this is useful is
-   * when WAL and root dir are configured under different file systems,
-   * i.e. root dir on S3 and WALs on HDFS.
-   * This is mostly useful for archiving recovered edits, when
+   * Gets the archive directory under specified root dir. One scenario where this is useful is when
+   * WAL and root dir are configured under different file systems, i.e. root dir on S3 and WALs on
+   * HDFS. This is mostly useful for archiving recovered edits, when
    * <b>hbase.region.archive.recovered.edits</b> is enabled.
    * @param rootDir {@link Path} the root dir under which archive path should be created.
    * @param region parent region information under which the store currently lives
    * @param family name of the family in the store
-   * @return {@link Path} to the WAL FS directory to archive the given store
-   *         or <tt>null</tt> if it should not be archived
+   * @return {@link Path} to the WAL FS directory to archive the given store or <tt>null</tt> if it
+   *         should not be archived
    */
   public static Path getStoreArchivePathForRootDir(Path rootDir, RegionInfo region, byte[] family) {
     Path tableArchiveDir = getTableArchivePath(rootDir, region.getTable());
     return HStore.getStoreHomedir(tableArchiveDir, region, family);
   }
 
+  public static Path getStoreArchivePathForArchivePath(Path archivePath, RegionInfo region,
+    byte[] family) {
+    Path tableArchiveDir = CommonFSUtils.getTableDir(archivePath, region.getTable());
+    return HStore.getStoreHomedir(tableArchiveDir, region, family);
+  }
+
   /**
    * Get the archive directory for a given region under the specified table
    * @param tableName the table name. Cannot be null.
@@ -107,9 +109,7 @@ public final class HFileArchiveUtil {
    * @return {@link Path} to the directory to archive the given region, or <tt>null</tt> if it
    *         should not be archived
    */
-  public static Path getRegionArchiveDir(Path rootDir,
-                                         TableName tableName,
-                                         Path regiondir) {
+  public static Path getRegionArchiveDir(Path rootDir, TableName tableName, Path regiondir) {
     // get the archive directory for a table
     Path archiveDir = getTableArchivePath(rootDir, tableName);
 
@@ -126,8 +126,8 @@ public final class HFileArchiveUtil {
    * @return {@link Path} to the directory to archive the given region, or <tt>null</tt> if it
    *         should not be archived
    */
-  public static Path getRegionArchiveDir(Path rootDir,
-                                         TableName tableName, String encodedRegionName) {
+  public static Path getRegionArchiveDir(Path rootDir, TableName tableName,
+    String encodedRegionName) {
     // get the archive directory for a table
     Path archiveDir = getTableArchivePath(rootDir, tableName);
     return HRegion.getRegionDir(archiveDir, encodedRegionName);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/store/LocalRegionTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/store/LocalRegionTestBase.java
new file mode 100644
index 0000000..70c69a8
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/store/LocalRegionTestBase.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.store;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
+import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.junit.After;
+import org.junit.Before;
+
+public class LocalRegionTestBase {
+
+  protected HBaseCommonTestingUtility htu;
+
+  protected LocalRegion region;
+
+  protected ChoreService choreService;
+
+  protected DirScanPool cleanerPool;
+
+  protected static byte[] CF1 = Bytes.toBytes("f1");
+
+  protected static byte[] CF2 = Bytes.toBytes("f2");
+
+  protected static byte[] QUALIFIER = Bytes.toBytes("q");
+
+  protected static String REGION_DIR_NAME = "local";
+
+  protected static TableDescriptor TD =
+    TableDescriptorBuilder.newBuilder(TableName.valueOf("test:local"))
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF1))
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF2)).build();
+
+  protected void configure(Configuration conf) throws IOException {
+  }
+
+  protected void configure(LocalRegionParams params) {
+  }
+
+  protected void postSetUp() throws IOException {
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    htu = new HBaseCommonTestingUtility();
+    htu.getConfiguration().setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
+    // Runs on local filesystem. Test does not need sync. Turn off checks.
+    htu.getConfiguration().setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
+    configure(htu.getConfiguration());
+    choreService = new ChoreService(getClass().getSimpleName());
+    cleanerPool = new DirScanPool(htu.getConfiguration());
+    Server server = mock(Server.class);
+    when(server.getConfiguration()).thenReturn(htu.getConfiguration());
+    when(server.getServerName())
+      .thenReturn(ServerName.valueOf("localhost", 12345, System.currentTimeMillis()));
+    when(server.getChoreService()).thenReturn(choreService);
+    Path testDir = htu.getDataTestDir();
+    CommonFSUtils.setRootDir(htu.getConfiguration(), testDir);
+    LocalRegionParams params = new LocalRegionParams();
+    params.server(server).regionDirName(REGION_DIR_NAME).tableDescriptor(TD)
+      .flushSize(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE).flushPerChanges(1_000_000)
+      .flushIntervalMs(TimeUnit.MINUTES.toMillis(15)).compactMin(4).maxWals(32).useHsync(false)
+      .ringBufferSlotCount(16).rollPeriodMs(TimeUnit.MINUTES.toMillis(15))
+      .archivedWalSuffix(LocalStore.ARCHIVED_WAL_SUFFIX)
+      .archivedHFileSuffix(LocalStore.ARCHIVED_HFILE_SUFFIX);
+    configure(params);
+    region = LocalRegion.create(params);
+    postSetUp();
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    region.close(true);
+    cleanerPool.shutdownNow();
+    choreService.shutdown();
+    htu.cleanupTestDir();
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/store/TestLocalRegionCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/store/TestLocalRegionCompaction.java
new file mode 100644
index 0000000..d1b4058
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/store/TestLocalRegionCompaction.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.store;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
+import org.apache.hadoop.hbase.master.cleaner.TimeToLiveMasterLocalStoreHFileCleaner;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MasterTests.class, MediumTests.class })
+public class TestLocalRegionCompaction extends LocalRegionTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestLocalRegionCompaction.class);
+
+  private int compactMin = 4;
+
+  private HFileCleaner hfileCleaner;
+
+  @Override
+  protected void configure(LocalRegionParams params) {
+    params.compactMin(compactMin);
+  }
+
+  @Override
+  protected void postSetUp() throws IOException {
+    Configuration conf = htu.getConfiguration();
+    conf.setLong(TimeToLiveMasterLocalStoreHFileCleaner.TTL_CONF_KEY, 5000);
+    Path testDir = htu.getDataTestDir();
+    FileSystem fs = testDir.getFileSystem(conf);
+    Path globalArchivePath = HFileArchiveUtil.getArchivePath(conf);
+    hfileCleaner = new HFileCleaner(500, new Stoppable() {
+
+      private volatile boolean stopped = false;
+
+      @Override
+      public void stop(String why) {
+        stopped = true;
+      }
+
+      @Override
+      public boolean isStopped() {
+        return stopped;
+      }
+    }, conf, fs, globalArchivePath, cleanerPool);
+    choreService.scheduleChore(hfileCleaner);
+  }
+
+  private int getStorefilesCount() {
+    return region.region.getStores().stream().mapToInt(Store::getStorefilesCount).sum();
+  }
+
+  private void assertFileCount(FileSystem fs, Path storeArchiveDir, int expected)
+    throws IOException {
+    FileStatus[] compactedHFiles = fs.listStatus(storeArchiveDir);
+    assertEquals(expected, compactedHFiles.length);
+  }
+
+  @Test
+  public void test() throws IOException, InterruptedException {
+    for (int i = 0; i < compactMin - 1; i++) {
+      final int index = i;
+      region.update(
+        r -> r.put(new Put(Bytes.toBytes(index)).addColumn(CF1, QUALIFIER, Bytes.toBytes(index))
+          .addColumn(CF2, QUALIFIER, Bytes.toBytes(index))));
+      region.flush(true);
+    }
+    assertEquals(2 * (compactMin - 1), getStorefilesCount());
+    region.update(r -> r.put(new Put(Bytes.toBytes(compactMin - 1)).addColumn(CF1, QUALIFIER,
+      Bytes.toBytes(compactMin - 1))));
+    region.flusherAndCompactor.requestFlush();
+    htu.waitFor(15000, () -> getStorefilesCount() == 2);
+    Path store1ArchiveDir = HFileArchiveUtil.getStoreArchivePathForRootDir(htu.getDataTestDir(),
+      region.region.getRegionInfo(), CF1);
+    Path store2ArchiveDir = HFileArchiveUtil.getStoreArchivePathForRootDir(htu.getDataTestDir(),
+      region.region.getRegionInfo(), CF2);
+    FileSystem fs = store1ArchiveDir.getFileSystem(htu.getConfiguration());
+    // after compaction, the old hfiles should have been compacted
+    htu.waitFor(15000, () -> {
+      try {
+        FileStatus[] fses1 = fs.listStatus(store1ArchiveDir);
+        FileStatus[] fses2 = fs.listStatus(store2ArchiveDir);
+        return fses1 != null && fses1.length == compactMin && fses2 != null &&
+          fses2.length == compactMin - 1;
+      } catch (FileNotFoundException e) {
+        return false;
+      }
+    });
+    // ttl has not expired, so should not delete any files
+    Thread.sleep(1000);
+    FileStatus[] compactedHFiles = fs.listStatus(store1ArchiveDir);
+    assertEquals(compactMin, compactedHFiles.length);
+    assertFileCount(fs, store2ArchiveDir, compactMin - 1);
+    Thread.sleep(2000);
+    // touch one file
+
+    long currentTime = System.currentTimeMillis();
+    fs.setTimes(compactedHFiles[0].getPath(), currentTime, currentTime);
+    Thread.sleep(3000);
+    // only the touched file is still there after clean up
+    FileStatus[] remainingHFiles = fs.listStatus(store1ArchiveDir);
+    assertEquals(1, remainingHFiles.length);
+    assertEquals(compactedHFiles[0].getPath(), remainingHFiles[0].getPath());
+    assertFalse(fs.exists(store2ArchiveDir));
+    Thread.sleep(6000);
+    // the touched file should also be cleaned up and then the cleaner will delete the parent
+    // directory since it is empty.
+    assertFalse(fs.exists(store1ArchiveDir));
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/store/TestLocalRegionFlush.java
similarity index 82%
rename from hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreFlush.java
rename to hbase-server/src/test/java/org/apache/hadoop/hbase/master/store/TestLocalRegionFlush.java
index 288696e..48bbba3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreFlush.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/store/TestLocalRegionFlush.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.procedure2.store.region;
+package org.apache.hadoop.hbase.master.store;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -25,13 +25,17 @@ import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
@@ -43,17 +47,17 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 @Category({ MasterTests.class, MediumTests.class })
-public class TestRegionProcedureStoreFlush {
+public class TestLocalRegionFlush {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-    HBaseClassTestRule.forClass(TestRegionProcedureStoreFlush.class);
+    HBaseClassTestRule.forClass(TestLocalRegionFlush.class);
 
   private Configuration conf;
 
   private HRegion region;
 
-  private RegionFlusherAndCompactor flusher;
+  private LocalRegionFlusherAndCompactor flusher;
 
   private AtomicInteger flushCalled;
 
@@ -68,6 +72,8 @@ public class TestRegionProcedureStoreFlush {
     HStore store = mock(HStore.class);
     when(store.getStorefilesCount()).thenReturn(1);
     when(region.getStores()).thenReturn(Collections.singletonList(store));
+    when(region.getRegionInfo())
+      .thenReturn(RegionInfoBuilder.newBuilder(TableName.valueOf("hbase:local")).build());
     flushCalled = new AtomicInteger(0);
     memstoreHeapSize = new AtomicLong(0);
     memstoreOffHeapSize = new AtomicLong(0);
@@ -90,8 +96,8 @@ public class TestRegionProcedureStoreFlush {
     }
   }
 
-  private void initFlusher() {
-    flusher = new RegionFlusherAndCompactor(conf, new Abortable() {
+  private void initFlusher(long flushSize, long flushPerChanges, long flushIntervalMs) {
+    flusher = new LocalRegionFlusherAndCompactor(conf, new Abortable() {
 
       @Override
       public boolean isAborted() {
@@ -101,13 +107,12 @@ public class TestRegionProcedureStoreFlush {
       @Override
       public void abort(String why, Throwable e) {
       }
-    }, region);
+    }, region, flushSize, flushPerChanges, flushIntervalMs, 4, new Path("/tmp"), "");
   }
 
   @Test
   public void testTriggerFlushBySize() throws IOException, InterruptedException {
-    conf.setLong(RegionFlusherAndCompactor.FLUSH_SIZE_KEY, 1024 * 1024);
-    initFlusher();
+    initFlusher(1024 * 1024, 1_000_000, TimeUnit.MINUTES.toMillis(15));
     memstoreHeapSize.set(1000 * 1024);
     flusher.onUpdate();
     Thread.sleep(1000);
@@ -130,16 +135,14 @@ public class TestRegionProcedureStoreFlush {
 
   @Test
   public void testTriggerFlushByChanges() throws InterruptedException {
-    conf.setLong(RegionFlusherAndCompactor.FLUSH_PER_CHANGES_KEY, 10);
-    initFlusher();
+    initFlusher(128 * 1024 * 1024, 10, TimeUnit.MINUTES.toMillis(15));
     assertTriggerFlushByChanges(10);
     assertTriggerFlushByChanges(10);
   }
 
   @Test
   public void testPeriodicalFlush() throws InterruptedException {
-    conf.setLong(RegionFlusherAndCompactor.FLUSH_INTERVAL_MS_KEY, 1000);
-    initFlusher();
+    initFlusher(128 * 1024 * 1024, 1_000_000, TimeUnit.SECONDS.toMillis(1));
     assertEquals(0, flushCalled.get());
     Thread.sleep(1500);
     assertEquals(1, flushCalled.get());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/store/TestLocalRegionOnTwoFileSystems.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/store/TestLocalRegionOnTwoFileSystems.java
new file mode 100644
index 0000000..94cfe40
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/store/TestLocalRegionOnTwoFileSystems.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.store;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
+
+@Category({ MasterTests.class, MediumTests.class })
+public class TestLocalRegionOnTwoFileSystems {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestLocalRegionOnTwoFileSystems.class);
+
+  private static final HBaseCommonTestingUtility HFILE_UTIL = new HBaseCommonTestingUtility();
+
+  private static final HBaseTestingUtility WAL_UTIL = new HBaseTestingUtility();
+
+  private static byte[] CF = Bytes.toBytes("f");
+
+  private static byte[] CQ = Bytes.toBytes("q");
+
+  private static String REGION_DIR_NAME = "local";
+
+  private static TableDescriptor TD =
+    TableDescriptorBuilder.newBuilder(TableName.valueOf("test:local"))
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF)).build();
+
+  private static int COMPACT_MIN = 4;
+
+  private LocalRegion region;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    WAL_UTIL.startMiniCluster(3);
+    Configuration conf = HFILE_UTIL.getConfiguration();
+    conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
+    Path rootDir = HFILE_UTIL.getDataTestDir();
+    CommonFSUtils.setRootDir(conf, rootDir);
+    Path walRootDir = WAL_UTIL.getDataTestDirOnTestFS();
+    FileSystem walFs = WAL_UTIL.getTestFileSystem();
+    CommonFSUtils.setWALRootDir(conf,
+      walRootDir.makeQualified(walFs.getUri(), walFs.getWorkingDirectory()));
+
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    WAL_UTIL.shutdownMiniDFSCluster();
+    WAL_UTIL.cleanupTestDir();
+    HFILE_UTIL.cleanupTestDir();
+  }
+
+  private LocalRegion createLocalRegion(ServerName serverName) throws IOException {
+    Server server = mock(Server.class);
+    when(server.getConfiguration()).thenReturn(HFILE_UTIL.getConfiguration());
+    when(server.getServerName()).thenReturn(serverName);
+    LocalRegionParams params = new LocalRegionParams();
+    params.server(server).regionDirName(REGION_DIR_NAME).tableDescriptor(TD)
+      .flushSize(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE).flushPerChanges(1_000_000)
+      .flushIntervalMs(TimeUnit.MINUTES.toMillis(15)).compactMin(COMPACT_MIN).maxWals(32)
+      .useHsync(false).ringBufferSlotCount(16).rollPeriodMs(TimeUnit.MINUTES.toMillis(15))
+      .archivedWalSuffix(LocalStore.ARCHIVED_WAL_SUFFIX)
+      .archivedHFileSuffix(LocalStore.ARCHIVED_HFILE_SUFFIX);
+    return LocalRegion.create(params);
+  }
+
+  @Before
+  public void setUpBeforeTest() throws IOException {
+    Path rootDir = HFILE_UTIL.getDataTestDir();
+    FileSystem fs = rootDir.getFileSystem(HFILE_UTIL.getConfiguration());
+    fs.delete(rootDir, true);
+    Path walRootDir = WAL_UTIL.getDataTestDirOnTestFS();
+    FileSystem walFs = WAL_UTIL.getTestFileSystem();
+    walFs.delete(walRootDir, true);
+    region = createLocalRegion(ServerName.valueOf("localhost", 12345, System.currentTimeMillis()));
+  }
+
+  @After
+  public void tearDownAfterTest() {
+    region.close(true);
+  }
+
+  private int getStorefilesCount() {
+    return Iterables.getOnlyElement(region.region.getStores()).getStorefilesCount();
+  }
+
+  @Test
+  public void testFlushAndCompact() throws Exception {
+    for (int i = 0; i < COMPACT_MIN - 1; i++) {
+      final int index = i;
+      region
+        .update(r -> r.put(new Put(Bytes.toBytes(index)).addColumn(CF, CQ, Bytes.toBytes(index))));
+      region.flush(true);
+    }
+    region.requestRollAll();
+    region.waitUntilWalRollFinished();
+    region.update(r -> r.put(
+      new Put(Bytes.toBytes(COMPACT_MIN - 1)).addColumn(CF, CQ, Bytes.toBytes(COMPACT_MIN - 1))));
+    region.flusherAndCompactor.requestFlush();
+
+    HFILE_UTIL.waitFor(15000, () -> getStorefilesCount() == 1);
+
+    // make sure the archived hfiles are on the root fs
+    Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePathForRootDir(
+      HFILE_UTIL.getDataTestDir(), region.region.getRegionInfo(), CF);
+    FileSystem rootFs = storeArchiveDir.getFileSystem(HFILE_UTIL.getConfiguration());
+    HFILE_UTIL.waitFor(15000, () -> {
+      try {
+        FileStatus[] fses = rootFs.listStatus(storeArchiveDir);
+        return fses != null && fses.length == COMPACT_MIN;
+      } catch (FileNotFoundException e) {
+        return false;
+      }
+    });
+
+    // make sure the archived wal files are on the wal fs
+    Path walArchiveDir = new Path(CommonFSUtils.getWALRootDir(HFILE_UTIL.getConfiguration()),
+      HConstants.HREGION_OLDLOGDIR_NAME);
+    HFILE_UTIL.waitFor(15000, () -> {
+      try {
+        FileStatus[] fses = WAL_UTIL.getTestFileSystem().listStatus(walArchiveDir);
+        return fses != null && fses.length == 1;
+      } catch (FileNotFoundException e) {
+        return false;
+      }
+    });
+  }
+
+  @Test
+  public void testRecovery() throws IOException {
+    int countPerRound = 100;
+    for (int round = 0; round < 5; round++) {
+      for (int i = 0; i < countPerRound; i++) {
+        int row = round * countPerRound + i;
+        Put put = new Put(Bytes.toBytes(row)).addColumn(CF, CQ, Bytes.toBytes(row));
+        region.update(r -> r.put(put));
+      }
+      region.close(true);
+      region = createLocalRegion(
+        ServerName.valueOf("localhost", 12345, System.currentTimeMillis() + round + 1));
+      try (RegionScanner scanner = region.getScanner(new Scan())) {
+        List<Cell> cells = new ArrayList<>();
+        boolean moreValues = true;
+        for (int i = 0; i < (round + 1) * countPerRound; i++) {
+          assertTrue(moreValues);
+          moreValues = scanner.next(cells);
+          assertEquals(1, cells.size());
+          Result result = Result.create(cells);
+          cells.clear();
+          assertEquals(i, Bytes.toInt(result.getRow()));
+          assertEquals(i, Bytes.toInt(result.getValue(CF, CQ)));
+        }
+        assertFalse(moreValues);
+      }
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreWALCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/store/TestLocalRegionWALCleaner.java
similarity index 56%
rename from hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreWALCleaner.java
rename to hbase-server/src/test/java/org/apache/hadoop/hbase/master/store/TestLocalRegionWALCleaner.java
index b4290da..052b1e6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreWALCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/store/TestLocalRegionWALCleaner.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.procedure2.store.region;
+package org.apache.hadoop.hbase.master.store;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -26,62 +26,39 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.master.HMaster;
-import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
-import org.apache.hadoop.hbase.master.cleaner.TimeToLiveProcedureWALCleaner;
-import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
-import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
+import org.apache.hadoop.hbase.master.cleaner.TimeToLiveMasterLocalStoreWALCleaner;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.util.CommonFSUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 @Category({ MasterTests.class, MediumTests.class })
-public class TestRegionProcedureStoreWALCleaner {
+public class TestLocalRegionWALCleaner extends LocalRegionTestBase {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-    HBaseClassTestRule.forClass(TestRegionProcedureStoreWALCleaner.class);
+    HBaseClassTestRule.forClass(TestLocalRegionWALCleaner.class);
 
-  private HBaseCommonTestingUtility htu;
-
-  private FileSystem fs;
-
-  private RegionProcedureStore store;
-
-  private ChoreService choreService;
-
-  private DirScanPool dirScanPool;
+  private static long TTL_MS = 5000;
 
   private LogCleaner logCleaner;
 
   private Path globalWALArchiveDir;
 
-  @Before
-  public void setUp() throws IOException {
-    htu = new HBaseCommonTestingUtility();
+  @Override
+  protected void postSetUp() throws IOException {
     Configuration conf = htu.getConfiguration();
-    conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
-    // Runs on local filesystem. Test does not need sync. Turn off checks.
-    htu.getConfiguration().setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
+    conf.setLong(TimeToLiveMasterLocalStoreWALCleaner.TTL_CONF_KEY, TTL_MS);
     Path testDir = htu.getDataTestDir();
-    fs = testDir.getFileSystem(conf);
-    CommonFSUtils.setWALRootDir(conf, testDir);
+    FileSystem fs = testDir.getFileSystem(conf);
     globalWALArchiveDir = new Path(testDir, HConstants.HREGION_OLDLOGDIR_NAME);
-    choreService = new ChoreService("Region-Procedure-Store");
-    dirScanPool = new DirScanPool(conf);
-    conf.setLong(TimeToLiveProcedureWALCleaner.TTL_CONF_KEY, 5000);
-    conf.setInt(HMaster.HBASE_MASTER_CLEANER_INTERVAL, 1000);
     logCleaner = new LogCleaner(1000, new Stoppable() {
 
       private volatile boolean stopped = false;
@@ -95,30 +72,21 @@ public class TestRegionProcedureStoreWALCleaner {
       public boolean isStopped() {
         return stopped;
       }
-    }, conf, fs, globalWALArchiveDir, dirScanPool);
+    }, conf, fs, globalWALArchiveDir, cleanerPool);
     choreService.scheduleChore(logCleaner);
-    store = RegionProcedureStoreTestHelper.createStore(conf, choreService, dirScanPool,
-      new LoadCounter());
-  }
-
-  @After
-  public void tearDown() throws IOException {
-    store.stop(true);
-    logCleaner.cancel();
-    dirScanPool.shutdownNow();
-    choreService.shutdown();
-    htu.cleanupTestDir();
   }
 
   @Test
   public void test() throws IOException, InterruptedException {
-    RegionProcedureStoreTestProcedure proc = new RegionProcedureStoreTestProcedure();
-    store.insert(proc, null);
-    store.region.flush(true);
+    region
+      .update(r -> r.put(new Put(Bytes.toBytes(1)).addColumn(CF1, QUALIFIER, Bytes.toBytes(1))));
+    region.flush(true);
+    Path testDir = htu.getDataTestDir();
+    FileSystem fs = testDir.getFileSystem(htu.getConfiguration());
     // no archived wal files yet
     assertFalse(fs.exists(globalWALArchiveDir));
-    store.walRoller.requestRollAll();
-    store.walRoller.waitUntilWalRollFinished();
+    region.requestRollAll();
+    region.waitUntilWalRollFinished();
     // should have one
     FileStatus[] files = fs.listStatus(globalWALArchiveDir);
     assertEquals(1, files.length);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStorePerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStorePerformanceEvaluation.java
index fa8b151..2822fa9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStorePerformanceEvaluation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStorePerformanceEvaluation.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
-import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
+import org.apache.hadoop.hbase.master.store.LocalStore;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStorePerformanceEvaluation;
 import org.apache.hadoop.hbase.regionserver.ChunkCreator;
 import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
@@ -47,19 +47,15 @@ public class RegionProcedureStorePerformanceEvaluation
     private final ServerName serverName =
       ServerName.valueOf("localhost", 12345, System.currentTimeMillis());
 
-    private final ChoreService choreService;
-
     private volatile boolean abort = false;
 
     public MockServer(Configuration conf) {
       this.conf = conf;
-      this.choreService = new ChoreService("Cleaner-Chore-Service");
     }
 
     @Override
     public void abort(String why, Throwable e) {
       abort = true;
-      choreService.shutdown();
     }
 
     @Override
@@ -69,7 +65,6 @@ public class RegionProcedureStorePerformanceEvaluation
 
     @Override
     public void stop(String why) {
-      choreService.shutdown();
     }
 
     @Override
@@ -114,11 +109,11 @@ public class RegionProcedureStorePerformanceEvaluation
 
     @Override
     public ChoreService getChoreService() {
-      return choreService;
+      throw new UnsupportedOperationException();
     }
   }
 
-  private DirScanPool cleanerPool;
+  private LocalStore localStore;
 
   @Override
   protected RegionProcedureStore createProcedureStore(Path storeDir) throws IOException {
@@ -132,10 +127,11 @@ public class RegionProcedureStorePerformanceEvaluation
     int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT);
     ChunkCreator.initialize(chunkSize, offheap, globalMemStoreSize, poolSizePercentage,
       initialCountPercentage, null);
-    conf.setBoolean(RegionProcedureStore.USE_HSYNC_KEY, "hsync".equals(syncType));
+    conf.setBoolean(LocalStore.USE_HSYNC_KEY, "hsync".equals(syncType));
     CommonFSUtils.setRootDir(conf, storeDir);
-    cleanerPool = new DirScanPool(conf);
-    return new RegionProcedureStore(new MockServer(conf), cleanerPool, (fs, apth) -> {
+    MockServer server = new MockServer(conf);
+    localStore = LocalStore.create(server);
+    return new RegionProcedureStore(server, localStore, (fs, apth) -> {
     });
   }
 
@@ -152,7 +148,7 @@ public class RegionProcedureStorePerformanceEvaluation
 
   @Override
   protected void postStop(RegionProcedureStore store) throws IOException {
-    cleanerPool.shutdownNow();
+    localStore.close(true);
   }
 
   public static void main(String[] args) throws IOException {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestBase.java
index dde04a4..3dd00d2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestBase.java
@@ -20,9 +20,9 @@ package org.apache.hadoop.hbase.procedure2.store.region;
 import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
-import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.master.store.LocalStore;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
 import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
@@ -30,42 +30,35 @@ import org.junit.After;
 import org.junit.Before;
 
 /**
- * This runs on local filesystem. hsync and hflush are not supported. May lose data!
- * Only use where data loss is not of consequence.
+ * This runs on local filesystem. hsync and hflush are not supported. May lose data! Only use where
+ * data loss is not of consequence.
  */
 public class RegionProcedureStoreTestBase {
 
   protected HBaseCommonTestingUtility htu;
 
-  protected RegionProcedureStore store;
-
-  protected ChoreService choreService;
-
-  protected DirScanPool cleanerPool;
+  protected LocalStore localStore;
 
-  protected void configure(Configuration conf) {
-  }
+  protected RegionProcedureStore store;
 
   @Before
   public void setUp() throws IOException {
     htu = new HBaseCommonTestingUtility();
-    htu.getConfiguration().setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
+    Configuration conf = htu.getConfiguration();
+    conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
     // Runs on local filesystem. Test does not need sync. Turn off checks.
-    htu.getConfiguration().setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
-    configure(htu.getConfiguration());
+    conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
     Path testDir = htu.getDataTestDir();
-    CommonFSUtils.setWALRootDir(htu.getConfiguration(), testDir);
-    choreService = new ChoreService(getClass().getSimpleName());
-    cleanerPool = new DirScanPool(htu.getConfiguration());
-    store = RegionProcedureStoreTestHelper.createStore(htu.getConfiguration(), choreService,
-      cleanerPool, new LoadCounter());
+    CommonFSUtils.setRootDir(htu.getConfiguration(), testDir);
+    Server server = RegionProcedureStoreTestHelper.mockServer(conf);
+    localStore = LocalStore.create(server);
+    store = RegionProcedureStoreTestHelper.createStore(server, localStore, new LoadCounter());
   }
 
   @After
   public void tearDown() throws IOException {
     store.stop(true);
-    cleanerPool.shutdownNow();
-    choreService.shutdown();
+    localStore.close(true);
     htu.cleanupTestDir();
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestHelper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestHelper.java
index 5497b8a..396574e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestHelper.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestHelper.java
@@ -24,10 +24,9 @@ import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
+import org.apache.hadoop.hbase.master.store.LocalStore;
 import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader;
 
@@ -36,14 +35,17 @@ final class RegionProcedureStoreTestHelper {
   private RegionProcedureStoreTestHelper() {
   }
 
-  static RegionProcedureStore createStore(Configuration conf, ChoreService choreService,
-    DirScanPool cleanerPool, ProcedureLoader loader) throws IOException {
+  static Server mockServer(Configuration conf) {
     Server server = mock(Server.class);
     when(server.getConfiguration()).thenReturn(conf);
     when(server.getServerName())
       .thenReturn(ServerName.valueOf("localhost", 12345, System.currentTimeMillis()));
-    when(server.getChoreService()).thenReturn(choreService);
-    RegionProcedureStore store = new RegionProcedureStore(server, cleanerPool, new LeaseRecovery() {
+    return server;
+  }
+
+  static RegionProcedureStore createStore(Server server, LocalStore localStore,
+    ProcedureLoader loader) throws IOException {
+    RegionProcedureStore store = new RegionProcedureStore(server, localStore, new LeaseRecovery() {
 
       @Override
       public void recoverFileLease(FileSystem fs, Path path) throws IOException {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestHFileProcedurePrettyPrinter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestHFileProcedurePrettyPrinter.java
index 27d20a5..684da19 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestHFileProcedurePrettyPrinter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestHFileProcedurePrettyPrinter.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.master.store.LocalStore;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -93,16 +94,15 @@ public class TestHFileProcedurePrettyPrinter extends RegionProcedureStoreTestBas
       store.insert(proc, null);
       procs.add(proc);
     }
-    store.region.flush(true);
+    store.localStore.flush(true);
     for (int i = 0; i < 5; i++) {
       store.delete(procs.get(i).getProcId());
     }
-    store.region.flush(true);
+    store.localStore.flush(true);
     store.cleanup();
-    store.region.flush(true);
+    store.localStore.flush(true);
     Path tableDir = CommonFSUtils.getTableDir(
-      new Path(htu.getDataTestDir(), RegionProcedureStore.MASTER_PROCEDURE_DIR),
-      RegionProcedureStore.TABLE_NAME);
+      new Path(htu.getDataTestDir(), LocalStore.MASTER_STORE_DIR), LocalStore.TABLE_NAME);
     FileSystem fs = tableDir.getFileSystem(htu.getConfiguration());
     Path regionDir =
       fs.listStatus(tableDir, p -> RegionInfo.isEncodedRegionName(Bytes.toBytes(p.getName())))[0]
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java
index 178e78d..1f4ceb4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java
@@ -126,24 +126,24 @@ public class TestRegionProcedureStore extends RegionProcedureStoreTestBase {
     assertEquals(1, loader.getRunnableCount());
 
     // the row should still be there
-    assertTrue(store.region
+    assertTrue(store.localStore
       .get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists());
-    assertTrue(store.region
+    assertTrue(store.localStore
       .get(new Get(Bytes.toBytes(proc2.getProcId())).setCheckExistenceOnly(true)).getExists());
 
     // proc2 will be deleted after cleanup, but proc3 should still be there as it holds the max proc
     // id
     store.cleanup();
-    assertTrue(store.region
+    assertTrue(store.localStore
       .get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists());
-    assertFalse(store.region
+    assertFalse(store.localStore
       .get(new Get(Bytes.toBytes(proc2.getProcId())).setCheckExistenceOnly(true)).getExists());
 
     RegionProcedureStoreTestProcedure proc4 = new RegionProcedureStoreTestProcedure();
     store.insert(proc4, null);
     store.cleanup();
     // proc3 should also be deleted as now proc4 holds the max proc id
-    assertFalse(store.region
+    assertFalse(store.localStore
       .get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists());
   }
 
@@ -227,7 +227,7 @@ public class TestRegionProcedureStore extends RegionProcedureStoreTestBase {
 
       @Override
       public void setResponse(Message param, CellScanner cells, Throwable errorThrowable,
-          String error) {
+        String error) {
       }
 
       @Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreCompaction.java
deleted file mode 100644
index 15682bb..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreCompaction.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.procedure2.store.region;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.master.HMaster;
-import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
-import org.apache.hadoop.hbase.testclassification.MasterTests;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.util.HFileArchiveUtil;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
-
-@Category({ MasterTests.class, MediumTests.class })
-public class TestRegionProcedureStoreCompaction extends RegionProcedureStoreTestBase {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-    HBaseClassTestRule.forClass(TestRegionProcedureStoreCompaction.class);
-
-  private int compactMin = 4;
-
-  @Override
-  protected void configure(Configuration conf) {
-    conf.setInt(RegionFlusherAndCompactor.COMPACT_MIN_KEY, compactMin);
-    conf.setInt(HMaster.HBASE_MASTER_CLEANER_INTERVAL, 500);
-    conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 5000);
-  }
-
-  private int getStorefilesCount() {
-    return Iterables.getOnlyElement(store.region.getStores()).getStorefilesCount();
-  }
-
-  @Test
-  public void test() throws IOException, InterruptedException {
-    for (int i = 0; i < compactMin - 1; i++) {
-      store.insert(new RegionProcedureStoreTestProcedure(), null);
-      store.region.flush(true);
-    }
-    assertEquals(compactMin - 1, getStorefilesCount());
-    store.insert(new RegionProcedureStoreTestProcedure(), null);
-    store.flusherAndCompactor.requestFlush();
-    htu.waitFor(15000, () -> getStorefilesCount() == 1);
-    Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePathForRootDir(
-      new Path(htu.getDataTestDir(), RegionProcedureStore.MASTER_PROCEDURE_DIR),
-      store.region.getRegionInfo(), RegionProcedureStore.FAMILY);
-    FileSystem fs = storeArchiveDir.getFileSystem(htu.getConfiguration());
-    // after compaction, the old hfiles should have been compacted
-    htu.waitFor(15000, () -> {
-      try {
-        FileStatus[] fses = fs.listStatus(storeArchiveDir);
-        return fses != null && fses.length == compactMin;
-      } catch (FileNotFoundException e) {
-        return false;
-      }
-    });
-    // ttl has not expired, so should not delete any files
-    Thread.sleep(1000);
-    FileStatus[] compactedHFiles = fs.listStatus(storeArchiveDir);
-    assertEquals(4, compactedHFiles.length);
-    Thread.sleep(2000);
-    // touch one file
-    long currentTime = System.currentTimeMillis();
-    fs.setTimes(compactedHFiles[0].getPath(), currentTime, currentTime);
-    Thread.sleep(3000);
-    // only the touched file is still there after clean up
-    FileStatus[] remainingHFiles = fs.listStatus(storeArchiveDir);
-    assertEquals(1, remainingHFiles.length);
-    assertEquals(compactedHFiles[0].getPath(), remainingHFiles[0].getPath());
-    Thread.sleep(6000);
-    // the touched file should also be cleaned up and then the cleaner will delete the parent
-    // directory since it is empty.
-    assertFalse(fs.exists(storeArchiveDir));
-  }
-}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreMigration.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreMigration.java
index 2512d0f..effa751 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreMigration.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreMigration.java
@@ -32,14 +32,14 @@ import org.apache.commons.lang3.mutable.MutableLong;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
 import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.master.assignment.AssignProcedure;
-import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
+import org.apache.hadoop.hbase.master.store.LocalStore;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
 import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
@@ -65,13 +65,13 @@ public class TestRegionProcedureStoreMigration {
 
   private HBaseCommonTestingUtility htu;
 
-  private RegionProcedureStore store;
+  private Server server;
 
-  private WALProcedureStore walStore;
+  private LocalStore localStore;
 
-  private ChoreService choreService;
+  private RegionProcedureStore store;
 
-  private DirScanPool cleanerPool;
+  private WALProcedureStore walStore;
 
   @Before
   public void setUp() throws IOException {
@@ -81,7 +81,7 @@ public class TestRegionProcedureStoreMigration {
     // Runs on local filesystem. Test does not need sync. Turn off checks.
     htu.getConfiguration().setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
     Path testDir = htu.getDataTestDir();
-    CommonFSUtils.setWALRootDir(conf, testDir);
+    CommonFSUtils.setRootDir(conf, testDir);
     walStore = new WALProcedureStore(conf, new LeaseRecovery() {
 
       @Override
@@ -91,8 +91,8 @@ public class TestRegionProcedureStoreMigration {
     walStore.start(1);
     walStore.recoverLease();
     walStore.load(new LoadCounter());
-    choreService = new ChoreService(getClass().getSimpleName());
-    cleanerPool = new DirScanPool(htu.getConfiguration());
+    server = RegionProcedureStoreTestHelper.mockServer(conf);
+    localStore = LocalStore.create(server);
   }
 
   @After
@@ -100,9 +100,8 @@ public class TestRegionProcedureStoreMigration {
     if (store != null) {
       store.stop(true);
     }
+    localStore.close(true);
     walStore.stop(true);
-    cleanerPool.shutdownNow();
-    choreService.shutdown();
     htu.cleanupTestDir();
   }
 
@@ -121,30 +120,29 @@ public class TestRegionProcedureStoreMigration {
     SortedSet<RegionProcedureStoreTestProcedure> loadedProcs =
       new TreeSet<>((p1, p2) -> Long.compare(p1.getProcId(), p2.getProcId()));
     MutableLong maxProcIdSet = new MutableLong(0);
-    store = RegionProcedureStoreTestHelper.createStore(htu.getConfiguration(), choreService,
-      cleanerPool, new ProcedureLoader() {
+    store = RegionProcedureStoreTestHelper.createStore(server, localStore, new ProcedureLoader() {
 
-        @Override
-        public void setMaxProcId(long maxProcId) {
-          maxProcIdSet.setValue(maxProcId);
-        }
+      @Override
+      public void setMaxProcId(long maxProcId) {
+        maxProcIdSet.setValue(maxProcId);
+      }
 
-        @Override
-        public void load(ProcedureIterator procIter) throws IOException {
-          while (procIter.hasNext()) {
-            RegionProcedureStoreTestProcedure proc =
-              (RegionProcedureStoreTestProcedure) procIter.next();
-            loadedProcs.add(proc);
-          }
+      @Override
+      public void load(ProcedureIterator procIter) throws IOException {
+        while (procIter.hasNext()) {
+          RegionProcedureStoreTestProcedure proc =
+            (RegionProcedureStoreTestProcedure) procIter.next();
+          loadedProcs.add(proc);
         }
+      }
 
-        @Override
-        public void handleCorrupted(ProcedureIterator procIter) throws IOException {
-          if (procIter.hasNext()) {
-            fail("Found corrupted procedures");
-          }
+      @Override
+      public void handleCorrupted(ProcedureIterator procIter) throws IOException {
+        if (procIter.hasNext()) {
+          fail("Found corrupted procedures");
         }
-      });
+      }
+    });
     assertEquals(10, maxProcIdSet.longValue());
     assertEquals(5, loadedProcs.size());
     int procId = 1;
@@ -168,8 +166,7 @@ public class TestRegionProcedureStoreMigration {
     walStore.stop(true);
 
     try {
-      store = RegionProcedureStoreTestHelper.createStore(htu.getConfiguration(), choreService,
-        cleanerPool, new LoadCounter());
+      store = RegionProcedureStoreTestHelper.createStore(server, localStore, new LoadCounter());
       fail("Should fail since AssignProcedure is not supported");
     } catch (HBaseIOException e) {
       assertThat(e.getMessage(), startsWith("Unsupported"));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestWALProcedurePrettyPrinter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestWALProcedurePrettyPrinter.java
index 6ad16c9..3fde2c0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestWALProcedurePrettyPrinter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestWALProcedurePrettyPrinter.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.master.store.LocalStore;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.util.ToolRunner;
@@ -57,18 +58,18 @@ public class TestWALProcedurePrettyPrinter extends RegionProcedureStoreTestBase
       store.insert(proc, null);
       procs.add(proc);
     }
-    store.region.flush(true);
+    store.localStore.flush(true);
     for (int i = 0; i < 5; i++) {
       store.delete(procs.get(i).getProcId());
     }
     store.cleanup();
     Path walParentDir = new Path(htu.getDataTestDir(),
-      RegionProcedureStore.MASTER_PROCEDURE_DIR + "/" + HConstants.HREGION_LOGDIR_NAME);
+      LocalStore.MASTER_STORE_DIR + "/" + HConstants.HREGION_LOGDIR_NAME);
     FileSystem fs = walParentDir.getFileSystem(htu.getConfiguration());
     Path walDir = fs.listStatus(walParentDir)[0].getPath();
     Path walFile = fs.listStatus(walDir)[0].getPath();
-    store.walRoller.requestRollAll();
-    store.walRoller.waitUntilWalRollFinished();
+    store.localStore.requestRollAll();
+    store.localStore.waitUntilWalRollFinished();
     ByteArrayOutputStream bos = new ByteArrayOutputStream();
     PrintStream out = new PrintStream(bos);
     WALProcedurePrettyPrinter printer = new WALProcedurePrettyPrinter(out);