You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ta...@apache.org on 2021/03/16 03:37:43 UTC

[hbase] branch HBASE-24749 updated (7b58ada -> cca4cbf)

This is an automated email from the ASF dual-hosted git repository.

taklwu pushed a change to branch HBASE-24749
in repository https://gitbox.apache.org/repos/asf/hbase.git.


 discard 7b58ada  HBASE-25395 Introduce PersistedStoreEngine and PersistedStoreFileManager (#2931)
     add d6aff6c  HBASE-25553 It is better for ReplicationTracker.getListOfRegionServers to return ServerName instead of String (#2928)
     add 355bccb  HBASE-25560 Remove unused parameter named peerId in the constructor method of CatalogReplicationSourcePeer (#2939)
     add ce9c9b4  HBASE-25550 More readable Competition Time (#2925)
     add ca672ac  HBASE-25519 BLOCKSIZE needs to support pretty print (#2894)
     add 098f7c0  fix bug: string out of bounds when construct illegal tablename error message (#2884)
     add 16fe1e9  Revert "fix bug: string out of bounds when construct illegal tablename error message (#2884)"
     add 8a4c3b0  HBASE-25512 May throw StringIndexOutOfBoundsException when construct illegal tablename error #2884
     add ae063f9  HBASE-25542 Add client detail to scan name so when lease expires, we … (#2930)
     add f09e420  HBASE-24772 Use GetoptLong or OptionParser in hbase-shell (#2918)
     add 0353909  HBASE-25507 Leak of ESTABLISHED sockets when compaction encountered "java.io.IOException: Invalid HFile block magic" (#2882)
     add 4a3ff98  HBASE-25559 Terminate threads of oldsources while RS is closing (#2938)
     add 25e3633  HBASE-25534 Honor TableDescriptor settings earlier in normalization (#2917)
     add 3e743df  HBASE-25364 Redo the getMidPoint() in HFileWriterImpl to get rid of the double comparison process (#2741)
     add e88f54b  HBASE-25508 Add an example of using the thrift proxy in thrift-over-http mode
     add b2f85e5  HBASE-23887 AdaptiveLRU cache (#2934)
     add b6649a8  HBASE-23887 Bug fix heavyEvictionMbSizeLimit (ADDENDUM) (#2957)
     add 618236d  HBASE-25547: Thread pools should release unused resources (#2922)
     add 1beda0d  HBASE-25541 : Setting the path to null when we dequeue the current log (#2959)
     add 88057d8  HBASE-25539: Add age of oldest wal metric (#2945)
     add ed90a14  HBASE-25562 ReplicationSourceWALReader log and handle exception immediately without retrying (#2943)
     add dc993f3  HBASE-25589 Update download page for HBase Operator Tools to 1.1.0
     add d8b8662  HBASE-25574 Revisit put/delete/increment/append related RegionObserver methods (#2953)
     add 5fa15cf  HBASE-25575 Should validate Puts in RowMutations (#2954)
     add 8f03c44  HBASE-25556 Frequent replication "Encountered a malformed edit" warnings (#2965)
     add 51a3d45  HBASE-25598 TestFromClientSide5.testScanMetrics is flaky (#2977)
     add ed2693f  HBASE-25602 Fix broken TestReplicationShell on master (#2981)
     add a7d0445  HBASE-25601 Use ASF-official mailing list archives
     add 3f1c486  HBASE-25596: Fix NPE and avoid permanent unreplicated data due to EOF (#2987)
     add 8d0de96  HBASE-25590 Bulkload replication HFileRefs cannot be cleared in some cases where set exclude-namespace/exclude-table-cfs (#2969)
     add a984358  HBASE-25586 Fix HBASE-22492 on branch-2 (SASL GapToken) (#2961)
     add 30cb419  HBASE-25615 Upgrade java version in pre commit docker file (#2997)
     add 34bd1bd  HBASE-25620 Increase timeout value for pre commit (#3000)
     add d5df999  HBASE-25604 Upgrade spotbugs to 4.x (#2986)
     add b24bd40  HBASE-25611 ExportSnapshot chmod flag uses value as decimal (#3003)
     add b522d2a  Revert "HBASE-25604 Upgrade spotbugs to 4.x (#2986)"
     add a97a40c  HBASE-25580 Release scripts should include in the vote email the git hash that the RC tag points to (#2956)
     add 157200e  HBASE-25402 Sorting order by start key or end key is not considering empty start key/end key (#2955)
     add e099ef3  HBASE-25626 Possible Resource Leak in HeterogeneousRegionCountCostFunction
     add a4eb1aa  HBASE-25421 There is no limit on the column length when creating a table (#2796)
     add 5d9a6ed  HBASE-25367 Sort broken after Change 'State time' in UI (#2964)
     add e80b901  HBASE-25603 Add switch for compaction after bulkload (#2982)
     add f93c9c6  HBASE-25385 TestCurrentHourProvider fails if the latest timezone changes are not present (#3012)
     add 830d289  HBASE-25460 : Expose drainingServers as cluster metric (#2995)
     add dd4a11e  HBASE-25637 Rename method completeCompaction to refreshStoreSizeAndTotalBytes (#3023)
     add 9b0485f  HBASE-23578 [UI] Master UI shows long stack traces when table is broken (#3014)
     add 190c253  HBASE-25609 There is a problem with the SPLITS_FILE in the HBase shell statement(#2992)
     add 53128fe  HBASE-25644 Scan#setSmall blindly sets ReadType as PREAD
     add c1dacfd  HBASE-25547 (addendum): Roll ExecutorType into ExecutorConfig (#2996)
     add 109bd24  HBASE-25630 Set switch compaction after bulkload default as false (#3022)
     add 573daed  HBASE-25646: Possible Resource Leak in CatalogJanitor #3036
     add d818eff  HBASE-25582 Support setting scan ReadType to be STREAM at cluster level (#3035)
     add 92fe609  HBASE-25604 Upgrade spotbugs to 4.x (#3029)
     add 95342a2  HBASE-25654 [Documentation] Fix format error in security.adoc
     add 373dc77  HBASE-25548 Optionally allow snapshots to preserve cluster's max file… (#2923)
     add d79019b  HBASE-25629 Reimplement TestCurrentHourProvider to not depend on unstable TZs (#3013)
     add 0e6c2c4  HBASE-25636 Expose HBCK report as metrics (#3031)
     add 0cc1ae4  HBASE-25587 [hbck2] Schedule SCP for all unknown servers (#2978)
     add cc61714  HBASE-25566 RoundRobinTableInputFormat (#2947)
     add 1a69a52  HBASE-25570 On largish cluster, "CleanerChore: Could not delete dir..." makes master log unreadable (#2949)
     add 7386fb6  HBASE-25622 Result#compareResults should compare tags. (#3026)
     add 876fec1  HBASE-25657 Fix spotbugs warnings after upgrading spotbugs to 4.x (#3041)
     add aeec8ca  HBASE-25635 CandidateGenerator may miss some region balance actions (#3024)
     add 8337fb2  HBASE-25662 Fix spotbugs warning in RoundRobinTableInputFormat (#3050)
     add f4e1ab7  HBASE-25663 Make graceful_stop localhostname compare match even if fqdn (#3048)
     add 630f47e   HBASE-25660 Print split policy in use on Region open (as well as split policy vitals) (#3044)
     add 21409bf  HBASE-25573 release script generated vote template has incorrect staging area (#2952)
     add 625bea3  HBASE-25595 TestLruBlockCache.testBackgroundEvictionThread is flaky (#2974)
     add 0ef892b  HBASE-25621 Balancer should check region plan source to avoid misplace region groups (#3002)
     add db2e6d8  HBASE-25597 Add row info in Exception when cell size exceeds maxCellSize (#2976)
     add 5457554  HBASE-25374 Make REST Client connection and socket time out configurable (#2752)
     new cca4cbf  HBASE-25395 Introduce PersistedStoreEngine and PersistedStoreFileManager (#2931)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (7b58ada)
            \
             N -- N -- N   refs/heads/HBASE-24749 (cca4cbf)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 bin/graceful_stop.sh                               |  52 +--
 dev-support/Jenkinsfile_GitHub                     |   4 +-
 dev-support/create-release/release-build.sh        |  11 +-
 dev-support/create-release/vote.tmpl               |   6 +-
 dev-support/docker/Dockerfile                      |  13 +-
 dev-support/spotbugs-exclude.xml                   |   5 +
 .../client/ColumnFamilyDescriptorBuilder.java      |  12 +
 .../hadoop/hbase/client/ConnectionUtils.java       |  10 +-
 .../org/apache/hadoop/hbase/client/HBaseHbck.java  |  16 +
 .../java/org/apache/hadoop/hbase/client/Hbck.java  |   2 +
 .../hadoop/hbase/client/RawAsyncTableImpl.java     |  13 +-
 .../org/apache/hadoop/hbase/client/Result.java     |   3 +-
 .../java/org/apache/hadoop/hbase/client/Scan.java  |   4 +-
 .../hadoop/hbase/client/SnapshotDescription.java   |  14 +-
 .../hbase/client/TableDescriptorBuilder.java       |   4 +
 .../hbase/replication/ReplicationPeerConfig.java   |  29 +-
 .../hadoop/hbase/shaded/protobuf/ProtobufUtil.java |   4 +
 .../client/TestColumnFamilyDescriptorBuilder.java  |  28 ++
 .../org/apache/hadoop/hbase/client/TestScan.java   |  12 +
 .../hbase/client/TestTableDescriptorBuilder.java   |  11 +-
 .../replication/TestReplicationPeerConfig.java     | 366 ++++++++-------
 .../java/org/apache/hadoop/hbase/CellUtil.java     |   4 +
 .../org/apache/hadoop/hbase/PrivateCellUtil.java   |  25 +
 .../java/org/apache/hadoop/hbase/TableName.java    |   6 +-
 .../hadoop/hbase/util/AbstractHBaseTool.java       |  12 +-
 hbase-common/src/main/resources/hbase-default.xml  |   7 +
 .../org/apache/hadoop/hbase/TestTableName.java     |   2 +-
 .../thrift1/demo_hbase_thrift_over_http_tls.py     |  71 +++
 .../master/MetricsAssignmentManagerSource.java     |  70 +++
 .../master/MetricsAssignmentManagerSourceImpl.java |  58 +++
 .../hadoop/hbase/master/MetricsMasterSource.java   |   4 +
 .../hbase/master/MetricsMasterSourceImpl.java      |   4 +
 .../hadoop/hbase/master/MetricsMasterWrapper.java  |  14 +
 .../MetricsReplicationGlobalSourceSourceImpl.java  |  12 +
 .../MetricsReplicationSourceSource.java            |   4 +
 .../MetricsReplicationSourceSourceImpl.java        |  14 +
 .../mapreduce/RoundRobinTableInputFormat.java      | 172 +++++++
 .../hadoop/hbase/mapreduce/TableMapReduceUtil.java |  16 +-
 .../apache/hadoop/hbase/mapreduce/TableSplit.java  |  14 +-
 .../hadoop/hbase/snapshot/ExportSnapshot.java      |   2 +-
 .../mapreduce/TestRoundRobinTableInputFormat.java  | 177 ++++++++
 .../hadoop/hbase/mapreduce/TestTableSplit.java     |  24 +-
 .../src/main/protobuf/server/Snapshot.proto        |   1 +
 .../src/main/protobuf/server/master/Master.proto   |  10 +
 .../hbase/replication/ReplicationTracker.java      |   7 +-
 .../replication/ReplicationTrackerZKImpl.java      |  16 +-
 .../org/apache/hadoop/hbase/rest/Constants.java    |   8 +
 .../apache/hadoop/hbase/rest/client/Client.java    |  66 ++-
 .../hadoop/hbase/tmpl/common/TaskMonitorTmpl.jamon |   8 +-
 .../hbase/tmpl/master/MasterStatusTmpl.jamon       |   1 +
 .../hbase/tmpl/regionserver/RSStatusTmpl.jamon     |   7 +-
 .../hadoop/hbase/coprocessor/RegionObserver.java   | 187 +++++++-
 .../hadoop/hbase/executor/ExecutorService.java     | 112 +++--
 .../hadoop/hbase/io/hfile/BlockCacheFactory.java   |   2 +
 .../hadoop/hbase/io/hfile/HFileWriterImpl.java     | 211 ++++-----
 ...uBlockCache.java => LruAdaptiveBlockCache.java} | 483 ++++++++++++++------
 .../hadoop/hbase/io/hfile/LruBlockCache.java       |   7 +-
 .../org/apache/hadoop/hbase/ipc/ServerCall.java    |  19 +-
 .../org/apache/hadoop/hbase/master/HMaster.java    |  54 ++-
 .../org/apache/hadoop/hbase/master/HbckChore.java  |  10 +
 .../hadoop/hbase/master/MasterRpcServices.java     |  25 +
 .../hbase/master/MetricsAssignmentManager.java     |  28 ++
 .../hbase/master/MetricsMasterWrapperImpl.java     |  18 +
 .../apache/hadoop/hbase/master/ServerManager.java  |  11 +
 .../hbase/master/assignment/AssignmentManager.java |  11 +
 .../hbase/master/balancer/CandidateGenerator.java  |   6 +-
 .../HeterogeneousRegionCountCostFunction.java      |  11 +-
 .../master/balancer/StochasticLoadBalancer.java    |   7 +
 .../hadoop/hbase/master/cleaner/CleanerChore.java  |  13 +-
 .../hbase/master/janitor/CatalogJanitor.java       |  17 +-
 .../hbase/master/normalizer/RegionNormalizer.java  |   8 +-
 .../master/normalizer/RegionNormalizerWorker.java  |   5 +-
 .../master/normalizer/SimpleRegionNormalizer.java  | 116 +++--
 .../hbase/master/snapshot/SnapshotManager.java     |   4 +
 .../hbase/master/snapshot/TakeSnapshotHandler.java |   9 +-
 .../hbase/regionserver/BusyRegionSplitPolicy.java  |   8 +-
 .../ConstantSizeRegionSplitPolicy.java             |  21 +-
 .../DelimitedKeyPrefixRegionSplitPolicy.java       |   6 +
 .../hbase/regionserver/FlushAllStoresPolicy.java   |   7 +-
 .../hbase/regionserver/FlushLargeStoresPolicy.java |   5 +
 .../apache/hadoop/hbase/regionserver/HRegion.java  |  52 ++-
 .../hadoop/hbase/regionserver/HRegionServer.java   |  79 ++--
 .../apache/hadoop/hbase/regionserver/HStore.java   |  16 +-
 .../IncreasingToUpperBoundRegionSplitPolicy.java   |   6 +
 .../regionserver/KeyPrefixRegionSplitPolicy.java   |   6 +
 .../hadoop/hbase/regionserver/RSRpcServices.java   | 176 ++++---
 .../hbase/regionserver/RegionCoprocessorHost.java  |  57 ++-
 .../regionserver/RegionServicesForStores.java      |   8 +-
 .../hbase/regionserver/SteppingSplitPolicy.java    |   7 +-
 .../hadoop/hbase/regionserver/StoreScanner.java    |  14 +-
 .../hbase/regionserver/compactions/Compactor.java  |  17 +-
 .../compactions/CurrentHourProvider.java           |  21 +-
 .../hbase/regionserver/wal/ProtobufLogReader.java  |   6 +-
 .../NamespaceTableCfWALEntryFilter.java            |  84 +---
 .../regionserver/CatalogReplicationSourcePeer.java |   2 +-
 .../regionserver/DumpReplicationQueues.java        |   4 +-
 .../replication/regionserver/MetricsSource.java    |  11 +
 .../regionserver/RecoveredReplicationSource.java   |  12 +-
 .../RecoveredReplicationSourceShipper.java         |  14 +-
 .../regionserver/ReplicationSource.java            | 100 ++--
 .../regionserver/ReplicationSourceLogQueue.java    | 189 ++++++++
 .../regionserver/ReplicationSourceManager.java     |   9 +-
 .../regionserver/ReplicationSourceShipper.java     |   7 +-
 .../regionserver/ReplicationSourceWALReader.java   | 168 ++++---
 .../SerialReplicationSourceWALReader.java          |  11 +-
 .../replication/regionserver/WALEntryBatch.java    |   4 +
 .../replication/regionserver/WALEntryStream.java   |  21 +-
 .../hadoop/hbase/wal/AbstractFSWALProvider.java    |   2 +-
 .../main/resources/hbase-webapps/master/table.jsp  |  21 +-
 .../static/js/parser-date-iso8601.min.js           |   4 +
 .../hadoop/hbase/TestExecutorStatusChore.java      |   5 +-
 .../org/apache/hadoop/hbase/TestIOFencing.java     |   6 +-
 .../apache/hadoop/hbase/client/TestAsyncTable.java |  43 ++
 .../hadoop/hbase/client/TestAsyncTableBatch.java   |  50 ++
 .../hadoop/hbase/client/TestFromClientSide5.java   |  20 +-
 .../org/apache/hadoop/hbase/client/TestResult.java | 141 ++++++
 .../hadoop/hbase/executor/TestExecutorService.java |  14 +-
 .../apache/hadoop/hbase/io/hfile/TestHFile.java    |  15 +
 ...ckCache.java => TestLruAdaptiveBlockCache.java} | 505 +++++++++++++--------
 .../hadoop/hbase/master/TestMasterMetrics.java     |   1 +
 ...estStochasticLoadBalancerHeterogeneousCost.java |  23 +
 .../normalizer/TestRegionNormalizerWorker.java     |   6 +-
 .../normalizer/TestSimpleRegionNormalizer.java     | 195 ++++++--
 .../hadoop/hbase/master/procedure/TestHBCKSCP.java |  30 +-
 ...taWithReplicas.java => TestHBCKSCPUnknown.java} |  36 +-
 .../master/snapshot/TestTakeSnapshotHandler.java   | 111 +++++
 .../regionserver/TestBulkLoadReplication.java      |   8 +-
 .../TestBulkLoadReplicationHFileRefs.java          | 310 +++++++++++++
 .../regionserver/TestCompactionAfterBulkLoad.java  |  46 +-
 .../regionserver/TestHRegionReplayEvents.java      |   6 +-
 .../hadoop/hbase/regionserver/TestHStore.java      |  17 +
 .../hbase/regionserver/TestRSRpcServices.java      |  65 +++
 .../hbase/regionserver/TestSplitLogWorker.java     |   4 +-
 .../compactions/TestCurrentHourProvider.java       |  38 +-
 .../hbase/replication/TestReplicationBase.java     |  27 +-
 .../TestReplicationEmptyWALRecovery.java           | 298 ++++++++++--
 .../replication/TestReplicationTrackerZKImpl.java  |  18 +-
 .../regionserver/TestReplicationSource.java        |  72 ++-
 .../TestReplicationSourceLogQueue.java             |  83 ++++
 .../TestReplicationSourceManagerJoin.java          | 109 +++++
 .../regionserver/TestWALEntryStream.java           | 166 +++++--
 hbase-shell/src/main/ruby/hbase/admin.rb           |   7 +-
 .../src/main/ruby/hbase/replication_admin.rb       |  23 +-
 hbase-shell/src/main/ruby/jar-bootstrap.rb         |  59 +--
 .../src/main/ruby/shell/commands/snapshot.rb       |   2 +-
 .../hadoop/hbase/client/TestReplicationShell.java  |   3 -
 pom.xml                                            |  15 +-
 src/main/asciidoc/_chapters/community.adoc         |   8 +-
 src/main/asciidoc/_chapters/compression.adoc       |   4 +-
 src/main/asciidoc/_chapters/configuration.adoc     |   2 +-
 src/main/asciidoc/_chapters/developer.adoc         |   6 +-
 src/main/asciidoc/_chapters/hbase-default.adoc     |  12 +
 src/main/asciidoc/_chapters/ops_mgt.adoc           |  38 +-
 src/main/asciidoc/_chapters/performance.adoc       |   3 +-
 src/main/asciidoc/_chapters/schema_design.adoc     |   4 +-
 src/main/asciidoc/_chapters/security.adoc          |   2 +-
 src/main/asciidoc/_chapters/troubleshooting.adoc   |  16 +-
 src/site/xdoc/downloads.xml                        |  12 +-
 158 files changed, 5109 insertions(+), 1579 deletions(-)
 create mode 100755 hbase-examples/src/main/python/thrift1/demo_hbase_thrift_over_http_tls.py
 create mode 100644 hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/RoundRobinTableInputFormat.java
 create mode 100644 hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRoundRobinTableInputFormat.java
 copy hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/{LruBlockCache.java => LruAdaptiveBlockCache.java} (63%)
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceLogQueue.java
 create mode 100644 hbase-server/src/main/resources/hbase-webapps/static/js/parser-date-iso8601.min.js
 copy hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/{TestLruBlockCache.java => TestLruAdaptiveBlockCache.java} (78%)
 copy hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/{TestSCPWithMetaWithReplicas.java => TestHBCKSCPUnknown.java} (55%)
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestTakeSnapshotHandler.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplicationHFileRefs.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSRpcServices.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceLogQueue.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerJoin.java


[hbase] 01/01: HBASE-25395 Introduce PersistedStoreEngine and PersistedStoreFileManager (#2931)

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

taklwu pushed a commit to branch HBASE-24749
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit cca4cbf173f4320dcb9262e9a97deba27f7de15e
Author: Tak Lon (Stephen) Wu <ta...@apache.org>
AuthorDate: Mon Mar 15 20:24:35 2021 -0700

    HBASE-25395 Introduce PersistedStoreEngine and PersistedStoreFileManager (#2931)
    
    * HBASE-25395 Introduce PersistedStoreEngine and PersistedStoreFileManager
    
    - Implement HTableStoreFilePathAccessor
    - Add loadInitialFiles to StoreFileManager Interface
    - refactor and add loadInitialFiles to the base interface,
      such each store file manager implementation could have their
      own way to discovery a list of available files instead of
      just calling fs.getStoreFiles()
    - add store instance to each store file manager implementation
    - Add PersistedStoreEngine and PersistedStoreFileManager
       that fork the in-memory object in store file manager to
       a off-memory structure that is going to be used when
       region reassign, (re)open, cluster restart and etc.
       This improvement aims for better object store (e.g. S3)
       support without calling LIST to get storefiles from
       a given column family.
    
    Reviewed-by: Andor Molnár <an...@apache.org>
    Signed-off-by: Zach York <zy...@apache.org>
---
 .../java/org/apache/hadoop/hbase/HConstants.java   |  13 +
 .../java/org/apache/hadoop/hbase/TableName.java    |   5 +
 .../org/apache/hadoop/hbase/master/HMaster.java    |   7 +
 .../AbstractStoreFilePathAccessor.java             | 111 +++++
 .../hbase/regionserver/DateTieredStoreEngine.java  |   2 +-
 .../hbase/regionserver/DefaultStoreEngine.java     |  10 +-
 .../regionserver/DefaultStoreFileManager.java      |  43 +-
 .../apache/hadoop/hbase/regionserver/HStore.java   |   3 +-
 .../regionserver/HTableStoreFilePathAccessor.java  | 156 +++++++
 .../hbase/regionserver/PersistedStoreEngine.java   |  72 +++
 .../regionserver/PersistedStoreFileManager.java    | 169 +++++++
 .../hbase/regionserver/StoreFileManager.java       |  19 +-
 .../hbase/regionserver/StoreFilePathAccessor.java  |  79 ++++
 .../hbase/regionserver/StoreFilePathUpdate.java    |  92 ++++
 .../hbase/regionserver/StoreFileTrackingUtils.java | 135 ++++++
 .../hbase/regionserver/StripeStoreEngine.java      |   3 +-
 .../hbase/regionserver/StripeStoreFileManager.java |  12 +-
 .../StoreFilePathAccessorTestBase.java             | 195 ++++++++
 .../TestHTableStoreFilePathAccessor.java           |  77 ++++
 .../regionserver/TestPersistedStoreEngine.java     | 145 ++++++
 .../TestPersistedStoreFileManager.java             | 496 +++++++++++++++++++++
 .../regionserver/TestStoreFileTrackingUtils.java   |  91 ++++
 .../regionserver/TestStripeStoreFileManager.java   |   5 +-
 23 files changed, 1918 insertions(+), 22 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 48fa00c..0417a96 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -25,6 +25,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -1646,6 +1647,18 @@ public final class HConstants {
    */
   public static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;
 
+  /**
+   * Configuration for storefile tracking feature
+   */
+  public static final String STOREFILE_TRACKING_PERSIST_ENABLED =
+    "hbase.storefile.tracking.persist.enabled";
+  public static final boolean DEFAULT_STOREFILE_TRACKING_PERSIST_ENABLED = false;
+
+  public static final String STOREFILE_TRACKING_INIT_TIMEOUT =
+    "hbase.storefile.tracking.init.timeout";
+  public static final long DEFAULT_STOREFILE_TRACKING_INIT_TIMEOUT =
+    TimeUnit.MINUTES.toMillis(5);
+
   private HConstants() {
     // Can't be instantiated with this ctor.
   }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/TableName.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/TableName.java
index 62668ec..e3ad39d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/TableName.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TableName.java
@@ -93,6 +93,11 @@ public final class TableName implements Comparable<TableName> {
   public static final TableName NAMESPACE_TABLE_NAME =
     valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "namespace");
 
+  /** The storefile table's name. */
+  public static final String STOREFILE_STR = "storefile";
+  public static final TableName STOREFILE_TABLE_NAME =
+      valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, STOREFILE_STR);
+
   public static final String OLD_META_STR = ".META.";
   public static final String OLD_ROOT_STR = "-ROOT-";
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 4c3ff89..81a3dd9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -174,6 +174,7 @@ import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshotNotifierFactory;
 import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.hadoop.hbase.regionserver.StoreFileTrackingUtils;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationLoadSource;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@@ -1023,6 +1024,12 @@ public class HMaster extends HRegionServer implements MasterServices {
     getChoreService().scheduleChore(catalogJanitorChore);
     this.hbckChore = new HbckChore(this);
     getChoreService().scheduleChore(hbckChore);
+
+    // enable or cleanup storefile tracking feature
+    if (StoreFileTrackingUtils.isStoreFileTrackingPersistEnabled(conf)) {
+      StoreFileTrackingUtils.init(this);
+    }
+
     this.serverManager.startChore();
 
     // Only for rolling upgrade, where we need to migrate the data in namespace table to meta table.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractStoreFilePathAccessor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractStoreFilePathAccessor.java
new file mode 100644
index 0000000..d494d86
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractStoreFilePathAccessor.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+
+@InterfaceAudience.Private
+public abstract class AbstractStoreFilePathAccessor implements StoreFilePathAccessor {
+
+  public static final String STOREFILE_INCLUDED_STR = "included";
+
+  protected static final String LIST_SEPARATOR = ";";
+  protected final Configuration conf;
+
+  public AbstractStoreFilePathAccessor(Configuration conf) {
+    this.conf = conf;
+  }
+
+  abstract String getSeparator();
+
+  abstract List<Path> getStoreFilePaths(final String tableName, final String regionName,
+    final String storeName, final String columnName) throws IOException;
+
+  @Override
+  public abstract void writeStoreFilePaths(final String tableName, final String regionName,
+    final String storeName, StoreFilePathUpdate storeFilePathUpdate)
+    throws IOException;
+
+  @Override
+  public List<Path> getIncludedStoreFilePaths(final String tableName, final String regionName,
+    final String storeName) throws IOException {
+    return getStoreFilePaths(tableName, regionName, storeName, STOREFILE_INCLUDED_STR);
+  }
+
+  protected static byte[] storeFileListToByteArray(List<Path> storeFilePaths) {
+    return Bytes.toBytes(Joiner.on(LIST_SEPARATOR).join(storeFilePaths));
+  }
+
+  protected static List<Path> byteToStoreFileList(byte[] data) {
+    List<Path> paths = new ArrayList<>();
+    if (data != null && data.length != 0) {
+      String pathString = Bytes.toString(data);
+      String[] pathStrings = StringUtils.split(pathString, LIST_SEPARATOR);
+      for (String path : pathStrings) {
+        paths.add(new Path(path));
+      }
+    }
+    return paths;
+  }
+
+  /**
+   * Get a rowkey in the order of regionName-storeName-tablename
+   *
+   * @param tableName table name
+   * @param regionName region encoded name
+   * @param storeName column family name
+   * @return a joint rowkey in the form of regionName-storeName-tablename
+   */
+  protected String getKey(final String tableName, final String regionName, final String storeName) {
+    return Joiner.on(getSeparator()).join(regionName, storeName, tableName);
+  }
+
+  protected void validate(final String tableName, final String regionName,
+    final String storeName, final String columnName) {
+    validate(tableName, regionName, storeName);
+    Preconditions.checkArgument(StringUtils.isNotBlank(columnName),
+      "column name cannot be null or empty");
+  }
+
+  protected void validate(final String tableName, final String regionName,
+    final String storeName) {
+    Preconditions
+      .checkArgument(StringUtils.isNotBlank(tableName), "table name cannot be null or empty");
+    Preconditions
+      .checkArgument(StringUtils.isNotBlank(regionName), "region name cannot be null or empty");
+    Preconditions
+      .checkArgument(StringUtils.isNotBlank(storeName), "store name cannot be null or empty");
+  }
+
+  protected void validate(final String tableName, final String regionName, final String storeName,
+    final StoreFilePathUpdate storeFilePathUpdate) {
+    validate(tableName, regionName, storeName);
+    Preconditions.checkArgument(!storeFilePathUpdate.getStoreFiles().isEmpty(),
+      "Must have storefiles to be updated");
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java
index 1df953d..de8bd35 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java
@@ -59,7 +59,7 @@ public class DateTieredStoreEngine extends StoreEngine<DefaultStoreFlusher,
     this.compactionPolicy = new DateTieredCompactionPolicy(conf, store);
     this.storeFileManager =
         new DefaultStoreFileManager(kvComparator, StoreFileComparators.SEQ_ID_MAX_TIMESTAMP, conf,
-            compactionPolicy.getConf());
+            compactionPolicy.getConf(), store.getRegionFileSystem(), store.getColumnFamilyName());
     this.storeFlusher = new DefaultStoreFlusher(conf, store);
     this.compactor = new DateTieredCompactor(conf, store);
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
index 58f8bbb..090c7d9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
@@ -68,9 +68,13 @@ public class DefaultStoreEngine extends StoreEngine<
     createCompactor(conf, store);
     createCompactionPolicy(conf, store);
     createStoreFlusher(conf, store);
-    storeFileManager =
-        new DefaultStoreFileManager(kvComparator, StoreFileComparators.SEQ_ID, conf,
-            compactionPolicy.getConf());
+    createStoreFileManager(conf, store, kvComparator);
+  }
+
+  protected void createStoreFileManager(Configuration conf, HStore store,
+    CellComparator kvComparator) {
+    storeFileManager = new DefaultStoreFileManager(kvComparator, StoreFileComparators.SEQ_ID, conf,
+      compactionPolicy.getConf(), store.getRegionFileSystem(), store.getColumnFamilyName());
   }
 
   protected void createCompactor(Configuration conf, HStore store) throws IOException {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
index f5c3fa7..ed705a1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
@@ -49,6 +49,8 @@ class DefaultStoreFileManager implements StoreFileManager {
   private final CompactionConfiguration comConf;
   private final int blockingFileCount;
   private final Comparator<HStoreFile> storeFileComparator;
+  private final HRegionFileSystem regionFs;
+  private final String familyName;
   /**
    * List of store files inside this store. This is an immutable list that
    * is atomically replaced when its contents change.
@@ -64,20 +66,28 @@ class DefaultStoreFileManager implements StoreFileManager {
 
   public DefaultStoreFileManager(CellComparator cellComparator,
       Comparator<HStoreFile> storeFileComparator, Configuration conf,
-      CompactionConfiguration comConf) {
+      CompactionConfiguration comConf, HRegionFileSystem regionFs,
+      String familyName) {
     this.cellComparator = cellComparator;
     this.storeFileComparator = storeFileComparator;
     this.comConf = comConf;
     this.blockingFileCount =
         conf.getInt(HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
+    this.regionFs = regionFs;
+    this.familyName = familyName;
   }
 
   @Override
-  public void loadFiles(List<HStoreFile> storeFiles) {
+  public void loadFiles(List<HStoreFile> storeFiles) throws IOException {
     this.storefiles = ImmutableList.sortedCopyOf(storeFileComparator, storeFiles);
   }
 
   @Override
+  public Collection<StoreFileInfo> loadInitialFiles() throws IOException {
+    return regionFs.getStoreFiles(familyName);
+  }
+
+  @Override
   public final Collection<HStoreFile> getStorefiles() {
     return storefiles;
   }
@@ -101,7 +111,7 @@ class DefaultStoreFileManager implements StoreFileManager {
   }
 
   @Override
-  public Collection<HStoreFile> clearCompactedFiles() {
+  public Collection<HStoreFile> clearCompactedFiles() throws IOException {
     List<HStoreFile> result = compactedfiles;
     compactedfiles = ImmutableList.of();
     return result;
@@ -119,16 +129,27 @@ class DefaultStoreFileManager implements StoreFileManager {
 
   @Override
   public void addCompactionResults(Collection<HStoreFile> newCompactedfiles,
-      Collection<HStoreFile> results) {
+    Collection<HStoreFile> results) throws IOException {
     this.storefiles = ImmutableList.sortedCopyOf(storeFileComparator, Iterables
-        .concat(Iterables.filter(storefiles, sf -> !newCompactedfiles.contains(sf)), results));
+      .concat(Iterables.filter(storefiles, sf -> !newCompactedfiles.contains(sf)), results));
+    this.compactedfiles = ImmutableList.sortedCopyOf(storeFileComparator,
+      Iterables.concat(this.compactedfiles, newCompactedfiles));
+
+    addCompactionResultsHook(storefiles);
     // Mark the files as compactedAway once the storefiles and compactedfiles list is finalized
     // Let a background thread close the actual reader on these compacted files and also
     // ensure to evict the blocks from block cache so that they are no longer in
     // cache
     newCompactedfiles.forEach(HStoreFile::markCompactedAway);
-    this.compactedfiles = ImmutableList.sortedCopyOf(storeFileComparator,
-      Iterables.concat(this.compactedfiles, newCompactedfiles));
+  }
+
+  /**
+   * additional logic after addCompactionResults() before marking compactedAway to
+   * newCompactedfiles the if any.
+   * @param storeFiles a list of store files to be processed
+   */
+  protected void addCompactionResultsHook(ImmutableList<HStoreFile> storeFiles) throws IOException {
+    // no-ops
   }
 
   @Override
@@ -203,5 +224,13 @@ class DefaultStoreFileManager implements StoreFileManager {
   public Comparator<HStoreFile> getStoreFileComparator() {
     return storeFileComparator;
   }
+
+  HRegionFileSystem getRegionFs() {
+    return regionFs;
+  }
+
+  String getFamilyName() {
+    return familyName;
+  }
 }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index a2a8f9d..6439fef 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -547,7 +547,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
    * from the given directory.
    */
   private List<HStoreFile> loadStoreFiles(boolean warmup) throws IOException {
-    Collection<StoreFileInfo> files = getRegionFileSystem().getStoreFiles(getColumnFamilyName());
+    Collection<StoreFileInfo> files =
+        this.storeEngine.getStoreFileManager().loadInitialFiles();
     return openStoreFiles(files, warmup);
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HTableStoreFilePathAccessor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HTableStoreFilePathAccessor.java
new file mode 100644
index 0000000..0f87ae7
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HTableStoreFilePathAccessor.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
+/**
+ * Helper class to interact with the hbase:storefile system table
+ *
+ * <pre>
+ *   ROW-KEY              FAMILY:QUALIFIER      DATA VALUE
+ *   region-store-table   included:files        List&lt;Path&gt; filesIncludedInRead
+ * </pre>
+ *
+ * The region encoded name is set as prefix for region split loading balance, and we use the
+ * target table name as suffix such that operator can identify the records per table.
+ *
+ * included:files is used for persisting storefiles of StoreFileManager in the cases of store
+ * opens and store closes. Meanwhile compactedFiles of StoreFileManager isn't being tracked
+ * off-memory, because the updated included:files contains compactedFiles and the leftover
+ * compactedFiles are either archived when a store closes or opens.
+ *
+ * TODO we will need a followup change to introduce in-memory temporarily file, such that further
+ *      we can introduce a non-tracking temporarily storefiles left from a flush or compaction when
+ *      a regionserver crashes without closing the store properly
+ */
+
+@InterfaceAudience.Private
+public class HTableStoreFilePathAccessor extends AbstractStoreFilePathAccessor {
+
+  public static final byte[] STOREFILE_FAMILY_INCLUDED = Bytes.toBytes(STOREFILE_INCLUDED_STR);
+
+  private static final String DASH_SEPARATOR = "-";
+  private static final String STOREFILE_QUALIFIER_STR = "filepaths";
+  private static final byte[] STOREFILE_QUALIFIER = Bytes.toBytes(STOREFILE_QUALIFIER_STR);
+  private static final int STOREFILE_TABLE_VERSIONS = 3;
+
+  // TODO find a way for system table to support region split at table creation or remove this
+  //  comment when we merge into hbase:meta table
+  public static final TableDescriptor STOREFILE_TABLE_DESC =
+    TableDescriptorBuilder.newBuilder(TableName.STOREFILE_TABLE_NAME)
+      .setColumnFamily(
+        ColumnFamilyDescriptorBuilder.newBuilder(STOREFILE_FAMILY_INCLUDED)
+          .setMaxVersions(STOREFILE_TABLE_VERSIONS)
+          .setInMemory(true)
+          .build())
+      .setRegionSplitPolicyClassName(BusyRegionSplitPolicy.class.getName())
+      .build();
+
+  private final Connection connection;
+
+  public HTableStoreFilePathAccessor(Configuration conf, Connection connection) {
+    super(conf);
+    Preconditions.checkNotNull(connection, "connection cannot be null");
+    this.connection = connection;
+  }
+
+  @Override
+  List<Path> getStoreFilePaths(final String tableName, final String regionName,
+    final String storeName, final String colFamily) throws IOException {
+    validate(tableName, regionName, storeName, colFamily);
+    byte[] colFamilyBytes = Bytes.toBytes(colFamily);
+    Get get =
+      new Get(Bytes.toBytes(getKey(tableName, regionName, storeName)));
+    get.addColumn(colFamilyBytes, STOREFILE_QUALIFIER);
+    Result result = doGet(get);
+    if (result.isEmpty()) {
+      return new ArrayList<>();
+    }
+    return byteToStoreFileList(result.getValue(colFamilyBytes, STOREFILE_QUALIFIER));
+  }
+
+  @Override
+  public void writeStoreFilePaths(final String tableName, final String regionName,
+    final String storeName, StoreFilePathUpdate storeFilePathUpdate)
+    throws IOException {
+    validate(tableName, regionName, storeName, storeFilePathUpdate);
+    Put put = generatePutForStoreFilePaths(tableName, regionName, storeName, storeFilePathUpdate);
+    doPut(put);
+  }
+
+
+  private Put generatePutForStoreFilePaths(final String tableName, final String regionName,
+    final String storeName, final StoreFilePathUpdate storeFilePathUpdate) {
+    Put put = new Put(Bytes.toBytes(getKey(tableName, regionName, storeName)));
+    put.addColumn(Bytes.toBytes(STOREFILE_INCLUDED_STR), STOREFILE_QUALIFIER,
+      storeFileListToByteArray(storeFilePathUpdate.getStoreFiles()));
+    return put;
+  }
+
+  @Override
+  public void deleteStoreFilePaths(final String tableName, final String regionName,
+    final String storeName) throws IOException {
+    validate(tableName, regionName, storeName);
+    Delete delete = new Delete(
+      Bytes.toBytes(getKey(tableName, regionName, storeName)));
+    delete.addColumns(STOREFILE_FAMILY_INCLUDED, STOREFILE_QUALIFIER);
+    doDelete(Lists.newArrayList(delete));
+  }
+
+  @Override
+  String getSeparator() {
+    return DASH_SEPARATOR;
+  }
+
+  private Result doGet(final Get get) throws IOException {
+    try (Table table = connection.getTable(TableName.STOREFILE_TABLE_NAME)) {
+      return table.get(get);
+    }
+  }
+
+  private void doPut(final Put put) throws IOException {
+    try (Table table = connection.getTable(TableName.STOREFILE_TABLE_NAME)) {
+      table.put(put);
+    }
+  }
+
+  private void doDelete(final List<Delete> delete) throws IOException {
+    try (Table table = connection.getTable(TableName.STOREFILE_TABLE_NAME)) {
+      table.delete(delete);
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PersistedStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PersistedStoreEngine.java
new file mode 100644
index 0000000..1c44e3e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PersistedStoreEngine.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+
+/**
+ * The StoreEngine that implements persisted and renameless store compaction and flush
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class PersistedStoreEngine extends DefaultStoreEngine {
+
+  @Override
+  public void createComponents(
+    Configuration conf, HStore store, CellComparator kvComparator) throws IOException {
+    Preconditions.checkArgument(StoreFileTrackingUtils.isStoreFileTrackingPersistEnabled(conf));
+
+    createCompactor(conf, store);
+    createCompactionPolicy(conf, store);
+    createStoreFlusher(conf, store);
+    createStoreFileManager(conf, store, kvComparator);
+  }
+
+  @Override
+  protected void createStoreFileManager(Configuration conf, HStore store,
+    CellComparator kvComparator) {
+    TableName tableName = store.getTableName();
+    // for master region, hbase:meta and hbase:storefile table, DefaultStoreManager is used.
+    // such these tables scan from the filesystem directly
+    if (tableName.equals(TableName.META_TABLE_NAME)
+      || tableName.equals(MasterRegionFactory.TABLE_NAME)
+      || tableName.equals(TableName.STOREFILE_TABLE_NAME)) {
+      super.createStoreFileManager(conf, store, kvComparator);
+      return;
+    }
+
+    RegionServerServices regionServerServices = store.getHRegion().getRegionServerServices();
+    Connection connection = regionServerServices.getConnection();
+    boolean readOnly = store.getHRegion().isReadOnly();
+
+    storeFileManager =
+      new PersistedStoreFileManager(kvComparator, StoreFileComparators.SEQ_ID, conf,
+        compactionPolicy.getConf(), store.getRegionFileSystem(), store.getRegionInfo(),
+        store.getColumnFamilyName(),
+        StoreFileTrackingUtils.createStoreFilePathAccessor(conf, connection),
+        readOnly);
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PersistedStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PersistedStoreFileManager.java
new file mode 100644
index 0000000..d3f096a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PersistedStoreFileManager.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
+
+/**
+ * A Storefile manager that is used by {@link PersistedStoreEngine} that persists the in-memory
+ * storefile tracking to a persistent table hbase:storefile.
+ *
+ * We don't override the {@link #clearFiles()} from {@link DefaultStoreFileManager} and persist
+ * in-memory storefiles tracking, it will be reused when region reassigns on a different
+ * region server.
+ */
+@InterfaceAudience.Private
+public class PersistedStoreFileManager extends DefaultStoreFileManager {
+  private static final Logger LOG = LoggerFactory.getLogger(PersistedStoreFileManager.class);
+  private final RegionInfo regionInfo;
+  private final String tableName;
+  private final String regionName;
+  private final String storeName;
+  private final StoreFilePathAccessor accessor;
+  private final Configuration conf;
+  // only uses for warmupHRegion
+  private final boolean readOnly;
+
+  public PersistedStoreFileManager(CellComparator cellComparator,
+    Comparator<HStoreFile> storeFileComparator, Configuration conf,
+    CompactionConfiguration compactionConfiguration, HRegionFileSystem regionFs,
+    RegionInfo regionInfo, String familyName, StoreFilePathAccessor accessor, boolean readOnly) {
+    super(cellComparator, storeFileComparator, conf, compactionConfiguration, regionFs, familyName);
+    this.conf = conf;
+    this.regionInfo = regionInfo;
+    this.tableName = regionInfo.getTable().getNameAsString();
+    this.regionName = regionInfo.getEncodedName();
+    this.storeName = familyName;
+    this.accessor = accessor;
+    this.readOnly = readOnly;
+  }
+
+  public PersistedStoreFileManager(CellComparator cellComparator,
+    Comparator<HStoreFile> storeFileComparator, Configuration conf,
+    CompactionConfiguration compactionConfiguration, HRegionFileSystem regionFs,
+    RegionInfo regionInfo, String familyName, StoreFilePathAccessor accessor) {
+    this(cellComparator, storeFileComparator, conf, compactionConfiguration, regionFs, regionInfo,
+      familyName, accessor, false);
+  }
+
+  @Override
+  public void loadFiles(List<HStoreFile> storeFiles) throws IOException {
+    // update with a sorted store files
+    super.loadFiles(storeFiles);
+    Preconditions.checkArgument(storeFiles != null, "store files cannot be "
+      + "null when loading");
+    if (storeFiles.isEmpty()) {
+      LOG.warn("Other than fresh region with no store files, store files should not be empty");
+      return;
+    }
+    updatePathListToTracker(StoreFilePathUpdate.builder().withStoreFiles(getStorefiles()).build());
+  }
+
+  @Override
+  public void insertNewFiles(Collection<HStoreFile> sfs) throws IOException {
+    // concatenate the new store files
+    super.insertNewFiles(sfs);
+    // return in case of empty store files as it is a No-op, here empty files are expected
+    // during region close
+    if (CollectionUtils.isEmpty(getStorefiles())) {
+      return;
+    }
+    updatePathListToTracker(StoreFilePathUpdate.builder().withStoreFiles(getStorefiles()).build());
+  }
+
+  @Override
+  protected void addCompactionResultsHook(ImmutableList<HStoreFile> storeFiles)
+    throws IOException {
+    Preconditions.checkNotNull(storeFiles, "storeFiles cannot be null");
+    updatePathListToTracker(StoreFilePathUpdate.builder().withStoreFiles(storeFiles).build());
+  }
+
+  @Override
+  public Collection<StoreFileInfo> loadInitialFiles() throws IOException {
+    // this logic is totally different from the default implementation in DefaultStoreFileManager
+
+    List<Path> pathList = accessor.getIncludedStoreFilePaths(tableName, regionName, storeName);
+    boolean isEmptyInPersistedFilePaths = CollectionUtils.isEmpty(pathList);
+    if (isEmptyInPersistedFilePaths) {
+      // When the path accessor is returning empty result, we scan the
+      // the file storage and see if there is any existing HFiles should be loaded.
+      // the scan is a one time process when store open during region assignment.
+      //
+      // this is especially used for region and store open
+      // 1. First time migration from a filesystem based e.g. DefaultStoreFileEngine
+      // 2. After region split and merge
+      // 3. After table clone and create new HFiles directly into data directory
+      //
+      // Also we don't handle the inconsistency between storefile tracking and file system, which
+      // will be handled by a HBCK command
+      LOG.info("Cannot find tracking paths ({}) for store {} in region {} of "
+          + "table {}, fall back to scan the storage to get a list of storefiles to be opened"
+        , isEmptyInPersistedFilePaths, storeName, regionName,
+        tableName);
+      return getRegionFs().getStoreFiles(getFamilyName());
+    }
+    ArrayList<StoreFileInfo> storeFiles = new ArrayList<>();
+    for (Path storeFilePath : pathList) {
+      if (!StoreFileInfo.isValid(getRegionFs().getFileSystem().getFileStatus(storeFilePath))) {
+        LOG.warn("Invalid StoreFile: {}, and archiving it", storeFilePath);
+        getRegionFs().removeStoreFile(storeName, storeFilePath);
+        continue;
+      }
+      StoreFileInfo info = ServerRegionReplicaUtil
+        .getStoreFileInfo(conf, getRegionFs().getFileSystem(), regionInfo,
+          ServerRegionReplicaUtil.getRegionInfoForFs(regionInfo), getFamilyName(),
+          storeFilePath);
+      storeFiles.add(info);
+    }
+    return storeFiles;
+  }
+
+  void updatePathListToTracker(StoreFilePathUpdate storeFilePathUpdate) throws IOException {
+    try {
+      // if this is not a read only region, update the tracking path
+      if (!readOnly) {
+        accessor.writeStoreFilePaths(tableName, regionName, storeName, storeFilePathUpdate);
+      }
+    } catch (IOException e) {
+      String message = String.format(
+        "Failed to persist tracking paths with key %s-%s-%s to table [%s]. "
+          + "%nPaths failed to be updated are: %s",
+        regionName, storeName, tableName, TableName.STOREFILE_STR, storeFilePathUpdate);
+      LOG.error(message);
+      throw new IOException(message, e);
+    }
+  }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
index d4c4f17..11264f8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
@@ -24,11 +24,9 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
-
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.yetus.audience.InterfaceAudience;
-
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableCollection;
 
 /**
@@ -46,7 +44,16 @@ public interface StoreFileManager {
    * Loads the initial store files into empty StoreFileManager.
    * @param storeFiles The files to load.
    */
-  void loadFiles(List<HStoreFile> storeFiles);
+  void loadFiles(List<HStoreFile> storeFiles) throws IOException;
+
+  /**
+   * Load store files that are available for opening to perform filter-based
+   * validation
+   *
+   * @return a list of {@link StoreFileInfo} for the requested store.
+   * @throws IOException if store files cannot be listed
+   */
+  Collection<StoreFileInfo> loadInitialFiles() throws IOException;
 
   /**
    * Adds new files, either for from MemStore flush or bulk insert, into the structure.
@@ -65,7 +72,7 @@ public interface StoreFileManager {
   /**
    * Remove the compacted files
    * @param compactedFiles the list of compacted files
-   * @throws IOException
+   * @throws IOException if compacted files cannot be cleaned
    */
   void removeCompactedFiles(Collection<HStoreFile> compactedFiles) throws IOException;
 
@@ -80,7 +87,7 @@ public interface StoreFileManager {
    * accessed single threaded.
    * @return The files compacted previously.
    */
-  Collection<HStoreFile> clearCompactedFiles();
+  Collection<HStoreFile> clearCompactedFiles() throws IOException;
 
   /**
    * Gets the snapshot of the store files currently in use. Can be used for things like metrics
@@ -145,7 +152,7 @@ public interface StoreFileManager {
   /**
    * Gets the split point for the split of this set of store files (approx. middle).
    * @return The mid-point if possible.
-   * @throws IOException
+   * @throws IOException on failures
    */
   Optional<byte[]> getSplitPoint() throws IOException;
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFilePathAccessor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFilePathAccessor.java
new file mode 100644
index 0000000..cc2d716
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFilePathAccessor.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Helper class to interact with the hbase storefile tracking data persisted as off-memory data
+ * from the {@link StoreFileManager}
+ *
+ * There is only a set of tracking storefiles, 'included'.
+ *
+ * e.g. list of storefile paths in 'included' should be the identical copy of the in-memory
+ * {@link HStoreFile}'s Path(s) and can be reused during region opens and region reassignment.
+ */
+@InterfaceAudience.Private
+public interface StoreFilePathAccessor {
+
+  /**
+   * Get storefile paths from the 'included' data set
+   * @param tableName name of the current table in String
+   * @param regionName name of the current region in String
+   * @param storeName name of the column family in String, to be combined with regionName to make
+   *                 the row key.
+   * @return list of StoreFile paths that should be included in reads in this store,
+   *         returns an empty list if the target cell is empty or doesn't exist.
+   * @throws IOException if a remote or network exception occurs during Get
+   */
+  List<Path> getIncludedStoreFilePaths(final String tableName, final String regionName,
+    final String storeName) throws IOException;
+
+  /**
+   * Write an entity that should be persisted into the tracking data for the
+   * specific column family of a given region
+   *
+   * it would be happened during storefile operation e.g. flush and compaction.
+   *
+   * @param tableName name of the current table in String
+   * @param regionName name of the current region in String
+   * @param storeName name of the column family in String, to be combined with regionName to make
+   *                 the row key.
+   * @param storeFilePathUpdate Updates to be persisted
+   * @throws IOException if a remote or network exception occurs during write
+   */
+  void writeStoreFilePaths(final String tableName, final String regionName,
+    final String storeName, final StoreFilePathUpdate storeFilePathUpdate) throws IOException;
+
+  /**
+   * Delete storefile paths for a tracking column family, normally used when a region-store is
+   * completely removed due to region split or merge
+   * @param tableName name of the current table in String
+   * @param regionName name of the current region in String
+   * @param storeName name of the column family in String, to be combined with regionName to make
+   *                 the row key.
+   * @throws IOException if a remote or network exception occurs during delete
+   */
+  void deleteStoreFilePaths(final String tableName, final String regionName, final String storeName)
+    throws IOException;
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFilePathUpdate.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFilePathUpdate.java
new file mode 100644
index 0000000..472be43
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFilePathUpdate.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import com.google.errorprone.annotations.RestrictedApi;
+import java.util.Collection;
+import java.util.List;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.fs.Path;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+
+@InterfaceAudience.Private
+final class StoreFilePathUpdate {
+
+  private final List<Path> storeFiles;
+
+  private StoreFilePathUpdate(final List<Path> storeFiles) {
+    Preconditions.checkNotNull(storeFiles, "StoreFiles cannot be null");
+    this.storeFiles = storeFiles;
+  }
+
+  List<Path> getStoreFiles() {
+    return storeFiles;
+  }
+
+  @Override
+  public String toString() {
+    return "StoreFilePathUpdate{" + "storeFiles=" + storeFiles + "}";
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    StoreFilePathUpdate that = (StoreFilePathUpdate) o;
+
+    return new EqualsBuilder().append(storeFiles, that.storeFiles).isEquals();
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder(17, 37).append(storeFiles).toHashCode();
+  }
+
+  static Builder builder() {
+    return new Builder();
+  }
+
+  static class Builder {
+    private List<Path> storeFiles = ImmutableList.of();
+
+    Builder withStoreFiles(Collection<HStoreFile> storeFiles) {
+      this.storeFiles = StoreFileTrackingUtils.convertStoreFilesToPaths(storeFiles);
+      return this;
+    }
+
+    @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*/src/test/.*")
+    Builder withStorePaths(List<Path> storeFiles) {
+      this.storeFiles = storeFiles;
+      return this;
+    }
+
+    StoreFilePathUpdate build() {
+      return new StoreFilePathUpdate(storeFiles);
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileTrackingUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileTrackingUtils.java
new file mode 100644
index 0000000..5e14663
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileTrackingUtils.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class to support persistent store file tracking
+ */
+@InterfaceAudience.Private
+public final class StoreFileTrackingUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(StoreFileTrackingUtils.class);
+  public static final long SLEEP_DELTA_MS = TimeUnit.MILLISECONDS.toMillis(100);
+
+  private StoreFileTrackingUtils() {
+    // private for utility class
+  }
+
+  public static boolean isStoreFileTrackingPersistEnabled(Configuration conf) {
+    boolean isStoreTrackingPersistEnabled =
+      conf.getBoolean(HConstants.STOREFILE_TRACKING_PERSIST_ENABLED,
+        HConstants.DEFAULT_STOREFILE_TRACKING_PERSIST_ENABLED);
+    boolean isPersistedStoreEngineSet =
+      conf.get(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName())
+        .equals(PersistedStoreEngine.class.getName());
+    boolean isFeatureEnabled = isStoreTrackingPersistEnabled && isPersistedStoreEngineSet;
+    if (isStoreTrackingPersistEnabled ^ isPersistedStoreEngineSet) {
+      // check if both configuration are correct.
+      String errorMessage = String.format("please set %s to true and set store engine key %s to "
+          + "%s to enable persist storefile tracking",
+        HConstants.STOREFILE_TRACKING_PERSIST_ENABLED, StoreEngine.STORE_ENGINE_CLASS_KEY,
+        PersistedStoreEngine.class.getName());
+      throw new IllegalArgumentException(errorMessage);
+    }
+    return isFeatureEnabled;
+  }
+
+  /**
+   * if storefile tracking feature is configured, Initialize hbase:storefile table and wait for it
+   * to be online. Otherwise, look for hbase:storefile table and remove it
+   *
+   * @param masterServices masterServices
+   * @throws IOException if hbase:storefile table cannot be initialized and be online
+   */
+  public static void init(MasterServices masterServices) throws IOException {
+    createStorefileTable(masterServices);
+    waitForStoreFileTableOnline(masterServices);
+  }
+
+  public static StoreFilePathAccessor createStoreFilePathAccessor(Configuration conf,
+    Connection connection) {
+    return new HTableStoreFilePathAccessor(conf, connection);
+  }
+
+  public static List<Path> convertStoreFilesToPaths(Collection<HStoreFile> storeFiles) {
+    return storeFiles.stream().map(HStoreFile::getPath).collect(Collectors.toList());
+  }
+
+  private static void createStorefileTable(MasterServices masterServices)
+    throws IOException {
+    if (MetaTableAccessor.getTableState(masterServices.getConnection(),
+      TableName.STOREFILE_TABLE_NAME) == null) {
+      LOG.info("{} table not found. Creating...", TableName.STOREFILE_TABLE_NAME);
+      masterServices.createSystemTable(HTableStoreFilePathAccessor.STOREFILE_TABLE_DESC);
+    }
+  }
+
+  private static void waitForStoreFileTableOnline(MasterServices masterServices)
+    throws IOException {
+    try {
+      long startTime = EnvironmentEdgeManager.currentTime();
+      long timeout = masterServices.getConfiguration()
+        .getLong(HConstants.STOREFILE_TRACKING_INIT_TIMEOUT,
+          HConstants.DEFAULT_STOREFILE_TRACKING_INIT_TIMEOUT);
+      while (!isStoreFileTableAssignedAndEnabled(masterServices)) {
+        if (EnvironmentEdgeManager.currentTime() - startTime + SLEEP_DELTA_MS > timeout) {
+          throw new IOException("Time out " + timeout + " ms waiting for hbase:storefile table to "
+            + "be assigned and enabled: " + masterServices.getTableStateManager()
+            .getTableState(TableName.STOREFILE_TABLE_NAME));
+        }
+        Thread.sleep(SLEEP_DELTA_MS);
+      }
+    } catch (InterruptedException e) {
+      throw new IOException("Interrupted when wait for " + TableName.STOREFILE_TABLE_NAME
+        + " to be assigned and enabled", e);
+    }
+  }
+
+  public static boolean isStoreFileTableAssignedAndEnabled(MasterServices masterServices)
+    throws IOException {
+    return masterServices.getAssignmentManager().getRegionStates()
+      .hasTableRegionStates(TableName.STOREFILE_TABLE_NAME) && masterServices
+      .getTableStateManager().getTableState(TableName.STOREFILE_TABLE_NAME).isEnabled();
+  }
+
+  static String getFamilyFromKey(String key, String tableName, String regionName,
+    String separator) {
+    assert key.startsWith(regionName) : "Unexpected suffix for row key from hbase:storefile "
+      + "table";
+    int startIndex = regionName.length() + separator.length();
+    int endIndex = key.lastIndexOf(separator + tableName);
+    return key.substring(startIndex, endIndex);
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java
index 14863a6..8b3ddbd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java
@@ -61,7 +61,8 @@ public class StripeStoreEngine extends StoreEngine<StripeStoreFlusher,
       Configuration conf, HStore store, CellComparator comparator) throws IOException {
     this.config = new StripeStoreConfig(conf, store);
     this.compactionPolicy = new StripeCompactionPolicy(conf, store, config);
-    this.storeFileManager = new StripeStoreFileManager(comparator, conf, this.config);
+    this.storeFileManager = new StripeStoreFileManager(comparator, conf, this.config,
+        store.getRegionFileSystem(), store.getColumnFamilyName());
     this.storeFlusher = new StripeStoreFlusher(
       conf, store, this.compactionPolicy, this.storeFileManager);
     this.compactor = new StripeCompactor(conf, store);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
index beed41f..4ad5dc4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
@@ -82,6 +82,8 @@ public class StripeStoreFileManager
    */
   public final static byte[] OPEN_KEY = HConstants.EMPTY_BYTE_ARRAY;
   final static byte[] INVALID_KEY = null;
+  private final HRegionFileSystem regionFs;
+  private final String familyName;
 
   /**
    * The state class. Used solely to replace results atomically during
@@ -124,11 +126,14 @@ public class StripeStoreFileManager
   private final int blockingFileCount;
 
   public StripeStoreFileManager(
-      CellComparator kvComparator, Configuration conf, StripeStoreConfig config) {
+      CellComparator kvComparator, Configuration conf, StripeStoreConfig config,
+      HRegionFileSystem regionFs, String familyName) {
     this.cellComparator = kvComparator;
     this.config = config;
     this.blockingFileCount = conf.getInt(
         HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
+    this.regionFs = regionFs;
+    this.familyName = familyName;
   }
 
   @Override
@@ -137,6 +142,11 @@ public class StripeStoreFileManager
   }
 
   @Override
+  public Collection<StoreFileInfo> loadInitialFiles() throws IOException {
+    return regionFs.getStoreFiles(familyName);
+  }
+
+  @Override
   public Collection<HStoreFile> getStorefiles() {
     return state.allFilesCached;
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/StoreFilePathAccessorTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/StoreFilePathAccessorTestBase.java
new file mode 100644
index 0000000..a346217
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/StoreFilePathAccessorTestBase.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TestName;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
+public abstract class StoreFilePathAccessorTestBase {
+
+  @Rule
+  public TestName name = new TestName();
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  protected StoreFilePathAccessor storeFilePathAccessor;
+  protected static final String REGION_NAME = UUID.randomUUID().toString().replaceAll("-", "");
+  protected static final String STORE_NAME = UUID.randomUUID().toString();
+  protected static final List<Path> EMPTY_PATH = Collections.emptyList();
+  protected static final List<Path> INCLUDE_EXAMPLE_PATH =
+    Lists.newArrayList(new Path("hdfs://foo/bar1"), new Path("hdfs://foo/bar2"));
+  protected static final String VALID_TABLE_NAME_CHARS = "_.";
+
+  protected String tableName;
+
+  protected abstract StoreFilePathAccessor getStoreFilePathAccessor() throws IOException;
+
+  @BeforeClass
+  public static void setUpCluster() throws Exception {
+    TEST_UTIL.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    tableName = name.getMethodName() + VALID_TABLE_NAME_CHARS + UUID.randomUUID();
+    init();
+    storeFilePathAccessor = getStoreFilePathAccessor();
+  }
+
+  abstract void init() throws Exception;
+
+  @After
+  public void after() throws Exception {
+    cleanupTest();
+  }
+
+  abstract void cleanupTest() throws Exception;
+
+  @Test
+  public void testInitialize() throws Exception {
+    MasterServices masterServices = TEST_UTIL.getHBaseCluster().getMaster();
+    verifyInitialize(masterServices);
+  }
+
+  // this will be implemented by each implementation of StoreFilePathAccessor
+  abstract void verifyInitialize(MasterServices masterServices) throws Exception;
+
+  abstract void verifyNotInitializedException();
+
+  @Test
+  public void testIncludedStoreFilePaths() throws Exception {
+    testInitialize();
+    // verify empty list before write
+    verifyIncludedFilePaths(EMPTY_PATH);
+    writeAndVerifyIncludedFilePaths(INCLUDE_EXAMPLE_PATH);
+  }
+
+  @Test
+  public void testIncludedStoreFilePathsWithEmptyList() throws Exception {
+    expectedException.expect(IllegalArgumentException.class);
+    testInitialize();
+    // verify empty before write
+    verifyIncludedFilePaths(EMPTY_PATH);
+    // write and verify empty list fails
+    writeAndVerifyIncludedFilePaths(EMPTY_PATH);
+  }
+
+  @Test
+  public void testWriteIncludedStoreFilePathsWhenNotInitialized() throws Exception {
+    verifyNotInitializedException();
+    writeAndVerifyIncludedFilePaths(INCLUDE_EXAMPLE_PATH);
+  }
+
+  @Test
+  public void testGetIncludedStoreFilePathsWhenNotInitialized() throws Exception {
+    verifyNotInitializedException();
+    storeFilePathAccessor.getIncludedStoreFilePaths(tableName, REGION_NAME, STORE_NAME);
+  }
+
+  @Test
+  public void testWriteIncludedStoreFilePathsWithEmptyList() throws Exception {
+    expectedException.expect(IllegalArgumentException.class);
+    testInitialize();
+    // verify empty before write
+    verifyIncludedFilePaths(EMPTY_PATH);
+    // write and verify empty list fails
+    writeAndVerifyIncludedFilePaths(EMPTY_PATH);
+  }
+
+  @Test
+  public void testWriteIncludedStoreFilePaths() throws Exception {
+    testInitialize();
+    verifyIncludedFilePaths(EMPTY_PATH);
+    writeAndVerifyIncludedFilePaths(INCLUDE_EXAMPLE_PATH);
+  }
+
+  @Test
+  public void testWriteIncludedStoreFilePathsWithNull() throws Exception {
+    expectedException.expect(NullPointerException.class);
+    testInitialize();
+    // verify empty before write
+    verifyIncludedFilePaths(EMPTY_PATH);
+    // write and verify empty list fails
+    writeAndVerifyIncludedFilePaths(null);
+  }
+
+  @Test
+  public void testDeleteStoreFilePaths() throws Exception {
+    testInitialize();
+
+    // verify empty list before write
+    verifyIncludedFilePaths(EMPTY_PATH);
+    // write some date to included:files data set
+    writeAndVerifyIncludedFilePaths(INCLUDE_EXAMPLE_PATH);
+    // delete and verify both data set are empty
+    storeFilePathAccessor.deleteStoreFilePaths(tableName, REGION_NAME, STORE_NAME);
+    verifyIncludedFilePaths(EMPTY_PATH);
+  }
+
+  @Test
+  public void testDeleteStoreFilePathsWithNoData() throws Exception {
+    testInitialize();
+
+    // verify empty list before write
+    verifyIncludedFilePaths(EMPTY_PATH);
+    // delete and verify both data set are empty
+    storeFilePathAccessor.deleteStoreFilePaths(tableName, REGION_NAME, STORE_NAME);
+    verifyIncludedFilePaths(EMPTY_PATH);
+  }
+
+  @Test
+  public void testDeleteStoreFilePathsWhenNotInitialized() throws Exception {
+    expectedException.expectCause(Matchers.isA(TableNotFoundException.class));
+    storeFilePathAccessor.deleteStoreFilePaths(tableName, REGION_NAME, STORE_NAME);
+  }
+
+  protected void writeAndVerifyIncludedFilePaths(List<Path> paths) throws IOException {
+    storeFilePathAccessor.writeStoreFilePaths(tableName, REGION_NAME, STORE_NAME,
+      StoreFilePathUpdate.builder().withStorePaths(paths).build());
+    verifyIncludedFilePaths(paths);
+  }
+
+  protected void verifyIncludedFilePaths(List<Path> expectPaths) throws IOException {
+    assertEquals(expectPaths, storeFilePathAccessor
+      .getIncludedStoreFilePaths(tableName, REGION_NAME, STORE_NAME));
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHTableStoreFilePathAccessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHTableStoreFilePathAccessor.java
new file mode 100644
index 0000000..bf7d233
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHTableStoreFilePathAccessor.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import java.io.IOException;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
+
+@Category({ MiscTests.class, MediumTests.class })
+public class TestHTableStoreFilePathAccessor extends StoreFilePathAccessorTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestHTableStoreFilePathAccessor.class);
+
+  private Admin admin;
+
+  @Override
+  protected HTableStoreFilePathAccessor getStoreFilePathAccessor() {
+    return new HTableStoreFilePathAccessor(TEST_UTIL.getConfiguration(), admin.getConnection());
+  }
+
+  @Override
+  public void init() throws Exception {
+    admin = TEST_UTIL.getAdmin();
+  }
+
+  @Override
+  public void cleanupTest() throws IOException {
+    if (admin.tableExists(TableName.STOREFILE_TABLE_NAME)
+      && admin.isTableEnabled(TableName.STOREFILE_TABLE_NAME)) {
+      admin.disableTable(TableName.STOREFILE_TABLE_NAME);
+      admin.deleteTable(TableName.STOREFILE_TABLE_NAME);
+    }
+  }
+
+  @Override
+  public void verifyInitialize(MasterServices masterServices) throws Exception {
+    assertFalse(admin.tableExists(TableName.STOREFILE_TABLE_NAME));
+    StoreFileTrackingUtils.init(TEST_UTIL.getHBaseCluster().getMaster());
+    assertNotNull(TEST_UTIL.getConnection().getTable(TableName.STOREFILE_TABLE_NAME));
+    assertTrue(
+      TEST_UTIL.getMiniHBaseCluster().getRegions(TableName.STOREFILE_TABLE_NAME).size() >= 1);
+    assertTrue("hbase:storefile table must be assigned and enabled.",
+      StoreFileTrackingUtils.isStoreFileTableAssignedAndEnabled(masterServices));
+  }
+
+  @Override
+  public void verifyNotInitializedException() {
+    expectedException.expect(TableNotFoundException.class);
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPersistedStoreEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPersistedStoreEngine.java
new file mode 100644
index 0000000..dc2d0c7
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPersistedStoreEngine.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.hamcrest.core.Is.isA;
+import static org.junit.Assert.assertEquals;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellComparatorImpl;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TestName;
+import org.mockito.Mockito;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestPersistedStoreEngine {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestPersistedStoreEngine.class);
+
+  @Rule
+  public TestName name = new TestName();
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static final byte[] DEFAULT_STORE_BYTE = TEST_UTIL.fam1;
+
+  private TableName tableName;
+  private Configuration conf;
+  private HStore store;
+
+  @BeforeClass
+  public static void setUpCluster() throws Exception {
+    TEST_UTIL.getConfiguration().setBoolean(HConstants.STOREFILE_TRACKING_PERSIST_ENABLED, true);
+    TEST_UTIL.getConfiguration().set(StoreEngine.STORE_ENGINE_CLASS_KEY,
+      PersistedStoreEngine.class.getName());
+    TEST_UTIL.startMiniCluster();
+  }
+
+  @Before
+  public void setup() throws Exception {
+    store = Mockito.mock(HStore.class);
+    StoreContext context = new StoreContext.Builder().build();
+
+    conf = TEST_UTIL.getConfiguration();
+    Mockito.when(store.getStoreContext()).thenReturn(context);
+    Mockito.when(store.getRegionInfo()).thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    for (TableDescriptor htd: TEST_UTIL.getAdmin().listTableDescriptors()) {
+      TEST_UTIL.deleteTable(htd.getTableName());
+    }
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testEngine() throws IOException {
+    tableName = TableName.valueOf(name.getMethodName());
+    createTableAndLoadData();
+    StoreEngine storeEngine = TEST_UTIL.getMiniHBaseCluster().getRegions(tableName).get(0)
+      .getStore(DEFAULT_STORE_BYTE).getStoreEngine();
+    verifyStoreEngineAndStoreFileManager(storeEngine, PersistedStoreEngine.class,
+      PersistedStoreFileManager.class);
+  }
+
+  @Test
+  public void testEngineWithStorefileTrackingPersistDisabled() throws IOException {
+    expectedException.expect(IOException.class);
+    expectedException.expectMessage("Unable to load configured store engine '"
+      + PersistedStoreEngine.class.getName() + "'");
+    expectedException.expectCause(isA(IllegalArgumentException.class));
+    conf.setBoolean(HConstants.STOREFILE_TRACKING_PERSIST_ENABLED, false);
+    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, PersistedStoreEngine.class.getName());
+    CellComparator cellComparator = new CellComparatorImpl();
+    StoreEngine.create(store, conf, cellComparator);
+  }
+
+  @Test
+  public void testEngineWithOnlyStorefileTrackingPersistEnabled() throws IOException {
+    // just setting the storefile tracking enabled will not take any consideration for store engine
+    // creation because it does not go thru PersistedStoreEngine, but the master startup will fail
+    conf.setBoolean(HConstants.STOREFILE_TRACKING_PERSIST_ENABLED, true);
+    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
+    CellComparator cellComparator = new CellComparatorImpl();
+    StoreEngine storeEngine = StoreEngine.create(store, conf, cellComparator);
+    verifyStoreEngineAndStoreFileManager(storeEngine, DefaultStoreEngine.class,
+      DefaultStoreFileManager.class);
+  }
+
+  private void createTableAndLoadData() throws IOException {
+    Table testTable = TEST_UTIL.createMultiRegionTable(tableName, DEFAULT_STORE_BYTE);
+    int loadedRows = TEST_UTIL.loadTable(testTable, DEFAULT_STORE_BYTE);
+    int actualCount = TEST_UTIL.countRows(testTable);
+    assertEquals(loadedRows, actualCount);
+  }
+
+  private void verifyStoreEngineAndStoreFileManager(StoreEngine storeEngine, Class storeEngineClass,
+    Class storeFileManagerClass) {
+    StoreFileManager storeFileManager = storeEngine.getStoreFileManager();
+    assertEquals(storeEngineClass, storeEngine.getClass());
+    assertEquals(storeFileManagerClass, storeFileManager.getClass());
+  }
+
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPersistedStoreFileManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPersistedStoreFileManager.java
new file mode 100644
index 0000000..6bb52cf
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPersistedStoreFileManager.java
@@ -0,0 +1,496 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellComparatorImpl;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.mockito.Mockito;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.ListUtils;
+
+@Category({ RegionServerTests.class, LargeTests.class })
+public class TestPersistedStoreFileManager {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestPersistedStoreFileManager.class);
+
+  @Rule
+  public TestName name = new TestName();
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static final byte[] DEFAULT_STORE_BYTE = TEST_UTIL.fam1;
+  private static final String DEFAULT_STORE_NAME = Bytes.toString(DEFAULT_STORE_BYTE);
+  private static final ArrayList<HStoreFile> EMPTY_LIST = new ArrayList<>();
+  private static final CellComparator DEFAULT_CELL_COMPARATOR = new CellComparatorImpl();
+  private static final Comparator<HStoreFile> COMPARATOR = StoreFileComparators.SEQ_ID;
+
+  private Path baseDir;
+  private FileSystem fs;
+  private PersistedStoreFileManager storeFileManager;
+  private StoreFilePathAccessor storeFilePathAccessor;
+  private Configuration conf;
+  private HRegion region;
+  private HRegionFileSystem regionFS;
+  private List<HStoreFile> initialStoreFiles;
+  private List<HStoreFile> sortedInitialStoreFiles;
+  private List<HStoreFile> additionalStoreFiles;
+  private List<HStoreFile> sortedAdditionalStoreFiles;
+  private List<HStoreFile> sortedCombinedStoreFiles;
+  private List<Path> initialStorePaths;
+  private List<Path> additionalStorePaths;
+  private TableName tableName;
+  private String regionName;
+  private TableDescriptor htd;
+  private RegionInfo regioninfo;
+
+  @BeforeClass
+  public static void setUpCluster() throws Exception {
+    TEST_UTIL.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void before() throws IOException, InterruptedException {
+    conf = TEST_UTIL.getConfiguration();
+    baseDir = TEST_UTIL.getDataTestDirOnTestFS();
+    tableName = TableName.valueOf(name.getMethodName());
+    htd = TEST_UTIL.createTableDescriptor(tableName, DEFAULT_STORE_BYTE);
+    regioninfo = RegionInfoBuilder.newBuilder(tableName).build();
+    region = TEST_UTIL.createRegionAndWAL(regioninfo, baseDir, conf, htd);
+    regionFS = region.getRegionFileSystem();
+    fs = TEST_UTIL.getTestFileSystem();
+    initialStoreFiles = createStoreFilesList();
+    sortedInitialStoreFiles = ImmutableList.sortedCopyOf(COMPARATOR, initialStoreFiles);
+    additionalStoreFiles = createStoreFilesList();
+    sortedAdditionalStoreFiles = ImmutableList.sortedCopyOf(COMPARATOR, additionalStoreFiles);
+    sortedCombinedStoreFiles = ImmutableList.sortedCopyOf(COMPARATOR,
+      ListUtils.union(initialStoreFiles, additionalStoreFiles));
+    initialStorePaths = createPathList();
+    additionalStorePaths = createPathList();
+    regionName = region.getRegionInfo().getEncodedName();
+
+    storeFilePathAccessor =
+      new HTableStoreFilePathAccessor(conf, TEST_UTIL.getAdmin().getConnection());
+    // the hbase:storefile should be created in master startup, but we initialize it here for
+    // unit tests
+    StoreFileTrackingUtils.init(TEST_UTIL.getHBaseCluster().getMaster());
+
+    storeFileManager =
+      new PersistedStoreFileManager(DEFAULT_CELL_COMPARATOR, COMPARATOR, conf,
+        Mockito.mock(CompactionPolicy.class).getConf(), regionFS, regioninfo,
+        DEFAULT_STORE_NAME, storeFilePathAccessor);
+
+    verifyStoreFileManagerWhenStarts();
+  }
+
+  @After
+  public void after() throws IOException {
+    storeFilePathAccessor
+      .deleteStoreFilePaths(tableName.getNameAsString(), regionName, DEFAULT_STORE_NAME);
+  }
+
+  @Test
+  public void testLoadFiles() throws IOException {
+    storeFileManager.loadFiles(initialStoreFiles);
+    compareIncludedInManagerVsTable(sortedInitialStoreFiles);
+  }
+
+  @Test
+  public void testLoadFiles_WithReadOnly() throws IOException {
+    storeFileManager =
+      new PersistedStoreFileManager(DEFAULT_CELL_COMPARATOR, COMPARATOR, conf,
+        Mockito.mock(CompactionPolicy.class).getConf(), regionFS, regioninfo,
+        DEFAULT_STORE_NAME, storeFilePathAccessor, true);
+    storeFileManager.loadFiles(initialStoreFiles);
+    compareIncludedInManagerVsTable(sortedInitialStoreFiles, EMPTY_LIST);
+  }
+
+  @Test
+  public void testLoadFilesWithEmptyListWithExistingData() throws IOException {
+    // first load data into the store and simulate we have persisted data
+    storeFileManager.loadFiles(initialStoreFiles);
+
+    // writing empty list to loadFiles will not fail but it's not doing anything
+    // and mostly this is expected when a fresh region is created.
+    storeFileManager.loadFiles(EMPTY_LIST);
+    // on heap view will be updated to empty, but the pre step loadInitialFiles should have
+    // provide the right view
+    //
+    // this test is telling us that we will never write empty to include list.
+    compareIncludedInManagerVsTable(EMPTY_LIST, sortedInitialStoreFiles);
+  }
+
+  @Test
+  public void testLoadFilesWithEmptyList() throws IOException {
+    storeFileManager.loadFiles(EMPTY_LIST);
+    compareIncludedInManagerVsTable(EMPTY_LIST);
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testLoadFilesWithNull() throws IOException {
+    storeFileManager.loadFiles(initialStoreFiles);
+    storeFileManager.loadFiles(null);
+  }
+
+  @Test
+  public void testInsertNewFiles() throws IOException {
+    storeFileManager.insertNewFiles(initialStoreFiles);
+    compareIncludedInManagerVsTable(sortedInitialStoreFiles);
+    storeFileManager.insertNewFiles(additionalStoreFiles);
+    compareIncludedInManagerVsTable(sortedCombinedStoreFiles);
+  }
+
+  @Test
+  public void testLoadInitialFilesWithNoData() throws IOException {
+    assertNull(storeFileManager.loadInitialFiles());
+  }
+
+  @Test
+  public void testLoadInitialFiles() throws IOException {
+    StoreFilePathAccessor mockStoreFilePathAccessor = Mockito.mock(StoreFilePathAccessor.class);
+    HRegionFileSystem mockFs = Mockito.spy(regionFS);
+    PersistedStoreFileManager storeFileManager =
+      new PersistedStoreFileManager(DEFAULT_CELL_COMPARATOR, COMPARATOR, conf,
+        Mockito.mock(CompactionPolicy.class).getConf(), mockFs, regioninfo,
+        DEFAULT_STORE_NAME, mockStoreFilePathAccessor);
+
+    // make sure the tracking table is not empty and return the list of initialStoreFiles
+    List<Path> storeFilePaths =
+      StoreFileTrackingUtils.convertStoreFilesToPaths(initialStoreFiles);
+    when(mockStoreFilePathAccessor
+      .getIncludedStoreFilePaths(tableName.getNameAsString(), regionName, DEFAULT_STORE_NAME))
+      .thenReturn(storeFilePaths);
+
+    Collection<StoreFileInfo> expectedStoreFileInfos =
+      convertToStoreFileInfos(mockFs.getFileSystem(), initialStoreFiles);
+    Collection<StoreFileInfo> actualStoreFileInfos = storeFileManager.loadInitialFiles();
+    verify(mockFs, times(0)).getStoreFiles(DEFAULT_STORE_NAME);
+    assertEquals(expectedStoreFileInfos, actualStoreFileInfos);
+  }
+
+  @Test
+  public void testLoadInitialFilesWithRefreshFileSystem() throws IOException {
+    StoreFilePathAccessor mockStoreFilePathAccessor = Mockito.mock(StoreFilePathAccessor.class);
+    HRegionFileSystem mockFs = Mockito.mock(HRegionFileSystem.class);
+    PersistedStoreFileManager storeFileManager =
+      new PersistedStoreFileManager(DEFAULT_CELL_COMPARATOR, COMPARATOR, conf,
+        Mockito.mock(CompactionPolicy.class).getConf(), mockFs, regioninfo,
+        DEFAULT_STORE_NAME, mockStoreFilePathAccessor, false);
+
+    Collection<StoreFileInfo> expectedStoreFileInfos =
+      convertToStoreFileInfos(fs, initialStoreFiles);
+
+    when(mockFs.getStoreFiles(DEFAULT_STORE_NAME)).thenReturn(expectedStoreFileInfos);
+
+    Collection<StoreFileInfo> actualStoreFileInfos = storeFileManager.loadInitialFiles();
+    verify(mockFs, times(1)).getStoreFiles(DEFAULT_STORE_NAME);
+    assertEquals(expectedStoreFileInfos, actualStoreFileInfos);
+  }
+
+  @Test
+  public void testLoadInitialFilesWithNoFiles() throws IOException {
+    HRegionFileSystem mockFs = Mockito.mock(HRegionFileSystem.class);
+    when(mockFs.getStoreFiles(DEFAULT_STORE_NAME)).thenReturn(null);
+
+    PersistedStoreFileManager storeFileManager =
+      new PersistedStoreFileManager(DEFAULT_CELL_COMPARATOR, COMPARATOR, conf,
+        Mockito.mock(CompactionPolicy.class).getConf(), mockFs, regioninfo,
+        DEFAULT_STORE_NAME, storeFilePathAccessor);
+
+    Collection<StoreFileInfo> actualStoreFileInfos = storeFileManager.loadInitialFiles();
+    verify(mockFs, times(1)).getStoreFiles(DEFAULT_STORE_NAME);
+    assertNull(actualStoreFileInfos);
+    assertEquals(EMPTY_LIST, storeFileManager.getStorefiles());
+  }
+
+  @Test
+  public void testLoadInitialFilesWithFilesFromFileSystem() throws IOException {
+    HRegionFileSystem mockFs = Mockito.mock(HRegionFileSystem.class);
+    when(mockFs.getFileSystem()).thenReturn(fs);
+    Collection<StoreFileInfo> expectedStoreFileInfos =
+      convertToStoreFileInfos(fs, initialStoreFiles);
+    when(mockFs.getStoreFiles(DEFAULT_STORE_NAME)).thenReturn(expectedStoreFileInfos);
+
+    PersistedStoreFileManager storeFileManager =
+      new PersistedStoreFileManager(DEFAULT_CELL_COMPARATOR, COMPARATOR, conf,
+        Mockito.mock(CompactionPolicy.class).getConf(), mockFs, regioninfo,
+        DEFAULT_STORE_NAME, storeFilePathAccessor);
+
+    // try to check if there is any store file to be loaded
+    Collection<StoreFileInfo> actualStoreFileInfos = storeFileManager.loadInitialFiles();
+    verify(mockFs, times(1)).getStoreFiles(DEFAULT_STORE_NAME);
+    assertEquals(expectedStoreFileInfos, actualStoreFileInfos);
+  }
+
+  @Test
+  public void testClearFiles() throws IOException {
+    storeFileManager.clearFiles();
+    compareIncludedInManagerVsTable(EMPTY_LIST);
+
+    storeFileManager.loadFiles(initialStoreFiles);
+    compareIncludedInManagerVsTable(sortedInitialStoreFiles);
+
+    storeFileManager.clearFiles();
+    compareIncludedInManagerVsTable(EMPTY_LIST, sortedInitialStoreFiles);
+  }
+
+  @Test
+  public void testClearCompactedFiles() throws IOException {
+    storeFileManager.clearCompactedFiles();
+    verifyCompactedfiles(EMPTY_LIST);
+
+    storeFileManager.addCompactionResults(initialStoreFiles, initialStoreFiles);
+    verifyCompactedfiles(sortedInitialStoreFiles);
+    storeFileManager.clearCompactedFiles();
+    verifyCompactedfiles(EMPTY_LIST);
+  }
+
+  @Test
+  public void testAddCompactionResults() throws IOException {
+    storeFileManager.clearFiles();
+    storeFileManager.clearCompactedFiles();
+
+    List<HStoreFile> firstCompactionResult = createStoreFilesList();
+    List<Path> firstCompactionResultPath =
+      StoreFileTrackingUtils.convertStoreFilesToPaths(firstCompactionResult);
+    List<HStoreFile> secondCompactionResult = createStoreFilesList();
+    List<Path> secondCompactionResultPath =
+      StoreFileTrackingUtils.convertStoreFilesToPaths(secondCompactionResult);
+    List<HStoreFile> thirdCompactionResult = createStoreFilesList();
+    List<Path> thirdCompactionResultPath =
+      StoreFileTrackingUtils.convertStoreFilesToPaths(thirdCompactionResult);
+    // Composition of all compaction results, loaded into tmpFiles to track that tmpFiles are being
+    // removed correctly
+    List<Path> initialCompactionTmpPaths = ImmutableList.copyOf(Iterables
+      .concat(firstCompactionResultPath, secondCompactionResultPath, thirdCompactionResultPath));
+
+    // manager.storefiles    = EMPTY_LIST            -> firstCompactionResult
+    // manager.compactedfiles= [empty]               -> EMPTY_LIST
+    // manager.tmpFiles = EMPTY_LIST                 -> initialCompactionTmpPaths (all results)
+    storeFileManager.addCompactionResults(EMPTY_LIST, firstCompactionResult);
+    ImmutableList<HStoreFile> expectedFirstCompactionResult =
+      ImmutableList.sortedCopyOf(COMPARATOR, firstCompactionResult);
+    compareIncludedInManagerVsTable(expectedFirstCompactionResult);
+    verifyCompactedfiles(EMPTY_LIST);
+
+    // manager.storefiles    = firstCompactionResult -> secondCompactionResult
+    // manager.compactedfiles= EMPTY_LIST  -> firstCompactionResult
+    // manager.tmpFiles      = second + third -> third
+    storeFileManager.addCompactionResults(firstCompactionResult, secondCompactionResult);
+    ImmutableList<HStoreFile> expectedSecondCompactionResult =
+      ImmutableList.sortedCopyOf(COMPARATOR, secondCompactionResult);
+    compareIncludedInManagerVsTable(expectedSecondCompactionResult);
+    verifyCompactedfiles(expectedFirstCompactionResult);
+
+    // check manager.compactedfiles accumulates
+    // manager.storefiles    = secondCompactionResult-> thirdCompactionResult
+    // manager.compactedfiles= firstCompactionResult -> firstCompactionResult+secondCompactionResult
+    // manager.tmpFiles      = third -> EMPTY
+    storeFileManager.addCompactionResults(secondCompactionResult, thirdCompactionResult);
+    ImmutableList<HStoreFile> expectedThirdCompactionResult =
+      ImmutableList.sortedCopyOf(COMPARATOR, thirdCompactionResult);
+    compareIncludedInManagerVsTable(expectedThirdCompactionResult);
+    secondCompactionResult.addAll(firstCompactionResult);
+    ImmutableList<HStoreFile> expectedCompactedfiles =
+      ImmutableList.sortedCopyOf(COMPARATOR, secondCompactionResult);
+    verifyCompactedfiles(expectedCompactedfiles);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testAddCompactionResultsWithEmptyResults() throws IOException {
+    // PersistedStoreFileManager can only perform addCompactionResults
+    // after storefiles are flushed and being tracking in-memory
+    storeFileManager.addCompactionResults(initialStoreFiles, EMPTY_LIST);
+  }
+
+  @Test
+  public void testRemoveCompactedFiles() throws IOException {
+    // make sure the store file tracking is empty
+    compareIncludedInManagerVsTable(EMPTY_LIST);
+    verifyCompactedfiles(EMPTY_LIST);
+
+    List<HStoreFile> storefilesSet1 = initialStoreFiles;
+    List<HStoreFile> storefilesSet2 = additionalStoreFiles;
+    List<HStoreFile> compactedFiles = Lists.newArrayList(storefilesSet1);
+    // load some files into store file manager
+    storeFileManager.loadFiles(storefilesSet1);
+
+    storeFileManager.addCompactionResults(storefilesSet1, storefilesSet2);
+    List<HStoreFile> expectedCompactedfiles = sortedInitialStoreFiles;
+    List<HStoreFile> expectedIncluded = sortedAdditionalStoreFiles;
+    verifyCompactedfiles(expectedCompactedfiles);
+    compareIncludedInManagerVsTable(expectedIncluded);
+
+    storeFileManager.removeCompactedFiles(compactedFiles);
+    verifyCompactedfiles(EMPTY_LIST);
+    compareIncludedInManagerVsTable(expectedIncluded);
+  }
+
+  @Test
+  public void testRemoveCompactedFilesWhenEmpty() throws IOException {
+    // simulate the store.close() remove again and it should not change the table both in memory
+    // and the table.
+    HRegionFileSystem mockFs = Mockito.mock(HRegionFileSystem.class);
+    StoreFilePathAccessor mockStoreFilePathAccessor = Mockito.mock(StoreFilePathAccessor.class);
+    PersistedStoreFileManager storeFileManager =
+      new PersistedStoreFileManager(DEFAULT_CELL_COMPARATOR, COMPARATOR, conf,
+        Mockito.mock(CompactionPolicy.class).getConf(), mockFs, regioninfo,
+        DEFAULT_STORE_NAME, mockStoreFilePathAccessor);
+    assertTrue(storeFileManager.getCompactedfiles().isEmpty());
+    storeFileManager.removeCompactedFiles(sortedInitialStoreFiles);
+    assertTrue(storeFileManager.getCompactedfiles().isEmpty());
+  }
+
+  @Test
+  public void testRemoveCompactedFilesNormalOperation() throws IOException {
+    // simulate the store.close() remove again and it should not change the table both in memory
+    // and the table.
+    HRegionFileSystem mockFs = Mockito.mock(HRegionFileSystem.class);
+    StoreFilePathAccessor mockStoreFilePathAccessor = Mockito.mock(StoreFilePathAccessor.class);
+    PersistedStoreFileManager storeFileManager =
+      new PersistedStoreFileManager(DEFAULT_CELL_COMPARATOR, COMPARATOR, conf,
+        Mockito.mock(CompactionPolicy.class).getConf(), mockFs, regioninfo, DEFAULT_STORE_NAME,
+        mockStoreFilePathAccessor);
+    assertTrue(storeFileManager.getCompactedfiles().isEmpty());
+
+    storeFileManager.addCompactionResults(initialStoreFiles, initialStoreFiles);
+    assertEquals(storeFileManager.getCompactedfiles(), sortedInitialStoreFiles);
+    storeFileManager.removeCompactedFiles(sortedInitialStoreFiles);
+    assertTrue(storeFileManager.getCompactedfiles().isEmpty());
+  }
+
+  @Test
+  public void testUpdatePathListToTracker_ReadOnly() throws IOException {
+    HRegionFileSystem mockFs = Mockito.mock(HRegionFileSystem.class);
+    StoreFilePathAccessor mockStoreFilePathAccessor = Mockito.mock(StoreFilePathAccessor.class);
+    PersistedStoreFileManager storeFileManager =
+      new PersistedStoreFileManager(DEFAULT_CELL_COMPARATOR, COMPARATOR, conf,
+        Mockito.mock(CompactionPolicy.class).getConf(), mockFs, regioninfo, DEFAULT_STORE_NAME,
+        mockStoreFilePathAccessor, true);
+
+    StoreFilePathUpdate storeFilePathUpdate = StoreFilePathUpdate.builder()
+      .withStoreFiles(initialStoreFiles).build();
+    storeFileManager.updatePathListToTracker(storeFilePathUpdate);
+    verify(mockStoreFilePathAccessor, times(0))
+      .writeStoreFilePaths(regioninfo.getTable().getNameAsString(), regioninfo.getEncodedName(),
+        DEFAULT_STORE_NAME, storeFilePathUpdate);
+  }
+
+  private void compareIncludedInManagerVsTable(List<HStoreFile> expectedFiles) throws IOException {
+    compareIncludedInManagerVsTable(expectedFiles, expectedFiles);
+  }
+
+  private void compareIncludedInManagerVsTable(List<HStoreFile> expectedStoreFilesOnHeap,
+    List<HStoreFile> expectedStoreFilesInTable) throws IOException {
+    Collection<HStoreFile> storeFilesOnHeap = storeFileManager.getStorefiles();
+    assertEquals(expectedStoreFilesOnHeap, storeFilesOnHeap);
+
+    Collection<Path> includedPathsFromAccessor = storeFilePathAccessor
+      .getIncludedStoreFilePaths(tableName.getNameAsString(), regionName, DEFAULT_STORE_NAME);
+    assertEquals(StoreFileTrackingUtils.convertStoreFilesToPaths(expectedStoreFilesInTable),
+      includedPathsFromAccessor);
+  }
+
+  private void verifyCompactedfiles(List<HStoreFile> expectedCompactedfilesOnHeap) {
+    Collection<HStoreFile> compactedFilesOnHeap = storeFileManager.getCompactedfiles();
+    assertEquals(expectedCompactedfilesOnHeap, compactedFilesOnHeap);
+  }
+
+  private List<HStoreFile> createStoreFilesList() throws IOException {
+    HStoreFile sf1 = createFile();
+    HStoreFile sf2 = createFile();
+    HStoreFile sf3 = createFile();
+    return Lists.newArrayList(sf1, sf2, sf3);
+  }
+
+  private List<Path> createPathList() throws IOException {
+    Path path1 = createFilePath();
+    Path path2 = createFilePath();
+    Path path3 = createFilePath();
+    return Lists.newArrayList(path1, path2, path3);
+  }
+
+  private MockHStoreFile createFile() throws IOException {
+    return new MockHStoreFile(TEST_UTIL, createFilePath(), 0, 0, false, 1);
+  }
+
+  private Path createFilePath() throws IOException {
+    Path testFilePath = StoreFileWriter.getUniqueFile(fs, baseDir);
+    FSDataOutputStream out = fs.create(testFilePath);
+    out.write(0);
+    out.close();
+    return testFilePath;
+  }
+
+  private void verifyStoreFileManagerWhenStarts() {
+    assertTrue(storeFileManager.getStorefiles().isEmpty());
+    assertTrue(storeFileManager.getCompactedfiles().isEmpty());
+  }
+
+  private Collection<StoreFileInfo> convertToStoreFileInfos(FileSystem fs,
+    List<HStoreFile> storeFiles)
+    throws IOException {
+    ArrayList<StoreFileInfo> result = new ArrayList<>(storeFiles.size());
+    for (HStoreFile storeFile: storeFiles) {
+      StoreFileInfo info = new StoreFileInfo(conf, fs, storeFile.getFileInfo().getFileStatus());
+      result.add(info);
+    }
+    return result;
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileTrackingUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileTrackingUtils.java
new file mode 100644
index 0000000..82040bf
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileTrackingUtils.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MiscTests.class, SmallTests.class })
+public class TestStoreFileTrackingUtils {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestStoreFileTrackingUtils.class);
+
+  private Configuration conf;
+  private boolean isFeatureEnabled;
+
+  @Before
+  public void setup() throws IOException {
+    conf = HBaseConfiguration.create();
+  }
+
+  @Test
+  public void testIsStoreFileTrackingPersistEnabled() {
+    conf.setBoolean(HConstants.STOREFILE_TRACKING_PERSIST_ENABLED, true);
+    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, PersistedStoreEngine.class.getName());
+    isFeatureEnabled = StoreFileTrackingUtils.isStoreFileTrackingPersistEnabled(conf);
+    assertTrue(isFeatureEnabled);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testIsStorefileTrackingPersistDisabledWithStoreEngineSet() {
+    conf.setBoolean(HConstants.STOREFILE_TRACKING_PERSIST_ENABLED, false);
+    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, PersistedStoreEngine.class.getName());
+    isFeatureEnabled = StoreFileTrackingUtils.isStoreFileTrackingPersistEnabled(conf);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testisStoreFileTrackingPersistEnabledWithMismatchedStoreEngine() {
+    conf.setBoolean(HConstants.STOREFILE_TRACKING_PERSIST_ENABLED, true);
+    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
+    isFeatureEnabled = StoreFileTrackingUtils.isStoreFileTrackingPersistEnabled(conf);
+  }
+
+  @Test
+  public void testIsStorefileTrackingPersistDisabled() {
+    conf.setBoolean(HConstants.STOREFILE_TRACKING_PERSIST_ENABLED, false);
+    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
+    isFeatureEnabled = StoreFileTrackingUtils.isStoreFileTrackingPersistEnabled(conf);
+    assertFalse(isFeatureEnabled);
+  }
+
+  @Test
+  public void testGetFamilyFromKey() {
+    String separator = "-";
+    String rowkey1 = "region-cf-table";
+    String rowkey2 = "region-new-cf-table";
+    assertEquals("cf",
+      StoreFileTrackingUtils.getFamilyFromKey(rowkey1, "table", "region", separator));
+    assertEquals("new-cf",
+      StoreFileTrackingUtils.getFamilyFromKey(rowkey2, "table", "region", separator));
+  }
+
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java
index 1d1e9fb..2f9fcf2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java
@@ -611,10 +611,11 @@ public class TestStripeStoreFileManager {
 
   private static StripeStoreFileManager createManager(
       ArrayList<HStoreFile> sfs, Configuration conf) throws Exception {
+    HStore store = Mockito.mock(HStore.class);
     StripeStoreConfig config = new StripeStoreConfig(
-        conf, Mockito.mock(StoreConfigInformation.class));
+        conf, store);
     StripeStoreFileManager result = new StripeStoreFileManager(CellComparatorImpl.COMPARATOR, conf,
-        config);
+        config, store.getRegionFileSystem(), store.getColumnFamilyName());
     result.loadFiles(sfs);
     return result;
   }