You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ta...@apache.org on 2021/03/16 03:31:20 UTC

[hbase] branch HBASE-24749 updated (fd649ce -> 7b58ada)

This is an automated email from the ASF dual-hosted git repository.

taklwu pushed a change to branch HBASE-24749
in repository https://gitbox.apache.org/repos/asf/hbase.git.


    omit fd649ce  HBASE-25395 Introduce PersistedStoreEngine and PersistedStoreFileManager (#2931)
     new 7b58ada  HBASE-25395 Introduce PersistedStoreEngine and PersistedStoreFileManager (#2931)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (fd649ce)
            \
             N -- N -- N   refs/heads/HBASE-24749 (7b58ada)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:


[hbase] 01/01: HBASE-25395 Introduce PersistedStoreEngine and PersistedStoreFileManager (#2931)

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

taklwu pushed a commit to branch HBASE-24749
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 7b58ada2ff3e06b2a4550d69aa121e0c1c8f3c67
Author: Tak Lon (Stephen) Wu <ta...@apache.org>
AuthorDate: Mon Mar 15 20:24:35 2021 -0700

    HBASE-25395 Introduce PersistedStoreEngine and PersistedStoreFileManager (#2931)
    
    * HBASE-25395 Introduce PersistedStoreEngine and PersistedStoreFileManager
    
    - Implement HTableStoreFilePathAccessor
    - Add loadInitialFiles to StoreFileManager Interface
    - refactor and add loadInitialFiles to the base interface,
      such each store file manager implementation could have their
      own way to discovery a list of available files instead of
      just calling fs.getStoreFiles()
    - add store instance to each store file manager implementation
    - Add PersistedStoreEngine and PersistedStoreFileManager
       that fork the in-memory object in store file manager to
       a off-memory structure that is going to be used when
       region reassign, (re)open, cluster restart and etc.
       This improvement aims for better object store (e.g. S3)
       support without calling LIST to get storefiles from
       a given column family.
    
    Reviewed-by: Andor Molnár <an...@apache.org>
    Signed-off-by: Zach York <zy...@apache.org>
---
 .../java/org/apache/hadoop/hbase/HConstants.java   |  13 +
 .../java/org/apache/hadoop/hbase/TableName.java    |   5 +
 .../org/apache/hadoop/hbase/master/HMaster.java    |   7 +
 .../AbstractStoreFilePathAccessor.java             | 111 +++++
 .../hbase/regionserver/DateTieredStoreEngine.java  |   2 +-
 .../hbase/regionserver/DefaultStoreEngine.java     |  10 +-
 .../regionserver/DefaultStoreFileManager.java      |  43 +-
 .../apache/hadoop/hbase/regionserver/HStore.java   |   3 +-
 .../regionserver/HTableStoreFilePathAccessor.java  | 156 +++++++
 .../hbase/regionserver/PersistedStoreEngine.java   |  72 +++
 .../regionserver/PersistedStoreFileManager.java    | 169 +++++++
 .../hbase/regionserver/StoreFileManager.java       |  19 +-
 .../hbase/regionserver/StoreFilePathAccessor.java  |  79 ++++
 .../hbase/regionserver/StoreFilePathUpdate.java    |  92 ++++
 .../hbase/regionserver/StoreFileTrackingUtils.java | 135 ++++++
 .../hbase/regionserver/StripeStoreEngine.java      |   3 +-
 .../hbase/regionserver/StripeStoreFileManager.java |  12 +-
 .../StoreFilePathAccessorTestBase.java             | 195 ++++++++
 .../TestHTableStoreFilePathAccessor.java           |  77 ++++
 .../regionserver/TestPersistedStoreEngine.java     | 145 ++++++
 .../TestPersistedStoreFileManager.java             | 496 +++++++++++++++++++++
 .../regionserver/TestStoreFileTrackingUtils.java   |  91 ++++
 .../regionserver/TestStripeStoreFileManager.java   |   5 +-
 23 files changed, 1918 insertions(+), 22 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 48fa00c..0417a96 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -25,6 +25,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -1646,6 +1647,18 @@ public final class HConstants {
    */
   public static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;
 
+  /**
+   * Configuration for storefile tracking feature
+   */
+  public static final String STOREFILE_TRACKING_PERSIST_ENABLED =
+    "hbase.storefile.tracking.persist.enabled";
+  public static final boolean DEFAULT_STOREFILE_TRACKING_PERSIST_ENABLED = false;
+
+  public static final String STOREFILE_TRACKING_INIT_TIMEOUT =
+    "hbase.storefile.tracking.init.timeout";
+  public static final long DEFAULT_STOREFILE_TRACKING_INIT_TIMEOUT =
+    TimeUnit.MINUTES.toMillis(5);
+
   private HConstants() {
     // Can't be instantiated with this ctor.
   }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/TableName.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/TableName.java
index b659d14..b8cef72 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/TableName.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TableName.java
@@ -93,6 +93,11 @@ public final class TableName implements Comparable<TableName> {
   public static final TableName NAMESPACE_TABLE_NAME =
     valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "namespace");
 
+  /** The storefile table's name. */
+  public static final String STOREFILE_STR = "storefile";
+  public static final TableName STOREFILE_TABLE_NAME =
+      valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, STOREFILE_STR);
+
   public static final String OLD_META_STR = ".META.";
   public static final String OLD_ROOT_STR = "-ROOT-";
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 9911f01..7b8195e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -173,6 +173,7 @@ import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshotNotifierFactory;
 import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.hadoop.hbase.regionserver.StoreFileTrackingUtils;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationLoadSource;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@@ -1022,6 +1023,12 @@ public class HMaster extends HRegionServer implements MasterServices {
     getChoreService().scheduleChore(catalogJanitorChore);
     this.hbckChore = new HbckChore(this);
     getChoreService().scheduleChore(hbckChore);
+
+    // enable or cleanup storefile tracking feature
+    if (StoreFileTrackingUtils.isStoreFileTrackingPersistEnabled(conf)) {
+      StoreFileTrackingUtils.init(this);
+    }
+
     this.serverManager.startChore();
 
     // Only for rolling upgrade, where we need to migrate the data in namespace table to meta table.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractStoreFilePathAccessor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractStoreFilePathAccessor.java
new file mode 100644
index 0000000..d494d86
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractStoreFilePathAccessor.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+
+@InterfaceAudience.Private
+public abstract class AbstractStoreFilePathAccessor implements StoreFilePathAccessor {
+
+  public static final String STOREFILE_INCLUDED_STR = "included";
+
+  protected static final String LIST_SEPARATOR = ";";
+  protected final Configuration conf;
+
+  public AbstractStoreFilePathAccessor(Configuration conf) {
+    this.conf = conf;
+  }
+
+  abstract String getSeparator();
+
+  abstract List<Path> getStoreFilePaths(final String tableName, final String regionName,
+    final String storeName, final String columnName) throws IOException;
+
+  @Override
+  public abstract void writeStoreFilePaths(final String tableName, final String regionName,
+    final String storeName, StoreFilePathUpdate storeFilePathUpdate)
+    throws IOException;
+
+  @Override
+  public List<Path> getIncludedStoreFilePaths(final String tableName, final String regionName,
+    final String storeName) throws IOException {
+    return getStoreFilePaths(tableName, regionName, storeName, STOREFILE_INCLUDED_STR);
+  }
+
+  protected static byte[] storeFileListToByteArray(List<Path> storeFilePaths) {
+    return Bytes.toBytes(Joiner.on(LIST_SEPARATOR).join(storeFilePaths));
+  }
+
+  protected static List<Path> byteToStoreFileList(byte[] data) {
+    List<Path> paths = new ArrayList<>();
+    if (data != null && data.length != 0) {
+      String pathString = Bytes.toString(data);
+      String[] pathStrings = StringUtils.split(pathString, LIST_SEPARATOR);
+      for (String path : pathStrings) {
+        paths.add(new Path(path));
+      }
+    }
+    return paths;
+  }
+
+  /**
+   * Get a rowkey in the order of regionName-storeName-tablename
+   *
+   * @param tableName table name
+   * @param regionName region encoded name
+   * @param storeName column family name
+   * @return a joint rowkey in the form of regionName-storeName-tablename
+   */
+  protected String getKey(final String tableName, final String regionName, final String storeName) {
+    return Joiner.on(getSeparator()).join(regionName, storeName, tableName);
+  }
+
+  protected void validate(final String tableName, final String regionName,
+    final String storeName, final String columnName) {
+    validate(tableName, regionName, storeName);
+    Preconditions.checkArgument(StringUtils.isNotBlank(columnName),
+      "column name cannot be null or empty");
+  }
+
+  protected void validate(final String tableName, final String regionName,
+    final String storeName) {
+    Preconditions
+      .checkArgument(StringUtils.isNotBlank(tableName), "table name cannot be null or empty");
+    Preconditions
+      .checkArgument(StringUtils.isNotBlank(regionName), "region name cannot be null or empty");
+    Preconditions
+      .checkArgument(StringUtils.isNotBlank(storeName), "store name cannot be null or empty");
+  }
+
+  protected void validate(final String tableName, final String regionName, final String storeName,
+    final StoreFilePathUpdate storeFilePathUpdate) {
+    validate(tableName, regionName, storeName);
+    Preconditions.checkArgument(!storeFilePathUpdate.getStoreFiles().isEmpty(),
+      "Must have storefiles to be updated");
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java
index 1df953d..de8bd35 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java
@@ -59,7 +59,7 @@ public class DateTieredStoreEngine extends StoreEngine<DefaultStoreFlusher,
     this.compactionPolicy = new DateTieredCompactionPolicy(conf, store);
     this.storeFileManager =
         new DefaultStoreFileManager(kvComparator, StoreFileComparators.SEQ_ID_MAX_TIMESTAMP, conf,
-            compactionPolicy.getConf());
+            compactionPolicy.getConf(), store.getRegionFileSystem(), store.getColumnFamilyName());
     this.storeFlusher = new DefaultStoreFlusher(conf, store);
     this.compactor = new DateTieredCompactor(conf, store);
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
index 58f8bbb..090c7d9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
@@ -68,9 +68,13 @@ public class DefaultStoreEngine extends StoreEngine<
     createCompactor(conf, store);
     createCompactionPolicy(conf, store);
     createStoreFlusher(conf, store);
-    storeFileManager =
-        new DefaultStoreFileManager(kvComparator, StoreFileComparators.SEQ_ID, conf,
-            compactionPolicy.getConf());
+    createStoreFileManager(conf, store, kvComparator);
+  }
+
+  protected void createStoreFileManager(Configuration conf, HStore store,
+    CellComparator kvComparator) {
+    storeFileManager = new DefaultStoreFileManager(kvComparator, StoreFileComparators.SEQ_ID, conf,
+      compactionPolicy.getConf(), store.getRegionFileSystem(), store.getColumnFamilyName());
   }
 
   protected void createCompactor(Configuration conf, HStore store) throws IOException {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
index f5c3fa7..ed705a1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
@@ -49,6 +49,8 @@ class DefaultStoreFileManager implements StoreFileManager {
   private final CompactionConfiguration comConf;
   private final int blockingFileCount;
   private final Comparator<HStoreFile> storeFileComparator;
+  private final HRegionFileSystem regionFs;
+  private final String familyName;
   /**
    * List of store files inside this store. This is an immutable list that
    * is atomically replaced when its contents change.
@@ -64,20 +66,28 @@ class DefaultStoreFileManager implements StoreFileManager {
 
   public DefaultStoreFileManager(CellComparator cellComparator,
       Comparator<HStoreFile> storeFileComparator, Configuration conf,
-      CompactionConfiguration comConf) {
+      CompactionConfiguration comConf, HRegionFileSystem regionFs,
+      String familyName) {
     this.cellComparator = cellComparator;
     this.storeFileComparator = storeFileComparator;
     this.comConf = comConf;
     this.blockingFileCount =
         conf.getInt(HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
+    this.regionFs = regionFs;
+    this.familyName = familyName;
   }
 
   @Override
-  public void loadFiles(List<HStoreFile> storeFiles) {
+  public void loadFiles(List<HStoreFile> storeFiles) throws IOException {
     this.storefiles = ImmutableList.sortedCopyOf(storeFileComparator, storeFiles);
   }
 
   @Override
+  public Collection<StoreFileInfo> loadInitialFiles() throws IOException {
+    return regionFs.getStoreFiles(familyName);
+  }
+
+  @Override
   public final Collection<HStoreFile> getStorefiles() {
     return storefiles;
   }
@@ -101,7 +111,7 @@ class DefaultStoreFileManager implements StoreFileManager {
   }
 
   @Override
-  public Collection<HStoreFile> clearCompactedFiles() {
+  public Collection<HStoreFile> clearCompactedFiles() throws IOException {
     List<HStoreFile> result = compactedfiles;
     compactedfiles = ImmutableList.of();
     return result;
@@ -119,16 +129,27 @@ class DefaultStoreFileManager implements StoreFileManager {
 
   @Override
   public void addCompactionResults(Collection<HStoreFile> newCompactedfiles,
-      Collection<HStoreFile> results) {
+    Collection<HStoreFile> results) throws IOException {
     this.storefiles = ImmutableList.sortedCopyOf(storeFileComparator, Iterables
-        .concat(Iterables.filter(storefiles, sf -> !newCompactedfiles.contains(sf)), results));
+      .concat(Iterables.filter(storefiles, sf -> !newCompactedfiles.contains(sf)), results));
+    this.compactedfiles = ImmutableList.sortedCopyOf(storeFileComparator,
+      Iterables.concat(this.compactedfiles, newCompactedfiles));
+
+    addCompactionResultsHook(storefiles);
     // Mark the files as compactedAway once the storefiles and compactedfiles list is finalized
     // Let a background thread close the actual reader on these compacted files and also
     // ensure to evict the blocks from block cache so that they are no longer in
     // cache
     newCompactedfiles.forEach(HStoreFile::markCompactedAway);
-    this.compactedfiles = ImmutableList.sortedCopyOf(storeFileComparator,
-      Iterables.concat(this.compactedfiles, newCompactedfiles));
+  }
+
+  /**
+   * additional logic after addCompactionResults() before marking compactedAway to
+   * newCompactedfiles the if any.
+   * @param storeFiles a list of store files to be processed
+   */
+  protected void addCompactionResultsHook(ImmutableList<HStoreFile> storeFiles) throws IOException {
+    // no-ops
   }
 
   @Override
@@ -203,5 +224,13 @@ class DefaultStoreFileManager implements StoreFileManager {
   public Comparator<HStoreFile> getStoreFileComparator() {
     return storeFileComparator;
   }
+
+  HRegionFileSystem getRegionFs() {
+    return regionFs;
+  }
+
+  String getFamilyName() {
+    return familyName;
+  }
 }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 99880ef..a622578 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -547,7 +547,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
    * from the given directory.
    */
   private List<HStoreFile> loadStoreFiles(boolean warmup) throws IOException {
-    Collection<StoreFileInfo> files = getRegionFileSystem().getStoreFiles(getColumnFamilyName());
+    Collection<StoreFileInfo> files =
+        this.storeEngine.getStoreFileManager().loadInitialFiles();
     return openStoreFiles(files, warmup);
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HTableStoreFilePathAccessor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HTableStoreFilePathAccessor.java
new file mode 100644
index 0000000..0f87ae7
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HTableStoreFilePathAccessor.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
+/**
+ * Helper class to interact with the hbase:storefile system table
+ *
+ * <pre>
+ *   ROW-KEY              FAMILY:QUALIFIER      DATA VALUE
+ *   region-store-table   included:files        List&lt;Path&gt; filesIncludedInRead
+ * </pre>
+ *
+ * The region encoded name is set as prefix for region split loading balance, and we use the
+ * target table name as suffix such that operator can identify the records per table.
+ *
+ * included:files is used for persisting storefiles of StoreFileManager in the cases of store
+ * opens and store closes. Meanwhile compactedFiles of StoreFileManager isn't being tracked
+ * off-memory, because the updated included:files contains compactedFiles and the leftover
+ * compactedFiles are either archived when a store closes or opens.
+ *
+ * TODO we will need a followup change to introduce in-memory temporarily file, such that further
+ *      we can introduce a non-tracking temporarily storefiles left from a flush or compaction when
+ *      a regionserver crashes without closing the store properly
+ */
+
+@InterfaceAudience.Private
+public class HTableStoreFilePathAccessor extends AbstractStoreFilePathAccessor {
+
+  public static final byte[] STOREFILE_FAMILY_INCLUDED = Bytes.toBytes(STOREFILE_INCLUDED_STR);
+
+  private static final String DASH_SEPARATOR = "-";
+  private static final String STOREFILE_QUALIFIER_STR = "filepaths";
+  private static final byte[] STOREFILE_QUALIFIER = Bytes.toBytes(STOREFILE_QUALIFIER_STR);
+  private static final int STOREFILE_TABLE_VERSIONS = 3;
+
+  // TODO find a way for system table to support region split at table creation or remove this
+  //  comment when we merge into hbase:meta table
+  public static final TableDescriptor STOREFILE_TABLE_DESC =
+    TableDescriptorBuilder.newBuilder(TableName.STOREFILE_TABLE_NAME)
+      .setColumnFamily(
+        ColumnFamilyDescriptorBuilder.newBuilder(STOREFILE_FAMILY_INCLUDED)
+          .setMaxVersions(STOREFILE_TABLE_VERSIONS)
+          .setInMemory(true)
+          .build())
+      .setRegionSplitPolicyClassName(BusyRegionSplitPolicy.class.getName())
+      .build();
+
+  private final Connection connection;
+
+  public HTableStoreFilePathAccessor(Configuration conf, Connection connection) {
+    super(conf);
+    Preconditions.checkNotNull(connection, "connection cannot be null");
+    this.connection = connection;
+  }
+
+  @Override
+  List<Path> getStoreFilePaths(final String tableName, final String regionName,
+    final String storeName, final String colFamily) throws IOException {
+    validate(tableName, regionName, storeName, colFamily);
+    byte[] colFamilyBytes = Bytes.toBytes(colFamily);
+    Get get =
+      new Get(Bytes.toBytes(getKey(tableName, regionName, storeName)));
+    get.addColumn(colFamilyBytes, STOREFILE_QUALIFIER);
+    Result result = doGet(get);
+    if (result.isEmpty()) {
+      return new ArrayList<>();
+    }
+    return byteToStoreFileList(result.getValue(colFamilyBytes, STOREFILE_QUALIFIER));
+  }
+
+  @Override
+  public void writeStoreFilePaths(final String tableName, final String regionName,
+    final String storeName, StoreFilePathUpdate storeFilePathUpdate)
+    throws IOException {
+    validate(tableName, regionName, storeName, storeFilePathUpdate);
+    Put put = generatePutForStoreFilePaths(tableName, regionName, storeName, storeFilePathUpdate);
+    doPut(put);
+  }
+
+
+  private Put generatePutForStoreFilePaths(final String tableName, final String regionName,
+    final String storeName, final StoreFilePathUpdate storeFilePathUpdate) {
+    Put put = new Put(Bytes.toBytes(getKey(tableName, regionName, storeName)));
+    put.addColumn(Bytes.toBytes(STOREFILE_INCLUDED_STR), STOREFILE_QUALIFIER,
+      storeFileListToByteArray(storeFilePathUpdate.getStoreFiles()));
+    return put;
+  }
+
+  @Override
+  public void deleteStoreFilePaths(final String tableName, final String regionName,
+    final String storeName) throws IOException {
+    validate(tableName, regionName, storeName);
+    Delete delete = new Delete(
+      Bytes.toBytes(getKey(tableName, regionName, storeName)));
+    delete.addColumns(STOREFILE_FAMILY_INCLUDED, STOREFILE_QUALIFIER);
+    doDelete(Lists.newArrayList(delete));
+  }
+
+  @Override
+  String getSeparator() {
+    return DASH_SEPARATOR;
+  }
+
+  private Result doGet(final Get get) throws IOException {
+    try (Table table = connection.getTable(TableName.STOREFILE_TABLE_NAME)) {
+      return table.get(get);
+    }
+  }
+
+  private void doPut(final Put put) throws IOException {
+    try (Table table = connection.getTable(TableName.STOREFILE_TABLE_NAME)) {
+      table.put(put);
+    }
+  }
+
+  private void doDelete(final List<Delete> delete) throws IOException {
+    try (Table table = connection.getTable(TableName.STOREFILE_TABLE_NAME)) {
+      table.delete(delete);
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PersistedStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PersistedStoreEngine.java
new file mode 100644
index 0000000..1c44e3e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PersistedStoreEngine.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+
+/**
+ * The StoreEngine that implements persisted and renameless store compaction and flush
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class PersistedStoreEngine extends DefaultStoreEngine {
+
+  @Override
+  public void createComponents(
+    Configuration conf, HStore store, CellComparator kvComparator) throws IOException {
+    Preconditions.checkArgument(StoreFileTrackingUtils.isStoreFileTrackingPersistEnabled(conf));
+
+    createCompactor(conf, store);
+    createCompactionPolicy(conf, store);
+    createStoreFlusher(conf, store);
+    createStoreFileManager(conf, store, kvComparator);
+  }
+
+  @Override
+  protected void createStoreFileManager(Configuration conf, HStore store,
+    CellComparator kvComparator) {
+    TableName tableName = store.getTableName();
+    // for master region, hbase:meta and hbase:storefile table, DefaultStoreManager is used.
+    // such these tables scan from the filesystem directly
+    if (tableName.equals(TableName.META_TABLE_NAME)
+      || tableName.equals(MasterRegionFactory.TABLE_NAME)
+      || tableName.equals(TableName.STOREFILE_TABLE_NAME)) {
+      super.createStoreFileManager(conf, store, kvComparator);
+      return;
+    }
+
+    RegionServerServices regionServerServices = store.getHRegion().getRegionServerServices();
+    Connection connection = regionServerServices.getConnection();
+    boolean readOnly = store.getHRegion().isReadOnly();
+
+    storeFileManager =
+      new PersistedStoreFileManager(kvComparator, StoreFileComparators.SEQ_ID, conf,
+        compactionPolicy.getConf(), store.getRegionFileSystem(), store.getRegionInfo(),
+        store.getColumnFamilyName(),
+        StoreFileTrackingUtils.createStoreFilePathAccessor(conf, connection),
+        readOnly);
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PersistedStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PersistedStoreFileManager.java
new file mode 100644
index 0000000..d3f096a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PersistedStoreFileManager.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
+
+/**
+ * A Storefile manager that is used by {@link PersistedStoreEngine} that persists the in-memory
+ * storefile tracking to a persistent table hbase:storefile.
+ *
+ * We don't override the {@link #clearFiles()} from {@link DefaultStoreFileManager} and persist
+ * in-memory storefiles tracking, it will be reused when region reassigns on a different
+ * region server.
+ */
+@InterfaceAudience.Private
+public class PersistedStoreFileManager extends DefaultStoreFileManager {
+  private static final Logger LOG = LoggerFactory.getLogger(PersistedStoreFileManager.class);
+  private final RegionInfo regionInfo;
+  private final String tableName;
+  private final String regionName;
+  private final String storeName;
+  private final StoreFilePathAccessor accessor;
+  private final Configuration conf;
+  // only uses for warmupHRegion
+  private final boolean readOnly;
+
+  public PersistedStoreFileManager(CellComparator cellComparator,
+    Comparator<HStoreFile> storeFileComparator, Configuration conf,
+    CompactionConfiguration compactionConfiguration, HRegionFileSystem regionFs,
+    RegionInfo regionInfo, String familyName, StoreFilePathAccessor accessor, boolean readOnly) {
+    super(cellComparator, storeFileComparator, conf, compactionConfiguration, regionFs, familyName);
+    this.conf = conf;
+    this.regionInfo = regionInfo;
+    this.tableName = regionInfo.getTable().getNameAsString();
+    this.regionName = regionInfo.getEncodedName();
+    this.storeName = familyName;
+    this.accessor = accessor;
+    this.readOnly = readOnly;
+  }
+
+  public PersistedStoreFileManager(CellComparator cellComparator,
+    Comparator<HStoreFile> storeFileComparator, Configuration conf,
+    CompactionConfiguration compactionConfiguration, HRegionFileSystem regionFs,
+    RegionInfo regionInfo, String familyName, StoreFilePathAccessor accessor) {
+    this(cellComparator, storeFileComparator, conf, compactionConfiguration, regionFs, regionInfo,
+      familyName, accessor, false);
+  }
+
+  @Override
+  public void loadFiles(List<HStoreFile> storeFiles) throws IOException {
+    // update with a sorted store files
+    super.loadFiles(storeFiles);
+    Preconditions.checkArgument(storeFiles != null, "store files cannot be "
+      + "null when loading");
+    if (storeFiles.isEmpty()) {
+      LOG.warn("Other than fresh region with no store files, store files should not be empty");
+      return;
+    }
+    updatePathListToTracker(StoreFilePathUpdate.builder().withStoreFiles(getStorefiles()).build());
+  }
+
+  @Override
+  public void insertNewFiles(Collection<HStoreFile> sfs) throws IOException {
+    // concatenate the new store files
+    super.insertNewFiles(sfs);
+    // return in case of empty store files as it is a No-op, here empty files are expected
+    // during region close
+    if (CollectionUtils.isEmpty(getStorefiles())) {
+      return;
+    }
+    updatePathListToTracker(StoreFilePathUpdate.builder().withStoreFiles(getStorefiles()).build());
+  }
+
+  @Override
+  protected void addCompactionResultsHook(ImmutableList<HStoreFile> storeFiles)
+    throws IOException {
+    Preconditions.checkNotNull(storeFiles, "storeFiles cannot be null");
+    updatePathListToTracker(StoreFilePathUpdate.builder().withStoreFiles(storeFiles).build());
+  }
+
+  @Override
+  public Collection<StoreFileInfo> loadInitialFiles() throws IOException {
+    // this logic is totally different from the default implementation in DefaultStoreFileManager
+
+    List<Path> pathList = accessor.getIncludedStoreFilePaths(tableName, regionName, storeName);
+    boolean isEmptyInPersistedFilePaths = CollectionUtils.isEmpty(pathList);
+    if (isEmptyInPersistedFilePaths) {
+      // When the path accessor is returning empty result, we scan the
+      // the file storage and see if there is any existing HFiles should be loaded.
+      // the scan is a one time process when store open during region assignment.
+      //
+      // this is especially used for region and store open
+      // 1. First time migration from a filesystem based e.g. DefaultStoreFileEngine
+      // 2. After region split and merge
+      // 3. After table clone and create new HFiles directly into data directory
+      //
+      // Also we don't handle the inconsistency between storefile tracking and file system, which
+      // will be handled by a HBCK command
+      LOG.info("Cannot find tracking paths ({}) for store {} in region {} of "
+          + "table {}, fall back to scan the storage to get a list of storefiles to be opened"
+        , isEmptyInPersistedFilePaths, storeName, regionName,
+        tableName);
+      return getRegionFs().getStoreFiles(getFamilyName());
+    }
+    ArrayList<StoreFileInfo> storeFiles = new ArrayList<>();
+    for (Path storeFilePath : pathList) {
+      if (!StoreFileInfo.isValid(getRegionFs().getFileSystem().getFileStatus(storeFilePath))) {
+        LOG.warn("Invalid StoreFile: {}, and archiving it", storeFilePath);
+        getRegionFs().removeStoreFile(storeName, storeFilePath);
+        continue;
+      }
+      StoreFileInfo info = ServerRegionReplicaUtil
+        .getStoreFileInfo(conf, getRegionFs().getFileSystem(), regionInfo,
+          ServerRegionReplicaUtil.getRegionInfoForFs(regionInfo), getFamilyName(),
+          storeFilePath);
+      storeFiles.add(info);
+    }
+    return storeFiles;
+  }
+
+  void updatePathListToTracker(StoreFilePathUpdate storeFilePathUpdate) throws IOException {
+    try {
+      // if this is not a read only region, update the tracking path
+      if (!readOnly) {
+        accessor.writeStoreFilePaths(tableName, regionName, storeName, storeFilePathUpdate);
+      }
+    } catch (IOException e) {
+      String message = String.format(
+        "Failed to persist tracking paths with key %s-%s-%s to table [%s]. "
+          + "%nPaths failed to be updated are: %s",
+        regionName, storeName, tableName, TableName.STOREFILE_STR, storeFilePathUpdate);
+      LOG.error(message);
+      throw new IOException(message, e);
+    }
+  }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
index d4c4f17..11264f8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
@@ -24,11 +24,9 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
-
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.yetus.audience.InterfaceAudience;
-
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableCollection;
 
 /**
@@ -46,7 +44,16 @@ public interface StoreFileManager {
    * Loads the initial store files into empty StoreFileManager.
    * @param storeFiles The files to load.
    */
-  void loadFiles(List<HStoreFile> storeFiles);
+  void loadFiles(List<HStoreFile> storeFiles) throws IOException;
+
+  /**
+   * Load store files that are available for opening to perform filter-based
+   * validation
+   *
+   * @return a list of {@link StoreFileInfo} for the requested store.
+   * @throws IOException if store files cannot be listed
+   */
+  Collection<StoreFileInfo> loadInitialFiles() throws IOException;
 
   /**
    * Adds new files, either for from MemStore flush or bulk insert, into the structure.
@@ -65,7 +72,7 @@ public interface StoreFileManager {
   /**
    * Remove the compacted files
    * @param compactedFiles the list of compacted files
-   * @throws IOException
+   * @throws IOException if compacted files cannot be cleaned
    */
   void removeCompactedFiles(Collection<HStoreFile> compactedFiles) throws IOException;
 
@@ -80,7 +87,7 @@ public interface StoreFileManager {
    * accessed single threaded.
    * @return The files compacted previously.
    */
-  Collection<HStoreFile> clearCompactedFiles();
+  Collection<HStoreFile> clearCompactedFiles() throws IOException;
 
   /**
    * Gets the snapshot of the store files currently in use. Can be used for things like metrics
@@ -145,7 +152,7 @@ public interface StoreFileManager {
   /**
    * Gets the split point for the split of this set of store files (approx. middle).
    * @return The mid-point if possible.
-   * @throws IOException
+   * @throws IOException on failures
    */
   Optional<byte[]> getSplitPoint() throws IOException;
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFilePathAccessor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFilePathAccessor.java
new file mode 100644
index 0000000..cc2d716
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFilePathAccessor.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Helper class to interact with the hbase storefile tracking data persisted as off-memory data
+ * from the {@link StoreFileManager}
+ *
+ * There is only a set of tracking storefiles, 'included'.
+ *
+ * e.g. list of storefile paths in 'included' should be the identical copy of the in-memory
+ * {@link HStoreFile}'s Path(s) and can be reused during region opens and region reassignment.
+ */
+@InterfaceAudience.Private
+public interface StoreFilePathAccessor {
+
+  /**
+   * Get storefile paths from the 'included' data set
+   * @param tableName name of the current table in String
+   * @param regionName name of the current region in String
+   * @param storeName name of the column family in String, to be combined with regionName to make
+   *                 the row key.
+   * @return list of StoreFile paths that should be included in reads in this store,
+   *         returns an empty list if the target cell is empty or doesn't exist.
+   * @throws IOException if a remote or network exception occurs during Get
+   */
+  List<Path> getIncludedStoreFilePaths(final String tableName, final String regionName,
+    final String storeName) throws IOException;
+
+  /**
+   * Write an entity that should be persisted into the tracking data for the
+   * specific column family of a given region
+   *
+   * it would be happened during storefile operation e.g. flush and compaction.
+   *
+   * @param tableName name of the current table in String
+   * @param regionName name of the current region in String
+   * @param storeName name of the column family in String, to be combined with regionName to make
+   *                 the row key.
+   * @param storeFilePathUpdate Updates to be persisted
+   * @throws IOException if a remote or network exception occurs during write
+   */
+  void writeStoreFilePaths(final String tableName, final String regionName,
+    final String storeName, final StoreFilePathUpdate storeFilePathUpdate) throws IOException;
+
+  /**
+   * Delete storefile paths for a tracking column family, normally used when a region-store is
+   * completely removed due to region split or merge
+   * @param tableName name of the current table in String
+   * @param regionName name of the current region in String
+   * @param storeName name of the column family in String, to be combined with regionName to make
+   *                 the row key.
+   * @throws IOException if a remote or network exception occurs during delete
+   */
+  void deleteStoreFilePaths(final String tableName, final String regionName, final String storeName)
+    throws IOException;
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFilePathUpdate.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFilePathUpdate.java
new file mode 100644
index 0000000..472be43
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFilePathUpdate.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import com.google.errorprone.annotations.RestrictedApi;
+import java.util.Collection;
+import java.util.List;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.fs.Path;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+
+@InterfaceAudience.Private
+final class StoreFilePathUpdate {
+
+  private final List<Path> storeFiles;
+
+  private StoreFilePathUpdate(final List<Path> storeFiles) {
+    Preconditions.checkNotNull(storeFiles, "StoreFiles cannot be null");
+    this.storeFiles = storeFiles;
+  }
+
+  List<Path> getStoreFiles() {
+    return storeFiles;
+  }
+
+  @Override
+  public String toString() {
+    return "StoreFilePathUpdate{" + "storeFiles=" + storeFiles + "}";
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    StoreFilePathUpdate that = (StoreFilePathUpdate) o;
+
+    return new EqualsBuilder().append(storeFiles, that.storeFiles).isEquals();
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder(17, 37).append(storeFiles).toHashCode();
+  }
+
+  static Builder builder() {
+    return new Builder();
+  }
+
+  static class Builder {
+    private List<Path> storeFiles = ImmutableList.of();
+
+    Builder withStoreFiles(Collection<HStoreFile> storeFiles) {
+      this.storeFiles = StoreFileTrackingUtils.convertStoreFilesToPaths(storeFiles);
+      return this;
+    }
+
+    @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*/src/test/.*")
+    Builder withStorePaths(List<Path> storeFiles) {
+      this.storeFiles = storeFiles;
+      return this;
+    }
+
+    StoreFilePathUpdate build() {
+      return new StoreFilePathUpdate(storeFiles);
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileTrackingUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileTrackingUtils.java
new file mode 100644
index 0000000..5e14663
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileTrackingUtils.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class to support persistent store file tracking
+ */
+@InterfaceAudience.Private
+public final class StoreFileTrackingUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(StoreFileTrackingUtils.class);
+  public static final long SLEEP_DELTA_MS = TimeUnit.MILLISECONDS.toMillis(100);
+
+  private StoreFileTrackingUtils() {
+    // private for utility class
+  }
+
+  public static boolean isStoreFileTrackingPersistEnabled(Configuration conf) {
+    boolean isStoreTrackingPersistEnabled =
+      conf.getBoolean(HConstants.STOREFILE_TRACKING_PERSIST_ENABLED,
+        HConstants.DEFAULT_STOREFILE_TRACKING_PERSIST_ENABLED);
+    boolean isPersistedStoreEngineSet =
+      conf.get(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName())
+        .equals(PersistedStoreEngine.class.getName());
+    boolean isFeatureEnabled = isStoreTrackingPersistEnabled && isPersistedStoreEngineSet;
+    if (isStoreTrackingPersistEnabled ^ isPersistedStoreEngineSet) {
+      // check if both configuration are correct.
+      String errorMessage = String.format("please set %s to true and set store engine key %s to "
+          + "%s to enable persist storefile tracking",
+        HConstants.STOREFILE_TRACKING_PERSIST_ENABLED, StoreEngine.STORE_ENGINE_CLASS_KEY,
+        PersistedStoreEngine.class.getName());
+      throw new IllegalArgumentException(errorMessage);
+    }
+    return isFeatureEnabled;
+  }
+
+  /**
+   * if storefile tracking feature is configured, Initialize hbase:storefile table and wait for it
+   * to be online. Otherwise, look for hbase:storefile table and remove it
+   *
+   * @param masterServices masterServices
+   * @throws IOException if hbase:storefile table cannot be initialized and be online
+   */
+  public static void init(MasterServices masterServices) throws IOException {
+    createStorefileTable(masterServices);
+    waitForStoreFileTableOnline(masterServices);
+  }
+
+  public static StoreFilePathAccessor createStoreFilePathAccessor(Configuration conf,
+    Connection connection) {
+    return new HTableStoreFilePathAccessor(conf, connection);
+  }
+
+  public static List<Path> convertStoreFilesToPaths(Collection<HStoreFile> storeFiles) {
+    return storeFiles.stream().map(HStoreFile::getPath).collect(Collectors.toList());
+  }
+
+  private static void createStorefileTable(MasterServices masterServices)
+    throws IOException {
+    if (MetaTableAccessor.getTableState(masterServices.getConnection(),
+      TableName.STOREFILE_TABLE_NAME) == null) {
+      LOG.info("{} table not found. Creating...", TableName.STOREFILE_TABLE_NAME);
+      masterServices.createSystemTable(HTableStoreFilePathAccessor.STOREFILE_TABLE_DESC);
+    }
+  }
+
+  private static void waitForStoreFileTableOnline(MasterServices masterServices)
+    throws IOException {
+    try {
+      long startTime = EnvironmentEdgeManager.currentTime();
+      long timeout = masterServices.getConfiguration()
+        .getLong(HConstants.STOREFILE_TRACKING_INIT_TIMEOUT,
+          HConstants.DEFAULT_STOREFILE_TRACKING_INIT_TIMEOUT);
+      while (!isStoreFileTableAssignedAndEnabled(masterServices)) {
+        if (EnvironmentEdgeManager.currentTime() - startTime + SLEEP_DELTA_MS > timeout) {
+          throw new IOException("Time out " + timeout + " ms waiting for hbase:storefile table to "
+            + "be assigned and enabled: " + masterServices.getTableStateManager()
+            .getTableState(TableName.STOREFILE_TABLE_NAME));
+        }
+        Thread.sleep(SLEEP_DELTA_MS);
+      }
+    } catch (InterruptedException e) {
+      throw new IOException("Interrupted when wait for " + TableName.STOREFILE_TABLE_NAME
+        + " to be assigned and enabled", e);
+    }
+  }
+
+  public static boolean isStoreFileTableAssignedAndEnabled(MasterServices masterServices)
+    throws IOException {
+    return masterServices.getAssignmentManager().getRegionStates()
+      .hasTableRegionStates(TableName.STOREFILE_TABLE_NAME) && masterServices
+      .getTableStateManager().getTableState(TableName.STOREFILE_TABLE_NAME).isEnabled();
+  }
+
+  static String getFamilyFromKey(String key, String tableName, String regionName,
+    String separator) {
+    assert key.startsWith(regionName) : "Unexpected suffix for row key from hbase:storefile "
+      + "table";
+    int startIndex = regionName.length() + separator.length();
+    int endIndex = key.lastIndexOf(separator + tableName);
+    return key.substring(startIndex, endIndex);
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java
index 14863a6..8b3ddbd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java
@@ -61,7 +61,8 @@ public class StripeStoreEngine extends StoreEngine<StripeStoreFlusher,
       Configuration conf, HStore store, CellComparator comparator) throws IOException {
     this.config = new StripeStoreConfig(conf, store);
     this.compactionPolicy = new StripeCompactionPolicy(conf, store, config);
-    this.storeFileManager = new StripeStoreFileManager(comparator, conf, this.config);
+    this.storeFileManager = new StripeStoreFileManager(comparator, conf, this.config,
+        store.getRegionFileSystem(), store.getColumnFamilyName());
     this.storeFlusher = new StripeStoreFlusher(
       conf, store, this.compactionPolicy, this.storeFileManager);
     this.compactor = new StripeCompactor(conf, store);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
index beed41f..4ad5dc4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
@@ -82,6 +82,8 @@ public class StripeStoreFileManager
    */
   public final static byte[] OPEN_KEY = HConstants.EMPTY_BYTE_ARRAY;
   final static byte[] INVALID_KEY = null;
+  private final HRegionFileSystem regionFs;
+  private final String familyName;
 
   /**
    * The state class. Used solely to replace results atomically during
@@ -124,11 +126,14 @@ public class StripeStoreFileManager
   private final int blockingFileCount;
 
   public StripeStoreFileManager(
-      CellComparator kvComparator, Configuration conf, StripeStoreConfig config) {
+      CellComparator kvComparator, Configuration conf, StripeStoreConfig config,
+      HRegionFileSystem regionFs, String familyName) {
     this.cellComparator = kvComparator;
     this.config = config;
     this.blockingFileCount = conf.getInt(
         HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
+    this.regionFs = regionFs;
+    this.familyName = familyName;
   }
 
   @Override
@@ -137,6 +142,11 @@ public class StripeStoreFileManager
   }
 
   @Override
+  public Collection<StoreFileInfo> loadInitialFiles() throws IOException {
+    return regionFs.getStoreFiles(familyName);
+  }
+
+  @Override
   public Collection<HStoreFile> getStorefiles() {
     return state.allFilesCached;
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/StoreFilePathAccessorTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/StoreFilePathAccessorTestBase.java
new file mode 100644
index 0000000..a346217
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/StoreFilePathAccessorTestBase.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TestName;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
+public abstract class StoreFilePathAccessorTestBase {
+
+  @Rule
+  public TestName name = new TestName();
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  protected StoreFilePathAccessor storeFilePathAccessor;
+  protected static final String REGION_NAME = UUID.randomUUID().toString().replaceAll("-", "");
+  protected static final String STORE_NAME = UUID.randomUUID().toString();
+  protected static final List<Path> EMPTY_PATH = Collections.emptyList();
+  protected static final List<Path> INCLUDE_EXAMPLE_PATH =
+    Lists.newArrayList(new Path("hdfs://foo/bar1"), new Path("hdfs://foo/bar2"));
+  protected static final String VALID_TABLE_NAME_CHARS = "_.";
+
+  protected String tableName;
+
+  protected abstract StoreFilePathAccessor getStoreFilePathAccessor() throws IOException;
+
+  @BeforeClass
+  public static void setUpCluster() throws Exception {
+    TEST_UTIL.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    tableName = name.getMethodName() + VALID_TABLE_NAME_CHARS + UUID.randomUUID();
+    init();
+    storeFilePathAccessor = getStoreFilePathAccessor();
+  }
+
+  abstract void init() throws Exception;
+
+  @After
+  public void after() throws Exception {
+    cleanupTest();
+  }
+
+  abstract void cleanupTest() throws Exception;
+
+  @Test
+  public void testInitialize() throws Exception {
+    MasterServices masterServices = TEST_UTIL.getHBaseCluster().getMaster();
+    verifyInitialize(masterServices);
+  }
+
+  // this will be implemented by each implementation of StoreFilePathAccessor
+  abstract void verifyInitialize(MasterServices masterServices) throws Exception;
+
+  abstract void verifyNotInitializedException();
+
+  @Test
+  public void testIncludedStoreFilePaths() throws Exception {
+    testInitialize();
+    // verify empty list before write
+    verifyIncludedFilePaths(EMPTY_PATH);
+    writeAndVerifyIncludedFilePaths(INCLUDE_EXAMPLE_PATH);
+  }
+
+  @Test
+  public void testIncludedStoreFilePathsWithEmptyList() throws Exception {
+    expectedException.expect(IllegalArgumentException.class);
+    testInitialize();
+    // verify empty before write
+    verifyIncludedFilePaths(EMPTY_PATH);
+    // write and verify empty list fails
+    writeAndVerifyIncludedFilePaths(EMPTY_PATH);
+  }
+
+  @Test
+  public void testWriteIncludedStoreFilePathsWhenNotInitialized() throws Exception {
+    verifyNotInitializedException();
+    writeAndVerifyIncludedFilePaths(INCLUDE_EXAMPLE_PATH);
+  }
+
+  @Test
+  public void testGetIncludedStoreFilePathsWhenNotInitialized() throws Exception {
+    verifyNotInitializedException();
+    storeFilePathAccessor.getIncludedStoreFilePaths(tableName, REGION_NAME, STORE_NAME);
+  }
+
+  @Test
+  public void testWriteIncludedStoreFilePathsWithEmptyList() throws Exception {
+    expectedException.expect(IllegalArgumentException.class);
+    testInitialize();
+    // verify empty before write
+    verifyIncludedFilePaths(EMPTY_PATH);
+    // write and verify empty list fails
+    writeAndVerifyIncludedFilePaths(EMPTY_PATH);
+  }
+
+  @Test
+  public void testWriteIncludedStoreFilePaths() throws Exception {
+    testInitialize();
+    verifyIncludedFilePaths(EMPTY_PATH);
+    writeAndVerifyIncludedFilePaths(INCLUDE_EXAMPLE_PATH);
+  }
+
+  @Test
+  public void testWriteIncludedStoreFilePathsWithNull() throws Exception {
+    expectedException.expect(NullPointerException.class);
+    testInitialize();
+    // verify empty before write
+    verifyIncludedFilePaths(EMPTY_PATH);
+    // write and verify empty list fails
+    writeAndVerifyIncludedFilePaths(null);
+  }
+
+  @Test
+  public void testDeleteStoreFilePaths() throws Exception {
+    testInitialize();
+
+    // verify empty list before write
+    verifyIncludedFilePaths(EMPTY_PATH);
+    // write some date to included:files data set
+    writeAndVerifyIncludedFilePaths(INCLUDE_EXAMPLE_PATH);
+    // delete and verify both data set are empty
+    storeFilePathAccessor.deleteStoreFilePaths(tableName, REGION_NAME, STORE_NAME);
+    verifyIncludedFilePaths(EMPTY_PATH);
+  }
+
+  @Test
+  public void testDeleteStoreFilePathsWithNoData() throws Exception {
+    testInitialize();
+
+    // verify empty list before write
+    verifyIncludedFilePaths(EMPTY_PATH);
+    // delete and verify both data set are empty
+    storeFilePathAccessor.deleteStoreFilePaths(tableName, REGION_NAME, STORE_NAME);
+    verifyIncludedFilePaths(EMPTY_PATH);
+  }
+
+  @Test
+  public void testDeleteStoreFilePathsWhenNotInitialized() throws Exception {
+    expectedException.expectCause(Matchers.isA(TableNotFoundException.class));
+    storeFilePathAccessor.deleteStoreFilePaths(tableName, REGION_NAME, STORE_NAME);
+  }
+
+  protected void writeAndVerifyIncludedFilePaths(List<Path> paths) throws IOException {
+    storeFilePathAccessor.writeStoreFilePaths(tableName, REGION_NAME, STORE_NAME,
+      StoreFilePathUpdate.builder().withStorePaths(paths).build());
+    verifyIncludedFilePaths(paths);
+  }
+
+  protected void verifyIncludedFilePaths(List<Path> expectPaths) throws IOException {
+    assertEquals(expectPaths, storeFilePathAccessor
+      .getIncludedStoreFilePaths(tableName, REGION_NAME, STORE_NAME));
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHTableStoreFilePathAccessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHTableStoreFilePathAccessor.java
new file mode 100644
index 0000000..bf7d233
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHTableStoreFilePathAccessor.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import java.io.IOException;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
+
+@Category({ MiscTests.class, MediumTests.class })
+public class TestHTableStoreFilePathAccessor extends StoreFilePathAccessorTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestHTableStoreFilePathAccessor.class);
+
+  private Admin admin;
+
+  @Override
+  protected HTableStoreFilePathAccessor getStoreFilePathAccessor() {
+    return new HTableStoreFilePathAccessor(TEST_UTIL.getConfiguration(), admin.getConnection());
+  }
+
+  @Override
+  public void init() throws Exception {
+    admin = TEST_UTIL.getAdmin();
+  }
+
+  @Override
+  public void cleanupTest() throws IOException {
+    if (admin.tableExists(TableName.STOREFILE_TABLE_NAME)
+      && admin.isTableEnabled(TableName.STOREFILE_TABLE_NAME)) {
+      admin.disableTable(TableName.STOREFILE_TABLE_NAME);
+      admin.deleteTable(TableName.STOREFILE_TABLE_NAME);
+    }
+  }
+
+  @Override
+  public void verifyInitialize(MasterServices masterServices) throws Exception {
+    assertFalse(admin.tableExists(TableName.STOREFILE_TABLE_NAME));
+    StoreFileTrackingUtils.init(TEST_UTIL.getHBaseCluster().getMaster());
+    assertNotNull(TEST_UTIL.getConnection().getTable(TableName.STOREFILE_TABLE_NAME));
+    assertTrue(
+      TEST_UTIL.getMiniHBaseCluster().getRegions(TableName.STOREFILE_TABLE_NAME).size() >= 1);
+    assertTrue("hbase:storefile table must be assigned and enabled.",
+      StoreFileTrackingUtils.isStoreFileTableAssignedAndEnabled(masterServices));
+  }
+
+  @Override
+  public void verifyNotInitializedException() {
+    expectedException.expect(TableNotFoundException.class);
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPersistedStoreEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPersistedStoreEngine.java
new file mode 100644
index 0000000..dc2d0c7
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPersistedStoreEngine.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.hamcrest.core.Is.isA;
+import static org.junit.Assert.assertEquals;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellComparatorImpl;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TestName;
+import org.mockito.Mockito;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestPersistedStoreEngine {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestPersistedStoreEngine.class);
+
+  @Rule
+  public TestName name = new TestName();
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static final byte[] DEFAULT_STORE_BYTE = TEST_UTIL.fam1;
+
+  private TableName tableName;
+  private Configuration conf;
+  private HStore store;
+
+  @BeforeClass
+  public static void setUpCluster() throws Exception {
+    TEST_UTIL.getConfiguration().setBoolean(HConstants.STOREFILE_TRACKING_PERSIST_ENABLED, true);
+    TEST_UTIL.getConfiguration().set(StoreEngine.STORE_ENGINE_CLASS_KEY,
+      PersistedStoreEngine.class.getName());
+    TEST_UTIL.startMiniCluster();
+  }
+
+  @Before
+  public void setup() throws Exception {
+    store = Mockito.mock(HStore.class);
+    StoreContext context = new StoreContext.Builder().build();
+
+    conf = TEST_UTIL.getConfiguration();
+    Mockito.when(store.getStoreContext()).thenReturn(context);
+    Mockito.when(store.getRegionInfo()).thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    for (TableDescriptor htd: TEST_UTIL.getAdmin().listTableDescriptors()) {
+      TEST_UTIL.deleteTable(htd.getTableName());
+    }
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testEngine() throws IOException {
+    tableName = TableName.valueOf(name.getMethodName());
+    createTableAndLoadData();
+    StoreEngine storeEngine = TEST_UTIL.getMiniHBaseCluster().getRegions(tableName).get(0)
+      .getStore(DEFAULT_STORE_BYTE).getStoreEngine();
+    verifyStoreEngineAndStoreFileManager(storeEngine, PersistedStoreEngine.class,
+      PersistedStoreFileManager.class);
+  }
+
+  @Test
+  public void testEngineWithStorefileTrackingPersistDisabled() throws IOException {
+    expectedException.expect(IOException.class);
+    expectedException.expectMessage("Unable to load configured store engine '"
+      + PersistedStoreEngine.class.getName() + "'");
+    expectedException.expectCause(isA(IllegalArgumentException.class));
+    conf.setBoolean(HConstants.STOREFILE_TRACKING_PERSIST_ENABLED, false);
+    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, PersistedStoreEngine.class.getName());
+    CellComparator cellComparator = new CellComparatorImpl();
+    StoreEngine.create(store, conf, cellComparator);
+  }
+
+  @Test
+  public void testEngineWithOnlyStorefileTrackingPersistEnabled() throws IOException {
+    // just setting the storefile tracking enabled will not take any consideration for store engine
+    // creation because it does not go thru PersistedStoreEngine, but the master startup will fail
+    conf.setBoolean(HConstants.STOREFILE_TRACKING_PERSIST_ENABLED, true);
+    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
+    CellComparator cellComparator = new CellComparatorImpl();
+    StoreEngine storeEngine = StoreEngine.create(store, conf, cellComparator);
+    verifyStoreEngineAndStoreFileManager(storeEngine, DefaultStoreEngine.class,
+      DefaultStoreFileManager.class);
+  }
+
+  private void createTableAndLoadData() throws IOException {
+    Table testTable = TEST_UTIL.createMultiRegionTable(tableName, DEFAULT_STORE_BYTE);
+    int loadedRows = TEST_UTIL.loadTable(testTable, DEFAULT_STORE_BYTE);
+    int actualCount = TEST_UTIL.countRows(testTable);
+    assertEquals(loadedRows, actualCount);
+  }
+
+  private void verifyStoreEngineAndStoreFileManager(StoreEngine storeEngine, Class storeEngineClass,
+    Class storeFileManagerClass) {
+    StoreFileManager storeFileManager = storeEngine.getStoreFileManager();
+    assertEquals(storeEngineClass, storeEngine.getClass());
+    assertEquals(storeFileManagerClass, storeFileManager.getClass());
+  }
+
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPersistedStoreFileManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPersistedStoreFileManager.java
new file mode 100644
index 0000000..6bb52cf
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPersistedStoreFileManager.java
@@ -0,0 +1,496 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellComparatorImpl;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.mockito.Mockito;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.ListUtils;
+
+@Category({ RegionServerTests.class, LargeTests.class })
+public class TestPersistedStoreFileManager {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestPersistedStoreFileManager.class);
+
+  @Rule
+  public TestName name = new TestName();
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static final byte[] DEFAULT_STORE_BYTE = TEST_UTIL.fam1;
+  private static final String DEFAULT_STORE_NAME = Bytes.toString(DEFAULT_STORE_BYTE);
+  private static final ArrayList<HStoreFile> EMPTY_LIST = new ArrayList<>();
+  private static final CellComparator DEFAULT_CELL_COMPARATOR = new CellComparatorImpl();
+  private static final Comparator<HStoreFile> COMPARATOR = StoreFileComparators.SEQ_ID;
+
+  private Path baseDir;
+  private FileSystem fs;
+  private PersistedStoreFileManager storeFileManager;
+  private StoreFilePathAccessor storeFilePathAccessor;
+  private Configuration conf;
+  private HRegion region;
+  private HRegionFileSystem regionFS;
+  private List<HStoreFile> initialStoreFiles;
+  private List<HStoreFile> sortedInitialStoreFiles;
+  private List<HStoreFile> additionalStoreFiles;
+  private List<HStoreFile> sortedAdditionalStoreFiles;
+  private List<HStoreFile> sortedCombinedStoreFiles;
+  private List<Path> initialStorePaths;
+  private List<Path> additionalStorePaths;
+  private TableName tableName;
+  private String regionName;
+  private TableDescriptor htd;
+  private RegionInfo regioninfo;
+
+  @BeforeClass
+  public static void setUpCluster() throws Exception {
+    TEST_UTIL.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void before() throws IOException, InterruptedException {
+    conf = TEST_UTIL.getConfiguration();
+    baseDir = TEST_UTIL.getDataTestDirOnTestFS();
+    tableName = TableName.valueOf(name.getMethodName());
+    htd = TEST_UTIL.createTableDescriptor(tableName, DEFAULT_STORE_BYTE);
+    regioninfo = RegionInfoBuilder.newBuilder(tableName).build();
+    region = TEST_UTIL.createRegionAndWAL(regioninfo, baseDir, conf, htd);
+    regionFS = region.getRegionFileSystem();
+    fs = TEST_UTIL.getTestFileSystem();
+    initialStoreFiles = createStoreFilesList();
+    sortedInitialStoreFiles = ImmutableList.sortedCopyOf(COMPARATOR, initialStoreFiles);
+    additionalStoreFiles = createStoreFilesList();
+    sortedAdditionalStoreFiles = ImmutableList.sortedCopyOf(COMPARATOR, additionalStoreFiles);
+    sortedCombinedStoreFiles = ImmutableList.sortedCopyOf(COMPARATOR,
+      ListUtils.union(initialStoreFiles, additionalStoreFiles));
+    initialStorePaths = createPathList();
+    additionalStorePaths = createPathList();
+    regionName = region.getRegionInfo().getEncodedName();
+
+    storeFilePathAccessor =
+      new HTableStoreFilePathAccessor(conf, TEST_UTIL.getAdmin().getConnection());
+    // the hbase:storefile should be created in master startup, but we initialize it here for
+    // unit tests
+    StoreFileTrackingUtils.init(TEST_UTIL.getHBaseCluster().getMaster());
+
+    storeFileManager =
+      new PersistedStoreFileManager(DEFAULT_CELL_COMPARATOR, COMPARATOR, conf,
+        Mockito.mock(CompactionPolicy.class).getConf(), regionFS, regioninfo,
+        DEFAULT_STORE_NAME, storeFilePathAccessor);
+
+    verifyStoreFileManagerWhenStarts();
+  }
+
+  @After
+  public void after() throws IOException {
+    storeFilePathAccessor
+      .deleteStoreFilePaths(tableName.getNameAsString(), regionName, DEFAULT_STORE_NAME);
+  }
+
+  @Test
+  public void testLoadFiles() throws IOException {
+    storeFileManager.loadFiles(initialStoreFiles);
+    compareIncludedInManagerVsTable(sortedInitialStoreFiles);
+  }
+
+  @Test
+  public void testLoadFiles_WithReadOnly() throws IOException {
+    storeFileManager =
+      new PersistedStoreFileManager(DEFAULT_CELL_COMPARATOR, COMPARATOR, conf,
+        Mockito.mock(CompactionPolicy.class).getConf(), regionFS, regioninfo,
+        DEFAULT_STORE_NAME, storeFilePathAccessor, true);
+    storeFileManager.loadFiles(initialStoreFiles);
+    compareIncludedInManagerVsTable(sortedInitialStoreFiles, EMPTY_LIST);
+  }
+
+  @Test
+  public void testLoadFilesWithEmptyListWithExistingData() throws IOException {
+    // first load data into the store and simulate we have persisted data
+    storeFileManager.loadFiles(initialStoreFiles);
+
+    // writing empty list to loadFiles will not fail but it's not doing anything
+    // and mostly this is expected when a fresh region is created.
+    storeFileManager.loadFiles(EMPTY_LIST);
+    // on heap view will be updated to empty, but the pre step loadInitialFiles should have
+    // provide the right view
+    //
+    // this test is telling us that we will never write empty to include list.
+    compareIncludedInManagerVsTable(EMPTY_LIST, sortedInitialStoreFiles);
+  }
+
+  @Test
+  public void testLoadFilesWithEmptyList() throws IOException {
+    storeFileManager.loadFiles(EMPTY_LIST);
+    compareIncludedInManagerVsTable(EMPTY_LIST);
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testLoadFilesWithNull() throws IOException {
+    storeFileManager.loadFiles(initialStoreFiles);
+    storeFileManager.loadFiles(null);
+  }
+
+  @Test
+  public void testInsertNewFiles() throws IOException {
+    storeFileManager.insertNewFiles(initialStoreFiles);
+    compareIncludedInManagerVsTable(sortedInitialStoreFiles);
+    storeFileManager.insertNewFiles(additionalStoreFiles);
+    compareIncludedInManagerVsTable(sortedCombinedStoreFiles);
+  }
+
+  @Test
+  public void testLoadInitialFilesWithNoData() throws IOException {
+    assertNull(storeFileManager.loadInitialFiles());
+  }
+
+  @Test
+  public void testLoadInitialFiles() throws IOException {
+    StoreFilePathAccessor mockStoreFilePathAccessor = Mockito.mock(StoreFilePathAccessor.class);
+    HRegionFileSystem mockFs = Mockito.spy(regionFS);
+    PersistedStoreFileManager storeFileManager =
+      new PersistedStoreFileManager(DEFAULT_CELL_COMPARATOR, COMPARATOR, conf,
+        Mockito.mock(CompactionPolicy.class).getConf(), mockFs, regioninfo,
+        DEFAULT_STORE_NAME, mockStoreFilePathAccessor);
+
+    // make sure the tracking table is not empty and return the list of initialStoreFiles
+    List<Path> storeFilePaths =
+      StoreFileTrackingUtils.convertStoreFilesToPaths(initialStoreFiles);
+    when(mockStoreFilePathAccessor
+      .getIncludedStoreFilePaths(tableName.getNameAsString(), regionName, DEFAULT_STORE_NAME))
+      .thenReturn(storeFilePaths);
+
+    Collection<StoreFileInfo> expectedStoreFileInfos =
+      convertToStoreFileInfos(mockFs.getFileSystem(), initialStoreFiles);
+    Collection<StoreFileInfo> actualStoreFileInfos = storeFileManager.loadInitialFiles();
+    verify(mockFs, times(0)).getStoreFiles(DEFAULT_STORE_NAME);
+    assertEquals(expectedStoreFileInfos, actualStoreFileInfos);
+  }
+
+  @Test
+  public void testLoadInitialFilesWithRefreshFileSystem() throws IOException {
+    StoreFilePathAccessor mockStoreFilePathAccessor = Mockito.mock(StoreFilePathAccessor.class);
+    HRegionFileSystem mockFs = Mockito.mock(HRegionFileSystem.class);
+    PersistedStoreFileManager storeFileManager =
+      new PersistedStoreFileManager(DEFAULT_CELL_COMPARATOR, COMPARATOR, conf,
+        Mockito.mock(CompactionPolicy.class).getConf(), mockFs, regioninfo,
+        DEFAULT_STORE_NAME, mockStoreFilePathAccessor, false);
+
+    Collection<StoreFileInfo> expectedStoreFileInfos =
+      convertToStoreFileInfos(fs, initialStoreFiles);
+
+    when(mockFs.getStoreFiles(DEFAULT_STORE_NAME)).thenReturn(expectedStoreFileInfos);
+
+    Collection<StoreFileInfo> actualStoreFileInfos = storeFileManager.loadInitialFiles();
+    verify(mockFs, times(1)).getStoreFiles(DEFAULT_STORE_NAME);
+    assertEquals(expectedStoreFileInfos, actualStoreFileInfos);
+  }
+
+  @Test
+  public void testLoadInitialFilesWithNoFiles() throws IOException {
+    HRegionFileSystem mockFs = Mockito.mock(HRegionFileSystem.class);
+    when(mockFs.getStoreFiles(DEFAULT_STORE_NAME)).thenReturn(null);
+
+    PersistedStoreFileManager storeFileManager =
+      new PersistedStoreFileManager(DEFAULT_CELL_COMPARATOR, COMPARATOR, conf,
+        Mockito.mock(CompactionPolicy.class).getConf(), mockFs, regioninfo,
+        DEFAULT_STORE_NAME, storeFilePathAccessor);
+
+    Collection<StoreFileInfo> actualStoreFileInfos = storeFileManager.loadInitialFiles();
+    verify(mockFs, times(1)).getStoreFiles(DEFAULT_STORE_NAME);
+    assertNull(actualStoreFileInfos);
+    assertEquals(EMPTY_LIST, storeFileManager.getStorefiles());
+  }
+
+  @Test
+  public void testLoadInitialFilesWithFilesFromFileSystem() throws IOException {
+    HRegionFileSystem mockFs = Mockito.mock(HRegionFileSystem.class);
+    when(mockFs.getFileSystem()).thenReturn(fs);
+    Collection<StoreFileInfo> expectedStoreFileInfos =
+      convertToStoreFileInfos(fs, initialStoreFiles);
+    when(mockFs.getStoreFiles(DEFAULT_STORE_NAME)).thenReturn(expectedStoreFileInfos);
+
+    PersistedStoreFileManager storeFileManager =
+      new PersistedStoreFileManager(DEFAULT_CELL_COMPARATOR, COMPARATOR, conf,
+        Mockito.mock(CompactionPolicy.class).getConf(), mockFs, regioninfo,
+        DEFAULT_STORE_NAME, storeFilePathAccessor);
+
+    // try to check if there is any store file to be loaded
+    Collection<StoreFileInfo> actualStoreFileInfos = storeFileManager.loadInitialFiles();
+    verify(mockFs, times(1)).getStoreFiles(DEFAULT_STORE_NAME);
+    assertEquals(expectedStoreFileInfos, actualStoreFileInfos);
+  }
+
+  @Test
+  public void testClearFiles() throws IOException {
+    storeFileManager.clearFiles();
+    compareIncludedInManagerVsTable(EMPTY_LIST);
+
+    storeFileManager.loadFiles(initialStoreFiles);
+    compareIncludedInManagerVsTable(sortedInitialStoreFiles);
+
+    storeFileManager.clearFiles();
+    compareIncludedInManagerVsTable(EMPTY_LIST, sortedInitialStoreFiles);
+  }
+
+  @Test
+  public void testClearCompactedFiles() throws IOException {
+    storeFileManager.clearCompactedFiles();
+    verifyCompactedfiles(EMPTY_LIST);
+
+    storeFileManager.addCompactionResults(initialStoreFiles, initialStoreFiles);
+    verifyCompactedfiles(sortedInitialStoreFiles);
+    storeFileManager.clearCompactedFiles();
+    verifyCompactedfiles(EMPTY_LIST);
+  }
+
+  @Test
+  public void testAddCompactionResults() throws IOException {
+    storeFileManager.clearFiles();
+    storeFileManager.clearCompactedFiles();
+
+    List<HStoreFile> firstCompactionResult = createStoreFilesList();
+    List<Path> firstCompactionResultPath =
+      StoreFileTrackingUtils.convertStoreFilesToPaths(firstCompactionResult);
+    List<HStoreFile> secondCompactionResult = createStoreFilesList();
+    List<Path> secondCompactionResultPath =
+      StoreFileTrackingUtils.convertStoreFilesToPaths(secondCompactionResult);
+    List<HStoreFile> thirdCompactionResult = createStoreFilesList();
+    List<Path> thirdCompactionResultPath =
+      StoreFileTrackingUtils.convertStoreFilesToPaths(thirdCompactionResult);
+    // Composition of all compaction results, loaded into tmpFiles to track that tmpFiles are being
+    // removed correctly
+    List<Path> initialCompactionTmpPaths = ImmutableList.copyOf(Iterables
+      .concat(firstCompactionResultPath, secondCompactionResultPath, thirdCompactionResultPath));
+
+    // manager.storefiles    = EMPTY_LIST            -> firstCompactionResult
+    // manager.compactedfiles= [empty]               -> EMPTY_LIST
+    // manager.tmpFiles = EMPTY_LIST                 -> initialCompactionTmpPaths (all results)
+    storeFileManager.addCompactionResults(EMPTY_LIST, firstCompactionResult);
+    ImmutableList<HStoreFile> expectedFirstCompactionResult =
+      ImmutableList.sortedCopyOf(COMPARATOR, firstCompactionResult);
+    compareIncludedInManagerVsTable(expectedFirstCompactionResult);
+    verifyCompactedfiles(EMPTY_LIST);
+
+    // manager.storefiles    = firstCompactionResult -> secondCompactionResult
+    // manager.compactedfiles= EMPTY_LIST  -> firstCompactionResult
+    // manager.tmpFiles      = second + third -> third
+    storeFileManager.addCompactionResults(firstCompactionResult, secondCompactionResult);
+    ImmutableList<HStoreFile> expectedSecondCompactionResult =
+      ImmutableList.sortedCopyOf(COMPARATOR, secondCompactionResult);
+    compareIncludedInManagerVsTable(expectedSecondCompactionResult);
+    verifyCompactedfiles(expectedFirstCompactionResult);
+
+    // check manager.compactedfiles accumulates
+    // manager.storefiles    = secondCompactionResult-> thirdCompactionResult
+    // manager.compactedfiles= firstCompactionResult -> firstCompactionResult+secondCompactionResult
+    // manager.tmpFiles      = third -> EMPTY
+    storeFileManager.addCompactionResults(secondCompactionResult, thirdCompactionResult);
+    ImmutableList<HStoreFile> expectedThirdCompactionResult =
+      ImmutableList.sortedCopyOf(COMPARATOR, thirdCompactionResult);
+    compareIncludedInManagerVsTable(expectedThirdCompactionResult);
+    secondCompactionResult.addAll(firstCompactionResult);
+    ImmutableList<HStoreFile> expectedCompactedfiles =
+      ImmutableList.sortedCopyOf(COMPARATOR, secondCompactionResult);
+    verifyCompactedfiles(expectedCompactedfiles);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testAddCompactionResultsWithEmptyResults() throws IOException {
+    // PersistedStoreFileManager can only perform addCompactionResults
+    // after storefiles are flushed and being tracking in-memory
+    storeFileManager.addCompactionResults(initialStoreFiles, EMPTY_LIST);
+  }
+
+  @Test
+  public void testRemoveCompactedFiles() throws IOException {
+    // make sure the store file tracking is empty
+    compareIncludedInManagerVsTable(EMPTY_LIST);
+    verifyCompactedfiles(EMPTY_LIST);
+
+    List<HStoreFile> storefilesSet1 = initialStoreFiles;
+    List<HStoreFile> storefilesSet2 = additionalStoreFiles;
+    List<HStoreFile> compactedFiles = Lists.newArrayList(storefilesSet1);
+    // load some files into store file manager
+    storeFileManager.loadFiles(storefilesSet1);
+
+    storeFileManager.addCompactionResults(storefilesSet1, storefilesSet2);
+    List<HStoreFile> expectedCompactedfiles = sortedInitialStoreFiles;
+    List<HStoreFile> expectedIncluded = sortedAdditionalStoreFiles;
+    verifyCompactedfiles(expectedCompactedfiles);
+    compareIncludedInManagerVsTable(expectedIncluded);
+
+    storeFileManager.removeCompactedFiles(compactedFiles);
+    verifyCompactedfiles(EMPTY_LIST);
+    compareIncludedInManagerVsTable(expectedIncluded);
+  }
+
+  @Test
+  public void testRemoveCompactedFilesWhenEmpty() throws IOException {
+    // simulate the store.close() remove again and it should not change the table both in memory
+    // and the table.
+    HRegionFileSystem mockFs = Mockito.mock(HRegionFileSystem.class);
+    StoreFilePathAccessor mockStoreFilePathAccessor = Mockito.mock(StoreFilePathAccessor.class);
+    PersistedStoreFileManager storeFileManager =
+      new PersistedStoreFileManager(DEFAULT_CELL_COMPARATOR, COMPARATOR, conf,
+        Mockito.mock(CompactionPolicy.class).getConf(), mockFs, regioninfo,
+        DEFAULT_STORE_NAME, mockStoreFilePathAccessor);
+    assertTrue(storeFileManager.getCompactedfiles().isEmpty());
+    storeFileManager.removeCompactedFiles(sortedInitialStoreFiles);
+    assertTrue(storeFileManager.getCompactedfiles().isEmpty());
+  }
+
+  @Test
+  public void testRemoveCompactedFilesNormalOperation() throws IOException {
+    // simulate the store.close() remove again and it should not change the table both in memory
+    // and the table.
+    HRegionFileSystem mockFs = Mockito.mock(HRegionFileSystem.class);
+    StoreFilePathAccessor mockStoreFilePathAccessor = Mockito.mock(StoreFilePathAccessor.class);
+    PersistedStoreFileManager storeFileManager =
+      new PersistedStoreFileManager(DEFAULT_CELL_COMPARATOR, COMPARATOR, conf,
+        Mockito.mock(CompactionPolicy.class).getConf(), mockFs, regioninfo, DEFAULT_STORE_NAME,
+        mockStoreFilePathAccessor);
+    assertTrue(storeFileManager.getCompactedfiles().isEmpty());
+
+    storeFileManager.addCompactionResults(initialStoreFiles, initialStoreFiles);
+    assertEquals(storeFileManager.getCompactedfiles(), sortedInitialStoreFiles);
+    storeFileManager.removeCompactedFiles(sortedInitialStoreFiles);
+    assertTrue(storeFileManager.getCompactedfiles().isEmpty());
+  }
+
+  @Test
+  public void testUpdatePathListToTracker_ReadOnly() throws IOException {
+    HRegionFileSystem mockFs = Mockito.mock(HRegionFileSystem.class);
+    StoreFilePathAccessor mockStoreFilePathAccessor = Mockito.mock(StoreFilePathAccessor.class);
+    PersistedStoreFileManager storeFileManager =
+      new PersistedStoreFileManager(DEFAULT_CELL_COMPARATOR, COMPARATOR, conf,
+        Mockito.mock(CompactionPolicy.class).getConf(), mockFs, regioninfo, DEFAULT_STORE_NAME,
+        mockStoreFilePathAccessor, true);
+
+    StoreFilePathUpdate storeFilePathUpdate = StoreFilePathUpdate.builder()
+      .withStoreFiles(initialStoreFiles).build();
+    storeFileManager.updatePathListToTracker(storeFilePathUpdate);
+    verify(mockStoreFilePathAccessor, times(0))
+      .writeStoreFilePaths(regioninfo.getTable().getNameAsString(), regioninfo.getEncodedName(),
+        DEFAULT_STORE_NAME, storeFilePathUpdate);
+  }
+
+  private void compareIncludedInManagerVsTable(List<HStoreFile> expectedFiles) throws IOException {
+    compareIncludedInManagerVsTable(expectedFiles, expectedFiles);
+  }
+
+  private void compareIncludedInManagerVsTable(List<HStoreFile> expectedStoreFilesOnHeap,
+    List<HStoreFile> expectedStoreFilesInTable) throws IOException {
+    Collection<HStoreFile> storeFilesOnHeap = storeFileManager.getStorefiles();
+    assertEquals(expectedStoreFilesOnHeap, storeFilesOnHeap);
+
+    Collection<Path> includedPathsFromAccessor = storeFilePathAccessor
+      .getIncludedStoreFilePaths(tableName.getNameAsString(), regionName, DEFAULT_STORE_NAME);
+    assertEquals(StoreFileTrackingUtils.convertStoreFilesToPaths(expectedStoreFilesInTable),
+      includedPathsFromAccessor);
+  }
+
+  private void verifyCompactedfiles(List<HStoreFile> expectedCompactedfilesOnHeap) {
+    Collection<HStoreFile> compactedFilesOnHeap = storeFileManager.getCompactedfiles();
+    assertEquals(expectedCompactedfilesOnHeap, compactedFilesOnHeap);
+  }
+
+  private List<HStoreFile> createStoreFilesList() throws IOException {
+    HStoreFile sf1 = createFile();
+    HStoreFile sf2 = createFile();
+    HStoreFile sf3 = createFile();
+    return Lists.newArrayList(sf1, sf2, sf3);
+  }
+
+  private List<Path> createPathList() throws IOException {
+    Path path1 = createFilePath();
+    Path path2 = createFilePath();
+    Path path3 = createFilePath();
+    return Lists.newArrayList(path1, path2, path3);
+  }
+
+  private MockHStoreFile createFile() throws IOException {
+    return new MockHStoreFile(TEST_UTIL, createFilePath(), 0, 0, false, 1);
+  }
+
+  private Path createFilePath() throws IOException {
+    Path testFilePath = StoreFileWriter.getUniqueFile(fs, baseDir);
+    FSDataOutputStream out = fs.create(testFilePath);
+    out.write(0);
+    out.close();
+    return testFilePath;
+  }
+
+  private void verifyStoreFileManagerWhenStarts() {
+    assertTrue(storeFileManager.getStorefiles().isEmpty());
+    assertTrue(storeFileManager.getCompactedfiles().isEmpty());
+  }
+
+  private Collection<StoreFileInfo> convertToStoreFileInfos(FileSystem fs,
+    List<HStoreFile> storeFiles)
+    throws IOException {
+    ArrayList<StoreFileInfo> result = new ArrayList<>(storeFiles.size());
+    for (HStoreFile storeFile: storeFiles) {
+      StoreFileInfo info = new StoreFileInfo(conf, fs, storeFile.getFileInfo().getFileStatus());
+      result.add(info);
+    }
+    return result;
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileTrackingUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileTrackingUtils.java
new file mode 100644
index 0000000..82040bf
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileTrackingUtils.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MiscTests.class, SmallTests.class })
+public class TestStoreFileTrackingUtils {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestStoreFileTrackingUtils.class);
+
+  private Configuration conf;
+  private boolean isFeatureEnabled;
+
+  @Before
+  public void setup() throws IOException {
+    conf = HBaseConfiguration.create();
+  }
+
+  @Test
+  public void testIsStoreFileTrackingPersistEnabled() {
+    conf.setBoolean(HConstants.STOREFILE_TRACKING_PERSIST_ENABLED, true);
+    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, PersistedStoreEngine.class.getName());
+    isFeatureEnabled = StoreFileTrackingUtils.isStoreFileTrackingPersistEnabled(conf);
+    assertTrue(isFeatureEnabled);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testIsStorefileTrackingPersistDisabledWithStoreEngineSet() {
+    conf.setBoolean(HConstants.STOREFILE_TRACKING_PERSIST_ENABLED, false);
+    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, PersistedStoreEngine.class.getName());
+    isFeatureEnabled = StoreFileTrackingUtils.isStoreFileTrackingPersistEnabled(conf);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testisStoreFileTrackingPersistEnabledWithMismatchedStoreEngine() {
+    conf.setBoolean(HConstants.STOREFILE_TRACKING_PERSIST_ENABLED, true);
+    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
+    isFeatureEnabled = StoreFileTrackingUtils.isStoreFileTrackingPersistEnabled(conf);
+  }
+
+  @Test
+  public void testIsStorefileTrackingPersistDisabled() {
+    conf.setBoolean(HConstants.STOREFILE_TRACKING_PERSIST_ENABLED, false);
+    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
+    isFeatureEnabled = StoreFileTrackingUtils.isStoreFileTrackingPersistEnabled(conf);
+    assertFalse(isFeatureEnabled);
+  }
+
+  @Test
+  public void testGetFamilyFromKey() {
+    String separator = "-";
+    String rowkey1 = "region-cf-table";
+    String rowkey2 = "region-new-cf-table";
+    assertEquals("cf",
+      StoreFileTrackingUtils.getFamilyFromKey(rowkey1, "table", "region", separator));
+    assertEquals("new-cf",
+      StoreFileTrackingUtils.getFamilyFromKey(rowkey2, "table", "region", separator));
+  }
+
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java
index 1d1e9fb..2f9fcf2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java
@@ -611,10 +611,11 @@ public class TestStripeStoreFileManager {
 
   private static StripeStoreFileManager createManager(
       ArrayList<HStoreFile> sfs, Configuration conf) throws Exception {
+    HStore store = Mockito.mock(HStore.class);
     StripeStoreConfig config = new StripeStoreConfig(
-        conf, Mockito.mock(StoreConfigInformation.class));
+        conf, store);
     StripeStoreFileManager result = new StripeStoreFileManager(CellComparatorImpl.COMPARATOR, conf,
-        config);
+        config, store.getRegionFileSystem(), store.getColumnFamilyName());
     result.loadFiles(sfs);
     return result;
   }