You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2021/08/09 12:47:49 UTC

[hbase] branch HBASE-26067 updated (0875933 -> aca033c)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a change to branch HBASE-26067
in repository https://gitbox.apache.org/repos/asf/hbase.git.


 discard 0875933  HBASE-26064 Introduce a StoreFileTracker to abstract the store file tracking logic
     add a79a9cc5 HBASE-26179 TestRequestTooBigException spends too much time to finish (#3571)
     add 6a1382a  HBASE-26176 Correct regex in hbase-personality.sh (#3567)
     new aca033c  HBASE-26064 Introduce a StoreFileTracker to abstract the store file tracking logic

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (0875933)
            \
             N -- N -- N   refs/heads/HBASE-26067 (aca033c)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 dev-support/hbase-personality.sh                   | 10 ++--
 .../hadoop/hbase/ipc/NettyRpcFrameDecoder.java     |  9 ++--
 .../org/apache/hadoop/hbase/ipc/RpcServer.java     |  7 +--
 .../hbase/ipc/SimpleServerRpcConnection.java       | 12 +++--
 .../hbase/client/TestRequestTooBigException.java   | 57 ++++++++++------------
 5 files changed, 45 insertions(+), 50 deletions(-)

[hbase] 01/01: HBASE-26064 Introduce a StoreFileTracker to abstract the store file tracking logic

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-26067
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit aca033c10c74e3861a32a116808ceb7aaba332c2
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Thu Jul 29 18:35:19 2021 +0800

    HBASE-26064 Introduce a StoreFileTracker to abstract the store file tracking logic
    
    Signed-off-by: Wellington Chevreuil <wc...@apache.org>
---
 .../hadoop/hbase/mob/DefaultMobStoreCompactor.java |  22 +-
 .../hadoop/hbase/mob/DefaultMobStoreFlusher.java   |   4 +-
 .../regionserver/CreateStoreFileWriterParams.java  | 134 ++++
 .../hbase/regionserver/DateTieredStoreEngine.java  |   5 +-
 .../hbase/regionserver/DefaultStoreEngine.java     |   5 +-
 .../hbase/regionserver/DefaultStoreFlusher.java    |  11 +-
 .../hadoop/hbase/regionserver/HMobStore.java       |   3 +-
 .../hbase/regionserver/HRegionFileSystem.java      |  10 +-
 .../apache/hadoop/hbase/regionserver/HStore.java   | 699 +++++----------------
 .../hadoop/hbase/regionserver/StoreContext.java    |   9 +
 .../hadoop/hbase/regionserver/StoreEngine.java     | 438 ++++++++++++-
 .../hbase/regionserver/StoreFileManager.java       |   9 +
 .../hadoop/hbase/regionserver/StoreFlusher.java    |   9 +-
 .../hadoop/hbase/regionserver/StoreUtils.java      |  36 +-
 .../hbase/regionserver/StripeStoreEngine.java      |   9 +-
 .../hbase/regionserver/StripeStoreFlusher.java     |   9 +-
 .../compactions/AbstractMultiOutputCompactor.java  |   7 +-
 .../hbase/regionserver/compactions/Compactor.java  |  36 +-
 .../regionserver/compactions/DefaultCompactor.java |  16 +-
 .../storefiletracker/DefaultStoreFileTracker.java  |  61 ++
 .../storefiletracker/StoreFileTracker.java         |  75 +++
 .../storefiletracker/StoreFileTrackerBase.java     | 178 ++++++
 .../storefiletracker/StoreFileTrackerFactory.java  |  35 ++
 .../util/compaction/MajorCompactionRequest.java    |   1 -
 .../org/apache/hadoop/hbase/TestIOFencing.java     |  12 +-
 .../regionserver/TestCacheOnWriteInSchema.java     |   6 +-
 .../hbase/regionserver/TestDefaultStoreEngine.java |   5 +-
 .../hadoop/hbase/regionserver/TestHRegion.java     |   4 +-
 .../hadoop/hbase/regionserver/TestHStore.java      |  28 +-
 .../TestRegionMergeTransactionOnCluster.java       |   6 +-
 .../regionserver/TestStoreFileRefresherChore.java  |   3 +-
 .../regionserver/TestStoreScannerClosure.java      |   6 +-
 .../hbase/regionserver/TestStripeStoreEngine.java  |   2 +
 .../compactions/TestDateTieredCompactor.java       |  12 +-
 .../compactions/TestStripeCompactionPolicy.java    |  12 +-
 .../compactions/TestStripeCompactor.java           |  12 +-
 36 files changed, 1224 insertions(+), 705 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
index c45fdff..01fe000 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
@@ -29,7 +29,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.Optional;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -144,17 +143,16 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
   };
 
   private final CellSinkFactory<StoreFileWriter> writerFactory =
-      new CellSinkFactory<StoreFileWriter>() {
-        @Override
-        public StoreFileWriter createWriter(InternalScanner scanner,
-            org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
-            boolean shouldDropBehind, boolean major) throws IOException {
-          // make this writer with tags always because of possible new cells with tags.
-          return store.createWriterInTmp(fd.maxKeyCount,
-            major ? majorCompactionCompression : minorCompactionCompression,
-            true, true, true, shouldDropBehind);
-        }
-      };
+    new CellSinkFactory<StoreFileWriter>() {
+      @Override
+      public StoreFileWriter createWriter(InternalScanner scanner,
+        org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
+        boolean shouldDropBehind, boolean major) throws IOException {
+        // make this writer with tags always because of possible new cells with tags.
+        return store.getStoreEngine().createWriter(
+          createParams(fd, shouldDropBehind, major).includeMVCCReadpoint(true).includesTag(true));
+      }
+    };
 
   public DefaultMobStoreCompactor(Configuration conf, HStore store) {
     super(conf, store);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
index 480b85c..4a1dc7b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
@@ -25,7 +25,6 @@ import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -127,8 +126,7 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
       synchronized (flushLock) {
         status.setStatus("Flushing " + store + ": creating writer");
         // Write the map out to the disk
-        writer = store.createWriterInTmp(cellsCount, store.getColumnFamilyDescriptor().getCompressionType(),
-            false, true, true, false);
+        writer = createWriter(snapshot, true);
         IOException e = null;
         try {
           // It's a mob store, flush the cells in a mob way. This is the difference of flushing
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CreateStoreFileWriterParams.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CreateStoreFileWriterParams.java
new file mode 100644
index 0000000..10cd9f0
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CreateStoreFileWriterParams.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public final class CreateStoreFileWriterParams {
+
+  private long maxKeyCount;
+
+  private Compression.Algorithm compression;
+
+  private boolean isCompaction;
+
+  private boolean includeMVCCReadpoint;
+
+  private boolean includesTag;
+
+  private boolean shouldDropBehind;
+
+  private long totalCompactedFilesSize = -1;
+
+  private String fileStoragePolicy = HConstants.EMPTY_STRING;
+
+  private CreateStoreFileWriterParams() {
+  }
+
+  public long maxKeyCount() {
+    return maxKeyCount;
+  }
+
+  public CreateStoreFileWriterParams maxKeyCount(long maxKeyCount) {
+    this.maxKeyCount = maxKeyCount;
+    return this;
+  }
+
+  public Compression.Algorithm compression() {
+    return compression;
+  }
+
+  /**
+   * Set the compression algorithm to use
+   */
+  public CreateStoreFileWriterParams compression(Compression.Algorithm compression) {
+    this.compression = compression;
+    return this;
+  }
+
+  public boolean isCompaction() {
+    return isCompaction;
+  }
+
+  /**
+   * Whether we are creating a new file in a compaction
+   */
+  public CreateStoreFileWriterParams isCompaction(boolean isCompaction) {
+    this.isCompaction = isCompaction;
+    return this;
+  }
+
+  public boolean includeMVCCReadpoint() {
+    return includeMVCCReadpoint;
+  }
+
+  /**
+   * Whether to include MVCC or not
+   */
+  public CreateStoreFileWriterParams includeMVCCReadpoint(boolean includeMVCCReadpoint) {
+    this.includeMVCCReadpoint = includeMVCCReadpoint;
+    return this;
+  }
+
+  public boolean includesTag() {
+    return includesTag;
+  }
+
+  /**
+   * Whether to includesTag or not
+   */
+  public CreateStoreFileWriterParams includesTag(boolean includesTag) {
+    this.includesTag = includesTag;
+    return this;
+  }
+
+  public boolean shouldDropBehind() {
+    return shouldDropBehind;
+  }
+
+  public CreateStoreFileWriterParams shouldDropBehind(boolean shouldDropBehind) {
+    this.shouldDropBehind = shouldDropBehind;
+    return this;
+  }
+
+  public long totalCompactedFilesSize() {
+    return totalCompactedFilesSize;
+  }
+
+  public CreateStoreFileWriterParams totalCompactedFilesSize(long totalCompactedFilesSize) {
+    this.totalCompactedFilesSize = totalCompactedFilesSize;
+    return this;
+  }
+
+  public String fileStoragePolicy() {
+    return fileStoragePolicy;
+  }
+
+  public CreateStoreFileWriterParams fileStoragePolicy(String fileStoragePolicy) {
+    this.fileStoragePolicy = fileStoragePolicy;
+    return this;
+  }
+
+  public static CreateStoreFileWriterParams create() {
+    return new CreateStoreFileWriterParams();
+  }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java
index 1df953d..7422d91 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java
@@ -19,18 +19,17 @@ package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
 import java.util.List;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
 import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionRequest;
 import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactor;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * HBASE-15400 This store engine allows us to store data in date tiered layout with exponential
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
index 58f8bbb..693b9c9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
 import java.util.List;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellComparator;
@@ -39,8 +38,8 @@ import org.apache.yetus.audience.InterfaceAudience;
  * their derivatives.
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-public class DefaultStoreEngine extends StoreEngine<
-  DefaultStoreFlusher, RatioBasedCompactionPolicy, DefaultCompactor, DefaultStoreFileManager> {
+public class DefaultStoreEngine extends StoreEngine<DefaultStoreFlusher,
+  RatioBasedCompactionPolicy, DefaultCompactor, DefaultStoreFileManager> {
 
   public static final String DEFAULT_STORE_FLUSHER_CLASS_KEY =
       "hbase.hstore.defaultengine.storeflusher.class";
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
index a7d7fb1..306760d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
@@ -21,15 +21,14 @@ package org.apache.hadoop.hbase.regionserver;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Default implementation of StoreFlusher.
@@ -60,9 +59,7 @@ public class DefaultStoreFlusher extends StoreFlusher {
       synchronized (flushLock) {
         status.setStatus("Flushing " + store + ": creating writer");
         // Write the map out to the disk
-        writer = store.createWriterInTmp(cellsCount,
-            store.getColumnFamilyDescriptor().getCompressionType(), false, true,
-            snapshot.isTagsPresent(), false);
+        writer = createWriter(snapshot, false);
         IOException e = null;
         try {
           performFlush(scanner, writer, throughputController);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
index 7ce7f03..b00a50c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
@@ -28,7 +28,6 @@ import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -158,7 +157,7 @@ public class HMobStore extends HStore {
   protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,
       CellComparator cellComparator) throws IOException {
     MobStoreEngine engine = new MobStoreEngine();
-    engine.createComponents(conf, store, cellComparator);
+    engine.createComponentsOnce(conf, store, cellComparator);
     return engine;
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
index dd26f1c..506e816 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
@@ -142,7 +142,7 @@ public class HRegionFileSystem {
   //  Temp Helpers
   // ===========================================================================
   /** @return {@link Path} to the region's temp directory, used for file creations */
-  Path getTempDir() {
+  public Path getTempDir() {
     return new Path(getRegionDir(), REGION_TEMP_DIR);
   }
 
@@ -237,11 +237,7 @@ public class HRegionFileSystem {
    * @param familyName Column Family Name
    * @return a set of {@link StoreFileInfo} for the specified family.
    */
-  public Collection<StoreFileInfo> getStoreFiles(final byte[] familyName) throws IOException {
-    return getStoreFiles(Bytes.toString(familyName));
-  }
-
-  public Collection<StoreFileInfo> getStoreFiles(final String familyName) throws IOException {
+  public List<StoreFileInfo> getStoreFiles(final String familyName) throws IOException {
     return getStoreFiles(familyName, true);
   }
 
@@ -251,7 +247,7 @@ public class HRegionFileSystem {
    * @param familyName Column Family Name
    * @return a set of {@link StoreFileInfo} for the specified family.
    */
-  public Collection<StoreFileInfo> getStoreFiles(final String familyName, final boolean validate)
+  public List<StoreFileInfo> getStoreFiles(final String familyName, final boolean validate)
       throws IOException {
     Path familyDir = getStoreDir(familyName);
     FileStatus[] files = CommonFSUtils.listStatus(this.fs, familyDir);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index c4e34be..34365b9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import com.google.errorprone.annotations.RestrictedApi;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.net.InetSocketAddress;
@@ -47,8 +48,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Predicate;
 import java.util.function.ToLongFunction;
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
@@ -70,17 +69,12 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.conf.ConfigurationManager;
 import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
 import org.apache.hadoop.hbase.io.HeapSize;
-import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.io.crypto.Encryption;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFileContext;
-import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
-import org.apache.hadoop.hbase.log.HBaseMarkers;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.quotas.RegionSizeStore;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
@@ -110,7 +104,6 @@ import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableCollection
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
-import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
 import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
 import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils;
 
@@ -166,16 +159,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
   private boolean cacheOnWriteLogged;
 
   /**
-   * RWLock for store operations.
-   * Locked in shared mode when the list of component stores is looked at:
-   *   - all reads/writes to table data
-   *   - checking for split
-   * Locked in exclusive mode when the list of component stores is modified:
-   *   - closing
-   *   - completing a compaction
-   */
-  final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-  /**
    * Lock specific to archiving compacted store files.  This avoids races around
    * the combination of retrieving the list of compacted files and moving them to
    * the archive directory.  Since this is usually a background process (other than
@@ -290,14 +273,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
     }
 
     this.storeEngine = createStoreEngine(this, this.conf, region.getCellComparator());
-    List<HStoreFile> hStoreFiles = loadStoreFiles(warmup);
-    // Move the storeSize calculation out of loadStoreFiles() method, because the secondary read
-    // replica's refreshStoreFiles() will also use loadStoreFiles() to refresh its store files and
-    // update the storeSize in the refreshStoreSizeAndTotalBytes() finally (just like compaction) , so
-    // no need calculate the storeSize twice.
-    this.storeSize.addAndGet(getStorefilesSize(hStoreFiles, sf -> true));
-    this.totalUncompressedBytes.addAndGet(getTotalUncompressedBytes(hStoreFiles));
-    this.storeEngine.getStoreFileManager().loadFiles(hStoreFiles);
+    storeEngine.initialize(warmup);
+    refreshStoreSizeAndTotalBytes();
 
     flushRetriesNumber = conf.getInt(
         "hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
@@ -513,102 +490,18 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
     this.dataBlockEncoder = blockEncoder;
   }
 
-  /**
-   * Creates an unsorted list of StoreFile loaded in parallel
-   * from the given directory.
-   */
-  private List<HStoreFile> loadStoreFiles(boolean warmup) throws IOException {
-    Collection<StoreFileInfo> files = getRegionFileSystem().getStoreFiles(getColumnFamilyName());
-    return openStoreFiles(files, warmup);
-  }
-
-  private List<HStoreFile> openStoreFiles(Collection<StoreFileInfo> files, boolean warmup)
-      throws IOException {
-    if (CollectionUtils.isEmpty(files)) {
-      return Collections.emptyList();
-    }
-    // initialize the thread pool for opening store files in parallel..
-    ThreadPoolExecutor storeFileOpenerThreadPool =
-      this.region.getStoreFileOpenAndCloseThreadPool("StoreFileOpener-"
-        + this.region.getRegionInfo().getEncodedName() + "-" + this.getColumnFamilyName());
-    CompletionService<HStoreFile> completionService =
-      new ExecutorCompletionService<>(storeFileOpenerThreadPool);
-
-    int totalValidStoreFile = 0;
-    for (StoreFileInfo storeFileInfo : files) {
-      // open each store file in parallel
-      completionService.submit(() -> this.createStoreFileAndReader(storeFileInfo));
-      totalValidStoreFile++;
-    }
-
-    Set<String> compactedStoreFiles = new HashSet<>();
-    ArrayList<HStoreFile> results = new ArrayList<>(files.size());
-    IOException ioe = null;
-    try {
-      for (int i = 0; i < totalValidStoreFile; i++) {
-        try {
-          HStoreFile storeFile = completionService.take().get();
-          if (storeFile != null) {
-            LOG.debug("loaded {}", storeFile);
-            results.add(storeFile);
-            compactedStoreFiles.addAll(storeFile.getCompactedStoreFiles());
-          }
-        } catch (InterruptedException e) {
-          if (ioe == null) {
-            ioe = new InterruptedIOException(e.getMessage());
-          }
-        } catch (ExecutionException e) {
-          if (ioe == null) {
-            ioe = new IOException(e.getCause());
-          }
-        }
-      }
-    } finally {
-      storeFileOpenerThreadPool.shutdownNow();
-    }
-    if (ioe != null) {
-      // close StoreFile readers
-      boolean evictOnClose =
-          getCacheConfig() != null? getCacheConfig().shouldEvictOnClose(): true;
-      for (HStoreFile file : results) {
-        try {
-          if (file != null) {
-            file.closeStoreFile(evictOnClose);
-          }
-        } catch (IOException e) {
-          LOG.warn("Could not close store file {}", file, e);
-        }
-      }
-      throw ioe;
-    }
-
-    // Should not archive the compacted store files when region warmup. See HBASE-22163.
-    if (!warmup) {
-      // Remove the compacted files from result
-      List<HStoreFile> filesToRemove = new ArrayList<>(compactedStoreFiles.size());
-      for (HStoreFile storeFile : results) {
-        if (compactedStoreFiles.contains(storeFile.getPath().getName())) {
-          LOG.warn("Clearing the compacted storefile {} from {}", storeFile, this);
-          storeFile.getReader().close(storeFile.getCacheConf() != null ?
-            storeFile.getCacheConf().shouldEvictOnClose() : true);
-          filesToRemove.add(storeFile);
-        }
-      }
-      results.removeAll(filesToRemove);
-      if (!filesToRemove.isEmpty() && this.isPrimaryReplicaStore()) {
-        LOG.debug("Moving the files {} to archive", filesToRemove);
-        getRegionFileSystem().removeStoreFiles(this.getColumnFamilyDescriptor().getNameAsString(),
-            filesToRemove);
-      }
-    }
-
-    return results;
+  private void postRefreshStoreFiles() throws IOException {
+    // Advance the memstore read point to be at least the new store files seqIds so that
+    // readers might pick it up. This assumes that the store is not getting any writes (otherwise
+    // in-flight transactions might be made visible)
+    getMaxSequenceId().ifPresent(region.getMVCC()::advanceTo);
+    refreshStoreSizeAndTotalBytes();
   }
 
   @Override
   public void refreshStoreFiles() throws IOException {
-    Collection<StoreFileInfo> newFiles = getRegionFileSystem().getStoreFiles(getColumnFamilyName());
-    refreshStoreFilesInternal(newFiles);
+    storeEngine.refreshStoreFiles();
+    postRefreshStoreFiles();
   }
 
   /**
@@ -616,89 +509,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
    * region replicas to keep up to date with the primary region files.
    */
   public void refreshStoreFiles(Collection<String> newFiles) throws IOException {
-    List<StoreFileInfo> storeFiles = new ArrayList<>(newFiles.size());
-    for (String file : newFiles) {
-      storeFiles.add(getRegionFileSystem().getStoreFileInfo(getColumnFamilyName(), file));
-    }
-    refreshStoreFilesInternal(storeFiles);
-  }
-
-  /**
-   * Checks the underlying store files, and opens the files that  have not
-   * been opened, and removes the store file readers for store files no longer
-   * available. Mainly used by secondary region replicas to keep up to date with
-   * the primary region files.
-   */
-  private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throws IOException {
-    StoreFileManager sfm = storeEngine.getStoreFileManager();
-    Collection<HStoreFile> currentFiles = sfm.getStorefiles();
-    Collection<HStoreFile> compactedFiles = sfm.getCompactedfiles();
-    if (currentFiles == null) {
-      currentFiles = Collections.emptySet();
-    }
-    if (newFiles == null) {
-      newFiles = Collections.emptySet();
-    }
-    if (compactedFiles == null) {
-      compactedFiles = Collections.emptySet();
-    }
-
-    HashMap<StoreFileInfo, HStoreFile> currentFilesSet = new HashMap<>(currentFiles.size());
-    for (HStoreFile sf : currentFiles) {
-      currentFilesSet.put(sf.getFileInfo(), sf);
-    }
-    HashMap<StoreFileInfo, HStoreFile> compactedFilesSet = new HashMap<>(compactedFiles.size());
-    for (HStoreFile sf : compactedFiles) {
-      compactedFilesSet.put(sf.getFileInfo(), sf);
-    }
-
-    Set<StoreFileInfo> newFilesSet = new HashSet<StoreFileInfo>(newFiles);
-    // Exclude the files that have already been compacted
-    newFilesSet = Sets.difference(newFilesSet, compactedFilesSet.keySet());
-    Set<StoreFileInfo> toBeAddedFiles = Sets.difference(newFilesSet, currentFilesSet.keySet());
-    Set<StoreFileInfo> toBeRemovedFiles = Sets.difference(currentFilesSet.keySet(), newFilesSet);
-
-    if (toBeAddedFiles.isEmpty() && toBeRemovedFiles.isEmpty()) {
-      return;
-    }
-
-    LOG.info("Refreshing store files for " + this + " files to add: "
-      + toBeAddedFiles + " files to remove: " + toBeRemovedFiles);
-
-    Set<HStoreFile> toBeRemovedStoreFiles = new HashSet<>(toBeRemovedFiles.size());
-    for (StoreFileInfo sfi : toBeRemovedFiles) {
-      toBeRemovedStoreFiles.add(currentFilesSet.get(sfi));
-    }
-
-    // try to open the files
-    List<HStoreFile> openedFiles = openStoreFiles(toBeAddedFiles, false);
-
-    // propogate the file changes to the underlying store file manager
-    replaceStoreFiles(toBeRemovedStoreFiles, openedFiles); //won't throw an exception
-
-    // Advance the memstore read point to be at least the new store files seqIds so that
-    // readers might pick it up. This assumes that the store is not getting any writes (otherwise
-    // in-flight transactions might be made visible)
-    if (!toBeAddedFiles.isEmpty()) {
-      // we must have the max sequence id here as we do have several store files
-      region.getMVCC().advanceTo(this.getMaxSequenceId().getAsLong());
-    }
-
-    refreshStoreSizeAndTotalBytes();
-  }
-
-  protected HStoreFile createStoreFileAndReader(final Path p) throws IOException {
-    StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(),
-        p, isPrimaryReplicaStore());
-    return createStoreFileAndReader(info);
-  }
-
-  private HStoreFile createStoreFileAndReader(StoreFileInfo info) throws IOException {
-    info.setRegionCoprocessorHost(this.region.getCoprocessorHost());
-    HStoreFile storeFile = new HStoreFile(info, getColumnFamilyDescriptor().getBloomFilterType(),
-            getCacheConfig());
-    storeFile.initReader();
-    return storeFile;
+    storeEngine.refreshStoreFiles(newFiles);
+    postRefreshStoreFiles();
   }
 
   /**
@@ -721,7 +533,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
    * Adds a value to the memstore
    */
   public void add(final Cell cell, MemStoreSizing memstoreSizing) {
-    lock.readLock().lock();
+    storeEngine.readLock();
     try {
       if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {
         LOG.trace("tableName={}, encodedName={}, columnFamilyName={} is too busy!",
@@ -729,7 +541,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
       }
       this.memstore.add(cell, memstoreSizing);
     } finally {
-      lock.readLock().unlock();
+      storeEngine.readUnlock();
       currentParallelPutCount.decrementAndGet();
     }
   }
@@ -738,7 +550,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
    * Adds the specified value to the memstore
    */
   public void add(final Iterable<Cell> cells, MemStoreSizing memstoreSizing) {
-    lock.readLock().lock();
+    storeEngine.readLock();
     try {
       if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {
         LOG.trace("tableName={}, encodedName={}, columnFamilyName={} is too busy!",
@@ -746,7 +558,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
       }
       memstore.add(cells, memstoreSizing);
     } finally {
-      lock.readLock().unlock();
+      storeEngine.readUnlock();
       currentParallelPutCount.decrementAndGet();
     }
   }
@@ -869,17 +681,16 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
     LOG.info("Loaded HFile " + srcPath + " into " + this + " as "
         + dstPath + " - updating store file list.");
 
-    HStoreFile sf = createStoreFileAndReader(dstPath);
+    HStoreFile sf = storeEngine.createStoreFileAndReader(dstPath);
     bulkLoadHFile(sf);
 
-    LOG.info("Successfully loaded {} into {} (new location: {})",
-        srcPath, this, dstPath);
+    LOG.info("Successfully loaded {} into {} (new location: {})", srcPath, this, dstPath);
 
     return dstPath;
   }
 
   public void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException {
-    HStoreFile sf = createStoreFileAndReader(fileInfo);
+    HStoreFile sf = storeEngine.createStoreFileAndReader(fileInfo);
     bulkLoadHFile(sf);
   }
 
@@ -887,28 +698,74 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
     StoreFileReader r = sf.getReader();
     this.storeSize.addAndGet(r.length());
     this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
-
-    // Append the new storefile into the list
-    this.lock.writeLock().lock();
-    try {
-      this.storeEngine.getStoreFileManager().insertNewFiles(Lists.newArrayList(sf));
-    } finally {
-      // We need the lock, as long as we are updating the storeFiles
-      // or changing the memstore. Let us release it before calling
-      // notifyChangeReadersObservers. See HBASE-4485 for a possible
-      // deadlock scenario that could have happened if continue to hold
-      // the lock.
-      this.lock.writeLock().unlock();
-    }
+    storeEngine.addStoreFiles(Lists.newArrayList(sf));
     LOG.info("Loaded HFile " + sf.getFileInfo() + " into " + this);
     if (LOG.isTraceEnabled()) {
-      String traceMessage = "BULK LOAD time,size,store size,store files ["
-          + EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize
-          + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
+      String traceMessage = "BULK LOAD time,size,store size,store files [" +
+        EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize + "," +
+        storeEngine.getStoreFileManager().getStorefileCount() + "]";
       LOG.trace(traceMessage);
     }
   }
 
+  private ImmutableCollection<HStoreFile> closeWithoutLock() throws IOException {
+    // Clear so metrics doesn't find them.
+    ImmutableCollection<HStoreFile> result = storeEngine.getStoreFileManager().clearFiles();
+    Collection<HStoreFile> compactedfiles = storeEngine.getStoreFileManager().clearCompactedFiles();
+    // clear the compacted files
+    if (CollectionUtils.isNotEmpty(compactedfiles)) {
+      removeCompactedfiles(compactedfiles,
+        getCacheConfig() != null ? getCacheConfig().shouldEvictOnClose() : true);
+    }
+    if (!result.isEmpty()) {
+      // initialize the thread pool for closing store files in parallel.
+      ThreadPoolExecutor storeFileCloserThreadPool =
+        this.region.getStoreFileOpenAndCloseThreadPool("StoreFileCloser-" +
+          this.region.getRegionInfo().getEncodedName() + "-" + this.getColumnFamilyName());
+
+      // close each store file in parallel
+      CompletionService<Void> completionService =
+        new ExecutorCompletionService<>(storeFileCloserThreadPool);
+      for (HStoreFile f : result) {
+        completionService.submit(new Callable<Void>() {
+          @Override
+          public Void call() throws IOException {
+            boolean evictOnClose =
+              getCacheConfig() != null ? getCacheConfig().shouldEvictOnClose() : true;
+            f.closeStoreFile(evictOnClose);
+            return null;
+          }
+        });
+      }
+
+      IOException ioe = null;
+      try {
+        for (int i = 0; i < result.size(); i++) {
+          try {
+            Future<Void> future = completionService.take();
+            future.get();
+          } catch (InterruptedException e) {
+            if (ioe == null) {
+              ioe = new InterruptedIOException();
+              ioe.initCause(e);
+            }
+          } catch (ExecutionException e) {
+            if (ioe == null) {
+              ioe = new IOException(e.getCause());
+            }
+          }
+        }
+      } finally {
+        storeFileCloserThreadPool.shutdownNow();
+      }
+      if (ioe != null) {
+        throw ioe;
+      }
+    }
+    LOG.trace("Closed {}", this);
+    return result;
+  }
+
   /**
    * Close all the readers We don't need to worry about subsequent requests because the Region holds
    * a write lock that will prevent any more reads or writes.
@@ -916,67 +773,18 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
    * @throws IOException on failure
    */
   public ImmutableCollection<HStoreFile> close() throws IOException {
+    // findbugs can not recognize storeEngine.writeLock is just a lock operation so it will report
+    // UL_UNRELEASED_LOCK_EXCEPTION_PATH, so here we have to use two try finally...
+    // Change later if findbugs becomes smarter in the future.
     this.archiveLock.lock();
-    this.lock.writeLock().lock();
     try {
-      // Clear so metrics doesn't find them.
-      ImmutableCollection<HStoreFile> result = storeEngine.getStoreFileManager().clearFiles();
-      Collection<HStoreFile> compactedfiles =
-          storeEngine.getStoreFileManager().clearCompactedFiles();
-      // clear the compacted files
-      if (CollectionUtils.isNotEmpty(compactedfiles)) {
-        removeCompactedfiles(compactedfiles, getCacheConfig() != null ?
-            getCacheConfig().shouldEvictOnClose() : true);
-      }
-      if (!result.isEmpty()) {
-        // initialize the thread pool for closing store files in parallel.
-        ThreadPoolExecutor storeFileCloserThreadPool = this.region
-            .getStoreFileOpenAndCloseThreadPool("StoreFileCloser-"
-              + this.region.getRegionInfo().getEncodedName() + "-" + this.getColumnFamilyName());
-
-        // close each store file in parallel
-        CompletionService<Void> completionService =
-          new ExecutorCompletionService<>(storeFileCloserThreadPool);
-        for (HStoreFile f : result) {
-          completionService.submit(new Callable<Void>() {
-            @Override
-            public Void call() throws IOException {
-              boolean evictOnClose =
-                  getCacheConfig() != null? getCacheConfig().shouldEvictOnClose(): true;
-              f.closeStoreFile(evictOnClose);
-              return null;
-            }
-          });
-        }
-
-        IOException ioe = null;
-        try {
-          for (int i = 0; i < result.size(); i++) {
-            try {
-              Future<Void> future = completionService.take();
-              future.get();
-            } catch (InterruptedException e) {
-              if (ioe == null) {
-                ioe = new InterruptedIOException();
-                ioe.initCause(e);
-              }
-            } catch (ExecutionException e) {
-              if (ioe == null) {
-                ioe = new IOException(e.getCause());
-              }
-            }
-          }
-        } finally {
-          storeFileCloserThreadPool.shutdownNow();
-        }
-        if (ioe != null) {
-          throw ioe;
-        }
+      this.storeEngine.writeLock();
+      try {
+        return closeWithoutLock();
+      } finally {
+        this.storeEngine.writeUnlock();
       }
-      LOG.trace("Closed {}", this);
-      return result;
     } finally {
-      this.lock.writeLock().unlock();
       this.archiveLock.unlock();
     }
   }
@@ -1006,7 +814,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
         try {
           for (Path pathName : pathNames) {
             lastPathName = pathName;
-            validateStoreFile(pathName);
+            storeEngine.validateStoreFile(pathName);
           }
           return pathNames;
         } catch (Exception e) {
@@ -1052,167 +860,18 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
     }
 
     Path dstPath = getRegionFileSystem().commitStoreFile(getColumnFamilyName(), path);
-    HStoreFile sf = createStoreFileAndReader(dstPath);
+    HStoreFile sf = storeEngine.createStoreFileAndReader(dstPath);
     StoreFileReader r = sf.getReader();
     this.storeSize.addAndGet(r.length());
     this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
 
-    this.lock.writeLock().lock();
-    try {
-      this.storeEngine.getStoreFileManager().insertNewFiles(Lists.newArrayList(sf));
-    } finally {
-      this.lock.writeLock().unlock();
-    }
+    storeEngine.addStoreFiles(Lists.newArrayList(sf));
 
     LOG.info("Loaded recovered hfile to {}, entries={}, sequenceid={}, filesize={}", sf,
       r.getEntries(), r.getSequenceID(), TraditionalBinaryPrefix.long2String(r.length(), "B", 1));
     return sf;
   }
 
-  /**
-   * Commit the given {@code files}.
-   * <p/>
-   * We will move the file into data directory, and open it.
-   * @param files the files want to commit
-   * @param validate whether to validate the store files
-   * @return the committed store files
-   */
-  private List<HStoreFile> commitStoreFiles(List<Path> files, boolean validate) throws IOException {
-    List<HStoreFile> committedFiles = new ArrayList<>(files.size());
-    HRegionFileSystem hfs = getRegionFileSystem();
-    String familyName = getColumnFamilyName();
-    for (Path file : files) {
-      try {
-        if (validate) {
-          validateStoreFile(file);
-        }
-        Path committedPath = hfs.commitStoreFile(familyName, file);
-        HStoreFile sf = createStoreFileAndReader(committedPath);
-        committedFiles.add(sf);
-      } catch (IOException e) {
-        LOG.error("Failed to commit store file {}", file, e);
-        // Try to delete the files we have committed before.
-        // It is OK to fail when deleting as leaving the file there does not cause any data
-        // corruption problem. It just introduces some duplicated data which may impact read
-        // performance a little when reading before compaction.
-        for (HStoreFile sf : committedFiles) {
-          Path pathToDelete = sf.getPath();
-          try {
-            sf.deleteStoreFile();
-          } catch (IOException deleteEx) {
-            LOG.warn(HBaseMarkers.FATAL, "Failed to delete committed store file {}", pathToDelete,
-              deleteEx);
-          }
-        }
-        throw new IOException("Failed to commit the flush", e);
-      }
-    }
-    return committedFiles;
-  }
-
-  public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
-    boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
-    boolean shouldDropBehind) throws IOException {
-    return createWriterInTmp(maxKeyCount, compression, isCompaction, includeMVCCReadpoint,
-      includesTag, shouldDropBehind, -1, HConstants.EMPTY_STRING);
-  }
-
-  /**
-   * @param compression Compression algorithm to use
-   * @param isCompaction whether we are creating a new file in a compaction
-   * @param includeMVCCReadpoint - whether to include MVCC or not
-   * @param includesTag - includesTag or not
-   * @return Writer for a new StoreFile in the tmp dir.
-   */
-  // TODO : allow the Writer factory to create Writers of ShipperListener type only in case of
-  // compaction
-  public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
-      boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
-      boolean shouldDropBehind, long totalCompactedFilesSize, String fileStoragePolicy)
-        throws IOException {
-    // creating new cache config for each new writer
-    final CacheConfig cacheConf = getCacheConfig();
-    final CacheConfig writerCacheConf = new CacheConfig(cacheConf);
-    if (isCompaction) {
-      // Don't cache data on write on compactions, unless specifically configured to do so
-      // Cache only when total file size remains lower than configured threshold
-      final boolean cacheCompactedBlocksOnWrite =
-        getCacheConfig().shouldCacheCompactedBlocksOnWrite();
-      // if data blocks are to be cached on write
-      // during compaction, we should forcefully
-      // cache index and bloom blocks as well
-      if (cacheCompactedBlocksOnWrite && totalCompactedFilesSize <= cacheConf
-        .getCacheCompactedBlocksOnWriteThreshold()) {
-        writerCacheConf.enableCacheOnWrite();
-        if (!cacheOnWriteLogged) {
-          LOG.info("For {} , cacheCompactedBlocksOnWrite is true, hence enabled " +
-              "cacheOnWrite for Data blocks, Index blocks and Bloom filter blocks", this);
-          cacheOnWriteLogged = true;
-        }
-      } else {
-        writerCacheConf.setCacheDataOnWrite(false);
-        if (totalCompactedFilesSize > cacheConf.getCacheCompactedBlocksOnWriteThreshold()) {
-          // checking condition once again for logging
-          LOG.debug(
-            "For {}, setting cacheCompactedBlocksOnWrite as false as total size of compacted "
-              + "files - {}, is greater than cacheCompactedBlocksOnWriteThreshold - {}",
-            this, totalCompactedFilesSize,
-            cacheConf.getCacheCompactedBlocksOnWriteThreshold());
-        }
-      }
-    } else {
-      final boolean shouldCacheDataOnWrite = cacheConf.shouldCacheDataOnWrite();
-      if (shouldCacheDataOnWrite) {
-        writerCacheConf.enableCacheOnWrite();
-        if (!cacheOnWriteLogged) {
-          LOG.info("For {} , cacheDataOnWrite is true, hence enabled cacheOnWrite for " +
-            "Index blocks and Bloom filter blocks", this);
-          cacheOnWriteLogged = true;
-        }
-      }
-    }
-    Encryption.Context encryptionContext = storeContext.getEncryptionContext();
-    HFileContext hFileContext = createFileContext(compression, includeMVCCReadpoint, includesTag,
-      encryptionContext);
-    Path familyTempDir = new Path(getRegionFileSystem().getTempDir(), getColumnFamilyName());
-    StoreFileWriter.Builder builder =
-      new StoreFileWriter.Builder(conf, writerCacheConf, getFileSystem())
-        .withOutputDir(familyTempDir)
-        .withBloomType(storeContext.getBloomFilterType())
-        .withMaxKeyCount(maxKeyCount)
-        .withFavoredNodes(storeContext.getFavoredNodes())
-        .withFileContext(hFileContext)
-        .withShouldDropCacheBehind(shouldDropBehind)
-        .withCompactedFilesSupplier(storeContext.getCompactedFilesSupplier())
-        .withFileStoragePolicy(fileStoragePolicy);
-    return builder.build();
-  }
-
-  private HFileContext createFileContext(Compression.Algorithm compression,
-    boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context encryptionContext) {
-    if (compression == null) {
-      compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
-    }
-    ColumnFamilyDescriptor family = getColumnFamilyDescriptor();
-    HFileContext hFileContext = new HFileContextBuilder()
-      .withIncludesMvcc(includeMVCCReadpoint)
-      .withIncludesTags(includesTag)
-      .withCompression(compression)
-      .withCompressTags(family.isCompressTags())
-      .withChecksumType(StoreUtils.getChecksumType(conf))
-      .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf))
-      .withBlockSize(family.getBlocksize())
-      .withHBaseCheckSum(true)
-      .withDataBlockEncoding(family.getDataBlockEncoding())
-      .withEncryptionContext(encryptionContext)
-      .withCreateTime(EnvironmentEdgeManager.currentTime())
-      .withColumnFamily(getColumnFamilyDescriptor().getName())
-      .withTableName(getTableName().getName())
-      .withCellComparator(getComparator())
-      .build();
-    return hFileContext;
-  }
-
   private long getTotalSize(Collection<HStoreFile> sfs) {
     return sfs.stream().mapToLong(sf -> sf.getReader().length()).sum();
   }
@@ -1222,18 +881,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
    * @param sfs Store files
    * @return Whether compaction is required.
    */
-  private boolean updateStorefiles(List<HStoreFile> sfs, long snapshotId) throws IOException {
-    this.lock.writeLock().lock();
-    try {
-      this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
-    } finally {
-      // We need the lock, as long as we are updating the storeFiles
-      // or changing the memstore. Let us release it before calling
-      // notifyChangeReadersObservers. See HBASE-4485 for a possible
-      // deadlock scenario that could have happened if continue to hold
-      // the lock.
-      this.lock.writeLock().unlock();
-    }
+  private boolean completeFlush(List<HStoreFile> sfs, long snapshotId) throws IOException {
+    storeEngine.addStoreFiles(sfs);
     // We do not need to call clearSnapshot method inside the write lock.
     // The clearSnapshot itself is thread safe, which can be called at the same time with other
     // memstore operations expect snapshot and clearSnapshot. And for these two methods, in HRegion
@@ -1259,11 +908,11 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
   private void notifyChangedReadersObservers(List<HStoreFile> sfs) throws IOException {
     for (ChangedReadersObserver o : this.changedReaderObservers) {
       List<KeyValueScanner> memStoreScanners;
-      this.lock.readLock().lock();
+      this.storeEngine.readLock();
       try {
         memStoreScanners = this.memstore.getScanners(o.getReadPoint());
       } finally {
-        this.lock.readLock().unlock();
+        this.storeEngine.readUnlock();
       }
       o.updateReaders(sfs, memStoreScanners);
     }
@@ -1305,13 +954,13 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
       byte[] stopRow, boolean includeStopRow, long readPt) throws IOException {
     Collection<HStoreFile> storeFilesToScan;
     List<KeyValueScanner> memStoreScanners;
-    this.lock.readLock().lock();
+    this.storeEngine.readLock();
     try {
       storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScan(startRow,
         includeStartRow, stopRow, includeStopRow);
       memStoreScanners = this.memstore.getScanners(readPt);
     } finally {
-      this.lock.readLock().unlock();
+      this.storeEngine.readUnlock();
     }
 
     try {
@@ -1388,11 +1037,11 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
       boolean includeMemstoreScanner) throws IOException {
     List<KeyValueScanner> memStoreScanners = null;
     if (includeMemstoreScanner) {
-      this.lock.readLock().lock();
+      this.storeEngine.readLock();
       try {
         memStoreScanners = this.memstore.getScanners(readPt);
       } finally {
-        this.lock.readLock().unlock();
+        this.storeEngine.readUnlock();
       }
     }
     try {
@@ -1508,14 +1157,13 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
       List<Path> newFiles) throws IOException {
     // Do the steps necessary to complete the compaction.
     setStoragePolicyFromFileName(newFiles);
-    List<HStoreFile> sfs = commitStoreFiles(newFiles, true);
+    List<HStoreFile> sfs = storeEngine.commitStoreFiles(newFiles, true);
     if (this.getCoprocessorHost() != null) {
       for (HStoreFile sf : sfs) {
         getCoprocessorHost().postCompact(this, sf, cr.getTracker(), cr, user);
       }
     }
-    writeCompactionWalRecord(filesToCompact, sfs);
-    replaceStoreFiles(filesToCompact, sfs);
+    replaceStoreFiles(filesToCompact, sfs, true);
     if (cr.isMajor()) {
       majorCompactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs());
       majorCompactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize);
@@ -1579,25 +1227,24 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
         this.region.getRegionInfo(), compactionDescriptor, this.region.getMVCC());
   }
 
-  void replaceStoreFiles(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> result)
-      throws IOException {
-    this.lock.writeLock().lock();
-    try {
-      this.storeEngine.getStoreFileManager().addCompactionResults(compactedFiles, result);
-      synchronized (filesCompacting) {
-        filesCompacting.removeAll(compactedFiles);
-      }
-
-      // These may be null when the RS is shutting down. The space quota Chores will fix the Region
-      // sizes later so it's not super-critical if we miss these.
-      RegionServerServices rsServices = region.getRegionServerServices();
-      if (rsServices != null && rsServices.getRegionServerSpaceQuotaManager() != null) {
-        updateSpaceQuotaAfterFileReplacement(
-            rsServices.getRegionServerSpaceQuotaManager().getRegionSizeStore(), getRegionInfo(),
-            compactedFiles, result);
-      }
-    } finally {
-      this.lock.writeLock().unlock();
+  @RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
+    allowedOnPath = ".*/(HStore|TestHStore).java")
+  void replaceStoreFiles(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> result,
+    boolean writeCompactionMarker) throws IOException {
+    storeEngine.replaceStoreFiles(compactedFiles, result);
+    if (writeCompactionMarker) {
+      writeCompactionWalRecord(compactedFiles, result);
+    }
+    synchronized (filesCompacting) {
+      filesCompacting.removeAll(compactedFiles);
+    }
+    // These may be null when the RS is shutting down. The space quota Chores will fix the Region
+    // sizes later so it's not super-critical if we miss these.
+    RegionServerServices rsServices = region.getRegionServerServices();
+    if (rsServices != null && rsServices.getRegionServerSpaceQuotaManager() != null) {
+      updateSpaceQuotaAfterFileReplacement(
+        rsServices.getRegionServerSpaceQuotaManager().getRegionSizeStore(), getRegionInfo(),
+        compactedFiles, result);
     }
   }
 
@@ -1720,7 +1367,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
       for (String compactionOutput : compactionOutputs) {
         StoreFileInfo storeFileInfo =
             getRegionFileSystem().getStoreFileInfo(getColumnFamilyName(), compactionOutput);
-        HStoreFile storeFile = createStoreFileAndReader(storeFileInfo);
+        HStoreFile storeFile = storeEngine.createStoreFileAndReader(storeFileInfo);
         outputStoreFiles.add(storeFile);
       }
     }
@@ -1728,7 +1375,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
     if (!inputStoreFiles.isEmpty() || !outputStoreFiles.isEmpty()) {
       LOG.info("Replaying compaction marker, replacing input files: " +
           inputStoreFiles + " with output files : " + outputStoreFiles);
-      this.replaceStoreFiles(inputStoreFiles, outputStoreFiles);
+      this.replaceStoreFiles(inputStoreFiles, outputStoreFiles, false);
       this.refreshStoreSizeAndTotalBytes();
     }
   }
@@ -1737,14 +1384,14 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
   public boolean hasReferences() {
     // Grab the read lock here, because we need to ensure that: only when the atomic
     // replaceStoreFiles(..) finished, we can get all the complete store file list.
-    this.lock.readLock().lock();
+    this.storeEngine.readLock();
     try {
       // Merge the current store files with compacted files here due to HBASE-20940.
       Collection<HStoreFile> allStoreFiles = new ArrayList<>(getStorefiles());
       allStoreFiles.addAll(getCompactedFiles());
       return StoreUtils.hasReferences(allStoreFiles);
     } finally {
-      this.lock.readLock().unlock();
+      this.storeEngine.readUnlock();
     }
   }
 
@@ -1784,7 +1431,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
 
     final CompactionContext compaction = storeEngine.createCompaction();
     CompactionRequestImpl request = null;
-    this.lock.readLock().lock();
+    this.storeEngine.readLock();
     try {
       synchronized (filesCompacting) {
         // First, see if coprocessor would want to override selection.
@@ -1857,7 +1504,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
         request.setTracker(tracker);
       }
     } finally {
-      this.lock.readLock().unlock();
+      this.storeEngine.readUnlock();
     }
 
     if (LOG.isDebugEnabled()) {
@@ -1890,7 +1537,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
           this, getColumnFamilyDescriptor().getMinVersions());
       return;
     }
-    this.lock.readLock().lock();
+    this.storeEngine.readLock();
     Collection<HStoreFile> delSfs = null;
     try {
       synchronized (filesCompacting) {
@@ -1902,7 +1549,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
         }
       }
     } finally {
-      this.lock.readLock().unlock();
+      this.storeEngine.readUnlock();
     }
 
     if (CollectionUtils.isEmpty(delSfs)) {
@@ -1910,8 +1557,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
     }
 
     Collection<HStoreFile> newFiles = Collections.emptyList(); // No new files.
-    writeCompactionWalRecord(delSfs, newFiles);
-    replaceStoreFiles(delSfs, newFiles);
+    replaceStoreFiles(delSfs, newFiles, true);
     refreshStoreSizeAndTotalBytes();
     LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in "
         + this + "; total size is "
@@ -1934,25 +1580,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
   }
 
   /**
-   * Validates a store file by opening and closing it. In HFileV2 this should not be an expensive
-   * operation.
-   * @param path the path to the store file
-   */
-  private void validateStoreFile(Path path) throws IOException {
-    HStoreFile storeFile = null;
-    try {
-      storeFile = createStoreFileAndReader(path);
-    } catch (IOException e) {
-      LOG.error("Failed to open store file : {}, keeping it in tmp location", path, e);
-      throw e;
-    } finally {
-      if (storeFile != null) {
-        storeFile.closeStoreFile(false);
-      }
-    }
-  }
-
-  /**
    * Update counts.
    */
   protected void refreshStoreSizeAndTotalBytes()
@@ -1997,7 +1624,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
    * Determines if Store should be split.
    */
   public Optional<byte[]> getSplitPoint() {
-    this.lock.readLock().lock();
+    this.storeEngine.readLock();
     try {
       // Should already be enforced by the split policy!
       assert !this.getRegionInfo().isMetaRegion();
@@ -2010,7 +1637,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
     } catch(IOException e) {
       LOG.warn("Failed getting store size for {}", this, e);
     } finally {
-      this.lock.readLock().unlock();
+      this.storeEngine.readUnlock();
     }
     return Optional.empty();
   }
@@ -2043,7 +1670,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
    */
   public KeyValueScanner getScanner(Scan scan, final NavigableSet<byte[]> targetCols, long readPt)
       throws IOException {
-    lock.readLock().lock();
+    storeEngine.readLock();
     try {
       ScanInfo scanInfo;
       if (this.getCoprocessorHost() != null) {
@@ -2053,7 +1680,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
       }
       return createScanner(scan, scanInfo, targetCols, readPt);
     } finally {
-      lock.readLock().unlock();
+      storeEngine.readUnlock();
     }
   }
 
@@ -2083,7 +1710,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
       boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
       byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
       boolean includeMemstoreScanner) throws IOException {
-    this.lock.readLock().lock();
+    this.storeEngine.readLock();
     try {
       Map<String, HStoreFile> name2File =
           new HashMap<>(getStorefilesCount() + getCompactedFilesCount());
@@ -2108,7 +1735,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
       return getScanners(filesToReopen, cacheBlocks, false, false, matcher, startRow,
         includeStartRow, stopRow, includeStopRow, readPt, false);
     } finally {
-      this.lock.readLock().unlock();
+      this.storeEngine.readUnlock();
     }
   }
 
@@ -2174,41 +1801,20 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
   @Override
   public long getStorefilesSize() {
     // Include all StoreFiles
-    return getStorefilesSize(this.storeEngine.getStoreFileManager().getStorefiles(), sf -> true);
+    return StoreUtils.getStorefilesSize(this.storeEngine.getStoreFileManager().getStorefiles(),
+      sf -> true);
   }
 
   @Override
   public long getHFilesSize() {
     // Include only StoreFiles which are HFiles
-    return getStorefilesSize(this.storeEngine.getStoreFileManager().getStorefiles(),
+    return StoreUtils.getStorefilesSize(this.storeEngine.getStoreFileManager().getStorefiles(),
       HStoreFile::isHFile);
   }
 
-  private long getTotalUncompressedBytes(List<HStoreFile> files) {
-    return files.stream()
-      .mapToLong(file -> getStorefileFieldSize(file, StoreFileReader::getTotalUncompressedBytes))
-      .sum();
-  }
-
-  private long getStorefilesSize(Collection<HStoreFile> files, Predicate<HStoreFile> predicate) {
-    return files.stream().filter(predicate)
-      .mapToLong(file -> getStorefileFieldSize(file, StoreFileReader::length)).sum();
-  }
-
-  private long getStorefileFieldSize(HStoreFile file, ToLongFunction<StoreFileReader> f) {
-    if (file == null) {
-      return 0L;
-    }
-    StoreFileReader reader = file.getReader();
-    if (reader == null) {
-      return 0L;
-    }
-    return f.applyAsLong(reader);
-  }
-
   private long getStorefilesFieldSize(ToLongFunction<StoreFileReader> f) {
     return this.storeEngine.getStoreFileManager().getStorefiles().stream()
-      .mapToLong(file -> getStorefileFieldSize(file, f)).sum();
+      .mapToLong(file -> StoreUtils.getStorefileFieldSize(file, f)).sum();
   }
 
   @Override
@@ -2279,11 +1885,11 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
    */
   public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing memstoreSizing)
       throws IOException {
-    this.lock.readLock().lock();
+    this.storeEngine.readLock();
     try {
       this.memstore.upsert(cells, readpoint, memstoreSizing);
     } finally {
-      this.lock.readLock().unlock();
+      this.storeEngine.readUnlock();
     }
   }
 
@@ -2336,7 +1942,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
         return false;
       }
       status.setStatus("Flushing " + this + ": reopening flushed file");
-      List<HStoreFile> storeFiles = commitStoreFiles(tempFiles, false);
+      List<HStoreFile> storeFiles = storeEngine.commitStoreFiles(tempFiles, false);
       for (HStoreFile sf : storeFiles) {
         StoreFileReader r = sf.getReader();
         if (LOG.isInfoEnabled()) {
@@ -2359,7 +1965,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
         }
       }
       // Add new file to store files. Clear snapshot too while we have the Store write lock.
-      return updateStorefiles(storeFiles, snapshot.getId());
+      return completeFlush(storeFiles, snapshot.getId());
     }
 
     @Override
@@ -2387,7 +1993,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
         // open the file as a store file (hfile link, etc)
         StoreFileInfo storeFileInfo =
           getRegionFileSystem().getStoreFileInfo(getColumnFamilyName(), file);
-        HStoreFile storeFile = createStoreFileAndReader(storeFileInfo);
+        HStoreFile storeFile = storeEngine.createStoreFileAndReader(storeFileInfo);
         storeFiles.add(storeFile);
         HStore.this.storeSize.addAndGet(storeFile.getReader().length());
         HStore.this.totalUncompressedBytes
@@ -2404,7 +2010,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
         snapshotId = snapshot.getId();
         snapshot.close();
       }
-      HStore.this.updateStorefiles(storeFiles, snapshotId);
+      HStore.this.completeFlush(storeFiles, snapshotId);
     }
 
     /**
@@ -2417,7 +2023,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
         //won't be closed. If we are using MSLAB, the chunk referenced by those scanners
         //can't be released, thus memory leak
         snapshot.close();
-        HStore.this.updateStorefiles(Collections.emptyList(), snapshot.getId());
+        HStore.this.completeFlush(Collections.emptyList(), snapshot.getId());
       }
     }
   }
@@ -2583,7 +2189,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
     // ensure other threads do not attempt to archive the same files on close()
     archiveLock.lock();
     try {
-      lock.readLock().lock();
+      storeEngine.readLock();
       Collection<HStoreFile> copyCompactedfiles = null;
       try {
         Collection<HStoreFile> compactedfiles =
@@ -2595,7 +2201,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
           LOG.trace("No compacted files to archive");
         }
       } finally {
-        lock.readLock().unlock();
+        storeEngine.readUnlock();
       }
       if (CollectionUtils.isNotEmpty(copyCompactedfiles)) {
         removeCompactedfiles(copyCompactedfiles, true);
@@ -2730,12 +2336,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
 
   private void clearCompactedfiles(List<HStoreFile> filesToRemove) throws IOException {
     LOG.trace("Clearing the compacted file {} from this store", filesToRemove);
-    try {
-      lock.writeLock().lock();
-      this.getStoreEngine().getStoreFileManager().removeCompactedFiles(filesToRemove);
-    } finally {
-      lock.writeLock().unlock();
-    }
+    storeEngine.removeCompactedFiles(filesToRemove);
   }
 
   void reportArchivedFilesForQuota(List<? extends StoreFile> archivedFiles, List<Long> fileSizes) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreContext.java
index 2623350..2a9f968 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreContext.java
@@ -23,6 +23,7 @@ import java.util.function.Supplier;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.crypto.Encryption;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -108,6 +109,14 @@ public final class StoreContext implements HeapSize {
     return coprocessorHost;
   }
 
+  public RegionInfo getRegionInfo() {
+    return regionFileSystem.getRegionInfo();
+  }
+
+  public boolean isPrimaryReplicaStore() {
+    return getRegionInfo().getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID;
+  }
+
   public static Builder getBuilder() {
     return new Builder();
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
index 60b3c3d..f9d6c29 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
@@ -20,37 +20,129 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
-
+import java.util.Set;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.log.HBaseMarkers;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
 
 /**
- * StoreEngine is a factory that can create the objects necessary for HStore to operate.
- * Since not all compaction policies, compactors and store file managers are compatible,
- * they are tied together and replaced together via StoreEngine-s.
+ * StoreEngine is a factory that can create the objects necessary for HStore to operate. Since not
+ * all compaction policies, compactors and store file managers are compatible, they are tied
+ * together and replaced together via StoreEngine-s.
+ * <p/>
+ * We expose read write lock methods to upper layer for store operations:<br/>
+ * <ul>
+ * <li>Locked in shared mode when the list of component stores is looked at:
+ * <ul>
+ * <li>all reads/writes to table data</li>
+ * <li>checking for split</li>
+ * </ul>
+ * </li>
+ * <li>Locked in exclusive mode when the list of component stores is modified:
+ * <ul>
+ * <li>closing</li>
+ * <li>completing a compaction</li>
+ * </ul>
+ * </li>
+ * </ul>
+ * <p/>
+ * It is a bit confusing that we have a StoreFileManager(SFM) and then a StoreFileTracker(SFT). As
+ * its name says, SFT is used to track the store files list. The reason why we have a SFT beside SFM
+ * is that, when introducing stripe compaction, we introduced the StoreEngine and also the SFM, but
+ * actually, the SFM here is not a general 'Manager', it is only designed to manage the in memory
+ * 'stripes', so we can select different store files when scanning or compacting. The 'tracking' of
+ * store files is actually done in {@link org.apache.hadoop.hbase.regionserver.HRegionFileSystem}
+ * and {@link HStore} before we have SFT. And since SFM is designed to only holds in memory states,
+ * we will hold write lock when updating it, the lock is also used to protect the normal read/write
+ * requests. This means we'd better not add IO operations to SFM. And also, no matter what the in
+ * memory state is, stripe or not, it does not effect how we track the store files. So consider all
+ * these facts, here we introduce a separated SFT to track the store files.
+ * <p/>
+ * Here, since we always need to update SFM and SFT almost at the same time, we introduce methods in
+ * StoreEngine directly to update them both, so upper layer just need to update StoreEngine once, to
+ * reduce the possible misuse.
  */
 @InterfaceAudience.Private
-public abstract class StoreEngine<SF extends StoreFlusher,
-    CP extends CompactionPolicy, C extends Compactor, SFM extends StoreFileManager> {
+public abstract class StoreEngine<SF extends StoreFlusher, CP extends CompactionPolicy,
+  C extends Compactor, SFM extends StoreFileManager> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(StoreEngine.class);
+
   protected SF storeFlusher;
   protected CP compactionPolicy;
   protected C compactor;
   protected SFM storeFileManager;
+  private Configuration conf;
+  private StoreContext ctx;
+  private RegionCoprocessorHost coprocessorHost;
+  private Function<String, ExecutorService> openStoreFileThreadPoolCreator;
+  private StoreFileTracker storeFileTracker;
+
+  private final ReadWriteLock storeLock = new ReentrantReadWriteLock();
 
   /**
-   * The name of the configuration parameter that specifies the class of
-   * a store engine that is used to manage and compact HBase store files.
+   * The name of the configuration parameter that specifies the class of a store engine that is used
+   * to manage and compact HBase store files.
    */
   public static final String STORE_ENGINE_CLASS_KEY = "hbase.hstore.engine.class";
 
-  private static final Class<? extends StoreEngine<?, ?, ?, ?>>
-    DEFAULT_STORE_ENGINE_CLASS = DefaultStoreEngine.class;
+  private static final Class<? extends StoreEngine<?, ?, ?, ?>> DEFAULT_STORE_ENGINE_CLASS =
+    DefaultStoreEngine.class;
+
+  /**
+   * Acquire read lock of this store.
+   */
+  public void readLock() {
+    storeLock.readLock().lock();
+  }
+
+  /**
+   * Release read lock of this store.
+   */
+  public void readUnlock() {
+    storeLock.readLock().unlock();
+  }
+
+  /**
+   * Acquire write lock of this store.
+   */
+  public void writeLock() {
+    storeLock.writeLock().lock();
+  }
+
+  /**
+   * Release write lock of this store.
+   */
+  public void writeUnlock() {
+    storeLock.writeLock().unlock();
+  }
 
   /**
    * @return Compaction policy to use.
@@ -80,6 +172,11 @@ public abstract class StoreEngine<SF extends StoreFlusher,
     return this.storeFlusher;
   }
 
+  private StoreFileTracker createStoreFileTracker(HStore store) {
+    return StoreFileTrackerFactory.create(store.conf, store.getRegionInfo().getTable(),
+      store.isPrimaryReplicaStore(), store.getStoreContext());
+  }
+
   /**
    * @param filesCompacting Files currently compacting
    * @return whether a compaction selection is possible
@@ -87,8 +184,8 @@ public abstract class StoreEngine<SF extends StoreFlusher,
   public abstract boolean needsCompaction(List<HStoreFile> filesCompacting);
 
   /**
-   * Creates an instance of a compaction context specific to this engine.
-   * Doesn't actually select or start a compaction. See CompactionContext class comment.
+   * Creates an instance of a compaction context specific to this engine. Doesn't actually select or
+   * start a compaction. See CompactionContext class comment.
    * @return New CompactionContext object.
    */
   public abstract CompactionContext createCompaction() throws IOException;
@@ -96,32 +193,321 @@ public abstract class StoreEngine<SF extends StoreFlusher,
   /**
    * Create the StoreEngine's components.
    */
-  protected abstract void createComponents(
-      Configuration conf, HStore store, CellComparator cellComparator) throws IOException;
+  protected abstract void createComponents(Configuration conf, HStore store,
+    CellComparator cellComparator) throws IOException;
 
-  private void createComponentsOnce(
-      Configuration conf, HStore store, CellComparator cellComparator) throws IOException {
-    assert compactor == null && compactionPolicy == null
-        && storeFileManager == null && storeFlusher == null;
+  protected final void createComponentsOnce(Configuration conf, HStore store,
+    CellComparator cellComparator) throws IOException {
+    assert compactor == null && compactionPolicy == null && storeFileManager == null &&
+      storeFlusher == null && storeFileTracker == null;
     createComponents(conf, store, cellComparator);
-    assert compactor != null && compactionPolicy != null
-        && storeFileManager != null && storeFlusher != null;
+    this.conf = conf;
+    this.ctx = store.getStoreContext();
+    this.coprocessorHost = store.getHRegion().getCoprocessorHost();
+    this.openStoreFileThreadPoolCreator = store.getHRegion()::getStoreFileOpenAndCloseThreadPool;
+    this.storeFileTracker = createStoreFileTracker(store);
+    assert compactor != null && compactionPolicy != null && storeFileManager != null &&
+      storeFlusher != null && storeFileTracker != null;
+  }
+
+  /**
+   * Create a writer for writing new store files.
+   * @return Writer for a new StoreFile
+   */
+  public StoreFileWriter createWriter(CreateStoreFileWriterParams params) throws IOException {
+    return storeFileTracker.createWriter(params);
+  }
+
+  public HStoreFile createStoreFileAndReader(Path p) throws IOException {
+    StoreFileInfo info = new StoreFileInfo(conf, ctx.getRegionFileSystem().getFileSystem(), p,
+      ctx.isPrimaryReplicaStore());
+    return createStoreFileAndReader(info);
+  }
+
+  public HStoreFile createStoreFileAndReader(StoreFileInfo info) throws IOException {
+    info.setRegionCoprocessorHost(coprocessorHost);
+    HStoreFile storeFile =
+      new HStoreFile(info, ctx.getFamily().getBloomFilterType(), ctx.getCacheConf());
+    storeFile.initReader();
+    return storeFile;
+  }
+
+  /**
+   * Validates a store file by opening and closing it. In HFileV2 this should not be an expensive
+   * operation.
+   * @param path the path to the store file
+   */
+  public void validateStoreFile(Path path) throws IOException {
+    HStoreFile storeFile = null;
+    try {
+      storeFile = createStoreFileAndReader(path);
+    } catch (IOException e) {
+      LOG.error("Failed to open store file : {}, keeping it in tmp location", path, e);
+      throw e;
+    } finally {
+      if (storeFile != null) {
+        storeFile.closeStoreFile(false);
+      }
+    }
+  }
+
+  private List<HStoreFile> openStoreFiles(Collection<StoreFileInfo> files, boolean warmup)
+    throws IOException {
+    if (CollectionUtils.isEmpty(files)) {
+      return Collections.emptyList();
+    }
+    // initialize the thread pool for opening store files in parallel..
+    ExecutorService storeFileOpenerThreadPool =
+      openStoreFileThreadPoolCreator.apply("StoreFileOpener-" +
+        ctx.getRegionInfo().getEncodedName() + "-" + ctx.getFamily().getNameAsString());
+    CompletionService<HStoreFile> completionService =
+      new ExecutorCompletionService<>(storeFileOpenerThreadPool);
+
+    int totalValidStoreFile = 0;
+    for (StoreFileInfo storeFileInfo : files) {
+      // open each store file in parallel
+      completionService.submit(() -> createStoreFileAndReader(storeFileInfo));
+      totalValidStoreFile++;
+    }
+
+    Set<String> compactedStoreFiles = new HashSet<>();
+    ArrayList<HStoreFile> results = new ArrayList<>(files.size());
+    IOException ioe = null;
+    try {
+      for (int i = 0; i < totalValidStoreFile; i++) {
+        try {
+          HStoreFile storeFile = completionService.take().get();
+          if (storeFile != null) {
+            LOG.debug("loaded {}", storeFile);
+            results.add(storeFile);
+            compactedStoreFiles.addAll(storeFile.getCompactedStoreFiles());
+          }
+        } catch (InterruptedException e) {
+          if (ioe == null) {
+            ioe = new InterruptedIOException(e.getMessage());
+          }
+        } catch (ExecutionException e) {
+          if (ioe == null) {
+            ioe = new IOException(e.getCause());
+          }
+        }
+      }
+    } finally {
+      storeFileOpenerThreadPool.shutdownNow();
+    }
+    if (ioe != null) {
+      // close StoreFile readers
+      boolean evictOnClose =
+        ctx.getCacheConf() != null ? ctx.getCacheConf().shouldEvictOnClose() : true;
+      for (HStoreFile file : results) {
+        try {
+          if (file != null) {
+            file.closeStoreFile(evictOnClose);
+          }
+        } catch (IOException e) {
+          LOG.warn("Could not close store file {}", file, e);
+        }
+      }
+      throw ioe;
+    }
+
+    // Should not archive the compacted store files when region warmup. See HBASE-22163.
+    if (!warmup) {
+      // Remove the compacted files from result
+      List<HStoreFile> filesToRemove = new ArrayList<>(compactedStoreFiles.size());
+      for (HStoreFile storeFile : results) {
+        if (compactedStoreFiles.contains(storeFile.getPath().getName())) {
+          LOG.warn("Clearing the compacted storefile {} from {}", storeFile, this);
+          storeFile.getReader().close(
+            storeFile.getCacheConf() != null ? storeFile.getCacheConf().shouldEvictOnClose() :
+              true);
+          filesToRemove.add(storeFile);
+        }
+      }
+      results.removeAll(filesToRemove);
+      if (!filesToRemove.isEmpty() && ctx.isPrimaryReplicaStore()) {
+        LOG.debug("Moving the files {} to archive", filesToRemove);
+        ctx.getRegionFileSystem().removeStoreFiles(ctx.getFamily().getNameAsString(),
+          filesToRemove);
+      }
+    }
+
+    return results;
+  }
+
+  public void initialize(boolean warmup) throws IOException {
+    List<StoreFileInfo> fileInfos = storeFileTracker.load();
+    List<HStoreFile> files = openStoreFiles(fileInfos, warmup);
+    storeFileManager.loadFiles(files);
+  }
+
+  public void refreshStoreFiles() throws IOException {
+    List<StoreFileInfo> fileInfos = storeFileTracker.load();
+    refreshStoreFilesInternal(fileInfos);
+  }
+
+  public void refreshStoreFiles(Collection<String> newFiles) throws IOException {
+    List<StoreFileInfo> storeFiles = new ArrayList<>(newFiles.size());
+    for (String file : newFiles) {
+      storeFiles
+        .add(ctx.getRegionFileSystem().getStoreFileInfo(ctx.getFamily().getNameAsString(), file));
+    }
+    refreshStoreFilesInternal(storeFiles);
+  }
+
+  /**
+   * Checks the underlying store files, and opens the files that have not been opened, and removes
+   * the store file readers for store files no longer available. Mainly used by secondary region
+   * replicas to keep up to date with the primary region files.
+   */
+  private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throws IOException {
+    Collection<HStoreFile> currentFiles = storeFileManager.getStorefiles();
+    Collection<HStoreFile> compactedFiles = storeFileManager.getCompactedfiles();
+    if (currentFiles == null) {
+      currentFiles = Collections.emptySet();
+    }
+    if (newFiles == null) {
+      newFiles = Collections.emptySet();
+    }
+    if (compactedFiles == null) {
+      compactedFiles = Collections.emptySet();
+    }
+
+    HashMap<StoreFileInfo, HStoreFile> currentFilesSet = new HashMap<>(currentFiles.size());
+    for (HStoreFile sf : currentFiles) {
+      currentFilesSet.put(sf.getFileInfo(), sf);
+    }
+    HashMap<StoreFileInfo, HStoreFile> compactedFilesSet = new HashMap<>(compactedFiles.size());
+    for (HStoreFile sf : compactedFiles) {
+      compactedFilesSet.put(sf.getFileInfo(), sf);
+    }
+
+    Set<StoreFileInfo> newFilesSet = new HashSet<StoreFileInfo>(newFiles);
+    // Exclude the files that have already been compacted
+    newFilesSet = Sets.difference(newFilesSet, compactedFilesSet.keySet());
+    Set<StoreFileInfo> toBeAddedFiles = Sets.difference(newFilesSet, currentFilesSet.keySet());
+    Set<StoreFileInfo> toBeRemovedFiles = Sets.difference(currentFilesSet.keySet(), newFilesSet);
+
+    if (toBeAddedFiles.isEmpty() && toBeRemovedFiles.isEmpty()) {
+      return;
+    }
+
+    LOG.info("Refreshing store files for " + this + " files to add: " + toBeAddedFiles +
+      " files to remove: " + toBeRemovedFiles);
+
+    Set<HStoreFile> toBeRemovedStoreFiles = new HashSet<>(toBeRemovedFiles.size());
+    for (StoreFileInfo sfi : toBeRemovedFiles) {
+      toBeRemovedStoreFiles.add(currentFilesSet.get(sfi));
+    }
+
+    // try to open the files
+    List<HStoreFile> openedFiles = openStoreFiles(toBeAddedFiles, false);
+
+    // propogate the file changes to the underlying store file manager
+    replaceStoreFiles(toBeRemovedStoreFiles, openedFiles); // won't throw an exception
+  }
+
+  /**
+   * Commit the given {@code files}.
+   * <p/>
+   * We will move the file into data directory, and open it.
+   * @param files the files want to commit
+   * @param validate whether to validate the store files
+   * @return the committed store files
+   */
+  public List<HStoreFile> commitStoreFiles(List<Path> files, boolean validate) throws IOException {
+    List<HStoreFile> committedFiles = new ArrayList<>(files.size());
+    HRegionFileSystem hfs = ctx.getRegionFileSystem();
+    String familyName = ctx.getFamily().getNameAsString();
+    Path storeDir = hfs.getStoreDir(familyName);
+    for (Path file : files) {
+      try {
+        if (validate) {
+          validateStoreFile(file);
+        }
+        Path committedPath;
+        // As we want to support writing to data directory directly, here we need to check whether
+        // the store file is already in the right place
+        if (file.getParent() != null && file.getParent().equals(storeDir)) {
+          // already in the right place, skip renmaing
+          committedPath = file;
+        } else {
+          // Write-out finished successfully, move into the right spot
+          committedPath = hfs.commitStoreFile(familyName, file);
+        }
+        HStoreFile sf = createStoreFileAndReader(committedPath);
+        committedFiles.add(sf);
+      } catch (IOException e) {
+        LOG.error("Failed to commit store file {}", file, e);
+        // Try to delete the files we have committed before.
+        // It is OK to fail when deleting as leaving the file there does not cause any data
+        // corruption problem. It just introduces some duplicated data which may impact read
+        // performance a little when reading before compaction.
+        for (HStoreFile sf : committedFiles) {
+          Path pathToDelete = sf.getPath();
+          try {
+            sf.deleteStoreFile();
+          } catch (IOException deleteEx) {
+            LOG.warn(HBaseMarkers.FATAL, "Failed to delete committed store file {}", pathToDelete,
+              deleteEx);
+          }
+        }
+        throw new IOException("Failed to commit the flush", e);
+      }
+    }
+    return committedFiles;
+  }
+
+  public void addStoreFiles(Collection<HStoreFile> storeFiles) throws IOException {
+    storeFileTracker.add(StoreUtils.toStoreFileInfo(storeFiles));
+    writeLock();
+    try {
+      storeFileManager.insertNewFiles(storeFiles);
+    } finally {
+      // We need the lock, as long as we are updating the storeFiles
+      // or changing the memstore. Let us release it before calling
+      // notifyChangeReadersObservers. See HBASE-4485 for a possible
+      // deadlock scenario that could have happened if continue to hold
+      // the lock.
+      writeUnlock();
+    }
+  }
+
+  public void replaceStoreFiles(Collection<HStoreFile> compactedFiles,
+    Collection<HStoreFile> newFiles) throws IOException {
+    storeFileTracker.replace(StoreUtils.toStoreFileInfo(compactedFiles),
+      StoreUtils.toStoreFileInfo(newFiles));
+    writeLock();
+    try {
+      storeFileManager.addCompactionResults(compactedFiles, newFiles);
+    } finally {
+      writeUnlock();
+    }
+  }
+
+  public void removeCompactedFiles(Collection<HStoreFile> compactedFiles) {
+    writeLock();
+    try {
+      storeFileManager.removeCompactedFiles(compactedFiles);
+    } finally {
+      writeUnlock();
+    }
   }
 
   /**
    * Create the StoreEngine configured for the given Store.
-   * @param store The store. An unfortunate dependency needed due to it
-   *              being passed to coprocessors via the compactor.
+   * @param store The store. An unfortunate dependency needed due to it being passed to coprocessors
+   *          via the compactor.
    * @param conf Store configuration.
    * @param cellComparator CellComparator for storeFileManager.
    * @return StoreEngine to use.
    */
-  public static StoreEngine<?, ?, ?, ?> create(
-      HStore store, Configuration conf, CellComparator cellComparator) throws IOException {
+  public static StoreEngine<?, ?, ?, ?> create(HStore store, Configuration conf,
+    CellComparator cellComparator) throws IOException {
     String className = conf.get(STORE_ENGINE_CLASS_KEY, DEFAULT_STORE_ENGINE_CLASS.getName());
     try {
-      StoreEngine<?,?,?,?> se = ReflectionUtils.instantiateWithCustomCtor(
-          className, new Class[] { }, new Object[] { });
+      StoreEngine<?, ?, ?, ?> se =
+        ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {}, new Object[] {});
       se.createComponentsOnce(conf, store, cellComparator);
       return se;
     } catch (Exception e) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
index 27127f3..a40b209 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import com.google.errorprone.annotations.RestrictedApi;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Comparator;
@@ -49,12 +50,16 @@ public interface StoreFileManager {
    * Loads the initial store files into empty StoreFileManager.
    * @param storeFiles The files to load.
    */
+  @RestrictedApi(explanation = "Should only be called in StoreEngine", link = "",
+    allowedOnPath = ".*(/org/apache/hadoop/hbase/regionserver/StoreEngine.java|/src/test/.*)")
   void loadFiles(List<HStoreFile> storeFiles);
 
   /**
    * Adds new files, either for from MemStore flush or bulk insert, into the structure.
    * @param sfs New store files.
    */
+  @RestrictedApi(explanation = "Should only be called in StoreEngine", link = "",
+    allowedOnPath = ".*(/org/apache/hadoop/hbase/regionserver/StoreEngine.java|/src/test/.*)")
   void insertNewFiles(Collection<HStoreFile> sfs);
 
   /**
@@ -62,12 +67,16 @@ public interface StoreFileManager {
    * @param compactedFiles The input files for the compaction.
    * @param results The resulting files for the compaction.
    */
+  @RestrictedApi(explanation = "Should only be called in StoreEngine", link = "",
+    allowedOnPath = ".*(/org/apache/hadoop/hbase/regionserver/StoreEngine.java|/src/test/.*)")
   void addCompactionResults(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> results);
 
   /**
    * Remove the compacted files
    * @param compactedFiles the list of compacted files
    */
+  @RestrictedApi(explanation = "Should only be called in StoreEngine", link = "",
+    allowedOnPath = ".*(/org/apache/hadoop/hbase/regionserver/StoreEngine.java|/src/test/.*)")
   void removeCompactedFiles(Collection<HStoreFile> compactedFiles);
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
index 1064b6c..5803128 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
@@ -70,10 +70,17 @@ abstract class StoreFlusher {
     writer.close();
   }
 
+  protected final StoreFileWriter createWriter(MemStoreSnapshot snapshot, boolean alwaysIncludesTag)
+    throws IOException {
+    return store.getStoreEngine()
+      .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(snapshot.getCellsCount())
+        .compression(store.getColumnFamilyDescriptor().getCompressionType()).isCompaction(false)
+        .includeMVCCReadpoint(true).includesTag(alwaysIncludesTag || snapshot.isTagsPresent())
+        .shouldDropBehind(false));
+  }
 
   /**
    * Creates the scanner for flushing snapshot. Also calls coprocessors.
-   * @param snapshotScanners
    * @return The scanner; null if coprocessor is canceling the flush.
    */
   protected final InternalScanner createScanner(List<KeyValueScanner> snapshotScanners,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java
index ac5955f..e464b86 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java
@@ -20,10 +20,13 @@ package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.List;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.OptionalLong;
-
+import java.util.function.Predicate;
+import java.util.function.ToLongFunction;
+import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
@@ -39,10 +42,13 @@ import org.slf4j.LoggerFactory;
  * Utility functions for region server storage layer.
  */
 @InterfaceAudience.Private
-public class StoreUtils {
+public final class StoreUtils {
 
   private static final Logger LOG = LoggerFactory.getLogger(StoreUtils.class);
 
+  private StoreUtils() {
+  }
+
   /**
    * Creates a deterministic hash code for store file collection.
    */
@@ -161,4 +167,30 @@ public class StoreUtils {
         HFile.DEFAULT_BYTES_PER_CHECKSUM);
   }
 
+  public static List<StoreFileInfo> toStoreFileInfo(Collection<HStoreFile> storefiles) {
+    return storefiles.stream().map(HStoreFile::getFileInfo).collect(Collectors.toList());
+  }
+
+  public static long getTotalUncompressedBytes(List<HStoreFile> files) {
+    return files.stream()
+      .mapToLong(file -> getStorefileFieldSize(file, StoreFileReader::getTotalUncompressedBytes))
+      .sum();
+  }
+
+  public static long getStorefilesSize(Collection<HStoreFile> files,
+    Predicate<HStoreFile> predicate) {
+    return files.stream().filter(predicate)
+      .mapToLong(file -> getStorefileFieldSize(file, StoreFileReader::length)).sum();
+  }
+
+  public static long getStorefileFieldSize(HStoreFile file, ToLongFunction<StoreFileReader> f) {
+    if (file == null) {
+      return 0L;
+    }
+    StoreFileReader reader = file.getReader();
+    if (reader == null) {
+      return 0L;
+    }
+    return f.applyAsLong(reader);
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java
index 14863a6..bfb3f64 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java
@@ -20,20 +20,19 @@ package org.apache.hadoop.hbase.regionserver;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
index 1560aef..f8183b7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
@@ -70,7 +70,7 @@ public class StripeStoreFlusher extends StoreFlusher {
     StripeMultiFileWriter mw = null;
     try {
       mw = req.createWriter(); // Writer according to the policy.
-      StripeMultiFileWriter.WriterFactory factory = createWriterFactory(cellsCount);
+      StripeMultiFileWriter.WriterFactory factory = createWriterFactory(snapshot);
       StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
       mw.init(storeScanner, factory);
 
@@ -98,13 +98,12 @@ public class StripeStoreFlusher extends StoreFlusher {
     return result;
   }
 
-  private StripeMultiFileWriter.WriterFactory createWriterFactory(final long kvCount) {
+  private StripeMultiFileWriter.WriterFactory createWriterFactory(MemStoreSnapshot snapshot) {
     return new StripeMultiFileWriter.WriterFactory() {
       @Override
       public StoreFileWriter createWriter() throws IOException {
-        StoreFileWriter writer = store.createWriterInTmp(kvCount,
-            store.getColumnFamilyDescriptor().getCompressionType(), false, true, true, false);
-        return writer;
+        // XXX: it used to always pass true for includesTag, re-consider?
+        return StripeStoreFlusher.this.createWriter(snapshot, true);
       }
     };
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
index 42841bf..533be17 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
@@ -51,13 +51,14 @@ public abstract class AbstractMultiOutputCompactor<T extends AbstractMultiFileWr
     WriterFactory writerFactory = new WriterFactory() {
       @Override
       public StoreFileWriter createWriter() throws IOException {
-        return createTmpWriter(fd, shouldDropBehind, major);
+        return AbstractMultiOutputCompactor.this.createWriter(fd, shouldDropBehind, major);
       }
 
       @Override
       public StoreFileWriter createWriterWithStoragePolicy(String fileStoragePolicy)
-          throws IOException {
-        return createTmpWriter(fd, shouldDropBehind, fileStoragePolicy, major);
+        throws IOException {
+        return AbstractMultiOutputCompactor.this.createWriter(fd, shouldDropBehind,
+          fileStoragePolicy, major);
       }
     };
     // Prepare multi-writer, and perform the compaction using scanner and writer.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index e524f7d..47ef0f2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -28,7 +28,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -39,6 +38,7 @@ import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileInfo;
 import org.apache.hadoop.hbase.regionserver.CellSink;
+import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.HStoreFile;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
@@ -61,6 +61,7 @@ import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
 
 /**
@@ -261,29 +262,32 @@ public abstract class Compactor<T extends CellSink> {
     }
   };
 
+  protected final CreateStoreFileWriterParams createParams(FileDetails fd, boolean shouldDropBehind,
+    boolean major) {
+    return CreateStoreFileWriterParams.create().maxKeyCount(fd.maxKeyCount)
+      .compression(major ? majorCompactionCompression : minorCompactionCompression)
+      .isCompaction(true).includeMVCCReadpoint(fd.maxMVCCReadpoint > 0)
+      .includesTag(fd.maxTagsLength > 0).shouldDropBehind(shouldDropBehind)
+      .totalCompactedFilesSize(fd.totalCompactedFilesSize);
+  }
+
   /**
-   * Creates a writer for a new file in a temporary directory.
+   * Creates a writer for a new file.
    * @param fd The file details.
-   * @return Writer for a new StoreFile in the tmp dir.
+   * @return Writer for a new StoreFile
    * @throws IOException if creation failed
    */
-  protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDropBehind, boolean major)
-      throws IOException {
+  protected final StoreFileWriter createWriter(FileDetails fd, boolean shouldDropBehind,
+    boolean major) throws IOException {
     // When all MVCC readpoints are 0, don't write them.
     // See HBASE-8166, HBASE-12600, and HBASE-13389.
-    return store.createWriterInTmp(fd.maxKeyCount,
-        major ? majorCompactionCompression : minorCompactionCompression,
-        true, fd.maxMVCCReadpoint > 0,
-        fd.maxTagsLength > 0, shouldDropBehind, fd.totalCompactedFilesSize,
-        HConstants.EMPTY_STRING);
+    return store.getStoreEngine().createWriter(createParams(fd, shouldDropBehind, major));
   }
 
-  protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDropBehind,
-      String fileStoragePolicy, boolean major) throws IOException {
-    return store.createWriterInTmp(fd.maxKeyCount,
-      major ? majorCompactionCompression : minorCompactionCompression,
-      true, fd.maxMVCCReadpoint > 0,
-      fd.maxTagsLength > 0, shouldDropBehind, fd.totalCompactedFilesSize, fileStoragePolicy);
+  protected final StoreFileWriter createWriter(FileDetails fd, boolean shouldDropBehind,
+    String fileStoragePolicy, boolean major) throws IOException {
+    return store.getStoreEngine()
+      .createWriter(createParams(fd, shouldDropBehind, major).fileStoragePolicy(fileStoragePolicy));
   }
 
   private ScanInfo preCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index 49d3e8e..afa2429 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@ -45,14 +45,14 @@ public class DefaultCompactor extends Compactor<StoreFileWriter> {
   }
 
   private final CellSinkFactory<StoreFileWriter> writerFactory =
-      new CellSinkFactory<StoreFileWriter>() {
-        @Override
-        public StoreFileWriter createWriter(InternalScanner scanner,
-            org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
-            boolean shouldDropBehind, boolean major) throws IOException {
-          return createTmpWriter(fd, shouldDropBehind, major);
-        }
-      };
+    new CellSinkFactory<StoreFileWriter>() {
+      @Override
+      public StoreFileWriter createWriter(InternalScanner scanner,
+        org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
+        boolean shouldDropBehind, boolean major) throws IOException {
+        return DefaultCompactor.this.createWriter(fd, shouldDropBehind, major);
+      }
+    };
 
   /**
    * Do a minor/major compaction on an explicit set of storefiles from a Store.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/DefaultStoreFileTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/DefaultStoreFileTracker.java
new file mode 100644
index 0000000..d4c9a86
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/DefaultStoreFileTracker.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.storefiletracker;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.StoreContext;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * The default implementation for store file tracker, where we do not persist the store file list,
+ * and use listing when loading store files.
+ */
+@InterfaceAudience.Private
+class DefaultStoreFileTracker extends StoreFileTrackerBase {
+
+  public DefaultStoreFileTracker(Configuration conf, TableName tableName, boolean isPrimaryReplica,
+    StoreContext ctx) {
+    super(conf, tableName, isPrimaryReplica, ctx);
+  }
+
+  @Override
+  public List<StoreFileInfo> load() throws IOException {
+    return ctx.getRegionFileSystem().getStoreFiles(ctx.getFamily().getNameAsString());
+  }
+
+  @Override
+  public boolean requireWritingToTmpDirFirst() {
+    return true;
+  }
+
+  @Override
+  protected void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException {
+    // NOOP
+  }
+
+  @Override
+  protected void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
+    Collection<StoreFileInfo> newFiles) throws IOException {
+    // NOOP
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTracker.java
new file mode 100644
index 0000000..aadedc8
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTracker.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.storefiletracker;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * An interface to define how we track the store files for a give store.
+ * <p/>
+ * In the old time, we will write store to a tmp directory first, and then rename it to the actual
+ * data file. And once a store file is under data directory, we will consider it as 'committed'. And
+ * we need to do listing when loading store files.
+ * <p/>
+ * When cloud age is coming, now we want to store the store files on object storage, where rename
+ * and list are not as cheap as on HDFS, especially rename. Although introducing a metadata
+ * management layer for object storage could solve the problem, but we still want HBase to run on
+ * pure object storage, so here we introduce this interface to abstract how we track the store
+ * files. For the old implementation, we just persist nothing here, and do listing to load store
+ * files. When running on object storage, we could persist the store file list in a system region,
+ * or in a file on the object storage, to make it possible to write directly into the data directory
+ * to avoid renaming, and also avoid listing when loading store files.
+ * <p/>
+ * The implementation requires to be thread safe as flush and compaction may occur as the same time,
+ * and we could also do multiple compactions at the same time. As the implementation may choose to
+ * persist the store file list to external storage, which could be slow, it is the duty for the
+ * callers to not call it inside a lock which may block normal read/write requests.
+ */
+@InterfaceAudience.Private
+public interface StoreFileTracker {
+
+  /**
+   * Load the store files list when opening a region.
+   */
+  List<StoreFileInfo> load() throws IOException;
+
+  /**
+   * Add new store files.
+   * <p/>
+   * Used for flush and bulk load.
+   */
+  void add(Collection<StoreFileInfo> newFiles) throws IOException;
+
+  /**
+   * Add new store files and remove compacted store files after compaction.
+   */
+  void replace(Collection<StoreFileInfo> compactedFiles, Collection<StoreFileInfo> newFiles)
+    throws IOException;
+
+  /**
+   * Create a writer for writing new store files.
+   * @return Writer for a new StoreFile
+   */
+  StoreFileWriter createWriter(CreateStoreFileWriterParams params) throws IOException;
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java
new file mode 100644
index 0000000..2451f45
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.storefiletracker;
+
+import java.io.IOException;
+import java.util.Collection;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.crypto.Encryption;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
+import org.apache.hadoop.hbase.regionserver.StoreContext;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
+import org.apache.hadoop.hbase.regionserver.StoreUtils;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for all store file tracker.
+ * <p/>
+ * Mainly used to place the common logic to skip persistent for secondary replicas.
+ */
+@InterfaceAudience.Private
+abstract class StoreFileTrackerBase implements StoreFileTracker {
+
+  private static final Logger LOG = LoggerFactory.getLogger(StoreFileTrackerBase.class);
+
+  protected final Configuration conf;
+
+  protected final TableName tableName;
+
+  protected final boolean isPrimaryReplica;
+
+  protected final StoreContext ctx;
+
+  private volatile boolean cacheOnWriteLogged;
+
+  protected StoreFileTrackerBase(Configuration conf, TableName tableName, boolean isPrimaryReplica,
+    StoreContext ctx) {
+    this.conf = conf;
+    this.tableName = tableName;
+    this.isPrimaryReplica = isPrimaryReplica;
+    this.ctx = ctx;
+  }
+
+  @Override
+  public final void add(Collection<StoreFileInfo> newFiles) throws IOException {
+    if (isPrimaryReplica) {
+      doAddNewStoreFiles(newFiles);
+    }
+  }
+
+  @Override
+  public final void replace(Collection<StoreFileInfo> compactedFiles,
+    Collection<StoreFileInfo> newFiles) throws IOException {
+    if (isPrimaryReplica) {
+      doAddCompactionResults(compactedFiles, newFiles);
+    }
+  }
+
+  private HFileContext createFileContext(Compression.Algorithm compression,
+    boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context encryptionContext) {
+    if (compression == null) {
+      compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
+    }
+    ColumnFamilyDescriptor family = ctx.getFamily();
+    HFileContext hFileContext = new HFileContextBuilder().withIncludesMvcc(includeMVCCReadpoint)
+      .withIncludesTags(includesTag).withCompression(compression)
+      .withCompressTags(family.isCompressTags()).withChecksumType(StoreUtils.getChecksumType(conf))
+      .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf))
+      .withBlockSize(family.getBlocksize()).withHBaseCheckSum(true)
+      .withDataBlockEncoding(family.getDataBlockEncoding()).withEncryptionContext(encryptionContext)
+      .withCreateTime(EnvironmentEdgeManager.currentTime()).withColumnFamily(family.getName())
+      .withTableName(tableName.getName()).withCellComparator(ctx.getComparator()).build();
+    return hFileContext;
+  }
+
+  @Override
+  public final StoreFileWriter createWriter(CreateStoreFileWriterParams params)
+    throws IOException {
+    if (!isPrimaryReplica) {
+      throw new IllegalStateException("Should not call create writer on secondary replicas");
+    }
+    // creating new cache config for each new writer
+    final CacheConfig cacheConf = ctx.getCacheConf();
+    final CacheConfig writerCacheConf = new CacheConfig(cacheConf);
+    long totalCompactedFilesSize = params.totalCompactedFilesSize();
+    if (params.isCompaction()) {
+      // Don't cache data on write on compactions, unless specifically configured to do so
+      // Cache only when total file size remains lower than configured threshold
+      final boolean cacheCompactedBlocksOnWrite = cacheConf.shouldCacheCompactedBlocksOnWrite();
+      // if data blocks are to be cached on write
+      // during compaction, we should forcefully
+      // cache index and bloom blocks as well
+      if (cacheCompactedBlocksOnWrite &&
+        totalCompactedFilesSize <= cacheConf.getCacheCompactedBlocksOnWriteThreshold()) {
+        writerCacheConf.enableCacheOnWrite();
+        if (!cacheOnWriteLogged) {
+          LOG.info("For {} , cacheCompactedBlocksOnWrite is true, hence enabled " +
+            "cacheOnWrite for Data blocks, Index blocks and Bloom filter blocks", this);
+          cacheOnWriteLogged = true;
+        }
+      } else {
+        writerCacheConf.setCacheDataOnWrite(false);
+        if (totalCompactedFilesSize > cacheConf.getCacheCompactedBlocksOnWriteThreshold()) {
+          // checking condition once again for logging
+          LOG.debug(
+            "For {}, setting cacheCompactedBlocksOnWrite as false as total size of compacted " +
+              "files - {}, is greater than cacheCompactedBlocksOnWriteThreshold - {}",
+            this, totalCompactedFilesSize, cacheConf.getCacheCompactedBlocksOnWriteThreshold());
+        }
+      }
+    } else {
+      final boolean shouldCacheDataOnWrite = cacheConf.shouldCacheDataOnWrite();
+      if (shouldCacheDataOnWrite) {
+        writerCacheConf.enableCacheOnWrite();
+        if (!cacheOnWriteLogged) {
+          LOG.info("For {} , cacheDataOnWrite is true, hence enabled cacheOnWrite for " +
+            "Index blocks and Bloom filter blocks", this);
+          cacheOnWriteLogged = true;
+        }
+      }
+    }
+    Encryption.Context encryptionContext = ctx.getEncryptionContext();
+    HFileContext hFileContext = createFileContext(params.compression(),
+      params.includeMVCCReadpoint(), params.includesTag(), encryptionContext);
+    Path outputDir;
+    if (requireWritingToTmpDirFirst()) {
+      outputDir =
+        new Path(ctx.getRegionFileSystem().getTempDir(), ctx.getFamily().getNameAsString());
+    } else {
+      throw new UnsupportedOperationException("not supported yet");
+    }
+    StoreFileWriter.Builder builder =
+      new StoreFileWriter.Builder(conf, writerCacheConf, ctx.getRegionFileSystem().getFileSystem())
+        .withOutputDir(outputDir).withBloomType(ctx.getBloomFilterType())
+        .withMaxKeyCount(params.maxKeyCount()).withFavoredNodes(ctx.getFavoredNodes())
+        .withFileContext(hFileContext).withShouldDropCacheBehind(params.shouldDropBehind())
+        .withCompactedFilesSupplier(ctx.getCompactedFilesSupplier())
+        .withFileStoragePolicy(params.fileStoragePolicy());
+    return builder.build();
+  }
+
+  /**
+   * Whether the implementation of this tracker requires you to write to temp directory first, i.e,
+   * does not allow broken store files under the actual data directory.
+   */
+  protected abstract boolean requireWritingToTmpDirFirst();
+
+  protected abstract void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException;
+
+  protected abstract void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
+    Collection<StoreFileInfo> newFiles) throws IOException;
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerFactory.java
new file mode 100644
index 0000000..4f7231b
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerFactory.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.storefiletracker;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.StoreContext;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Factory method for creating store file tracker.
+ */
+@InterfaceAudience.Private
+public final class StoreFileTrackerFactory {
+
+  public static StoreFileTracker create(Configuration conf, TableName tableName,
+    boolean isPrimaryReplica, StoreContext ctx) {
+    return new DefaultStoreFileTracker(conf, tableName, isPrimaryReplica, ctx);
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java
index 22ec6cb..291b909 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java
@@ -105,7 +105,6 @@ class MajorCompactionRequest {
 
   boolean shouldCFBeCompacted(HRegionFileSystem fileSystem, String family, long ts)
       throws IOException {
-
     // do we have any store files?
     Collection<StoreFileInfo> storeFiles = fileSystem.getStoreFiles(family);
     if (storeFiles == null) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
index 3c2bc3f..9314d7e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
@@ -211,11 +211,13 @@ public class TestIOFencing {
 
     @Override
     protected void refreshStoreSizeAndTotalBytes() throws IOException {
-      try {
-        r.compactionsWaiting.countDown();
-        r.compactionsBlocked.await();
-      } catch (InterruptedException ex) {
-        throw new IOException(ex);
+      if (r != null) {
+        try {
+          r.compactionsWaiting.countDown();
+          r.compactionsBlocked.await();
+        } catch (InterruptedException ex) {
+          throw new IOException(ex);
+        }
       }
       super.refreshStoreSizeAndTotalBytes();
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java
index cd83dc8..0a6b37b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java
@@ -215,8 +215,10 @@ public class TestCacheOnWriteInSchema {
   @Test
   public void testCacheOnWriteInSchema() throws IOException {
     // Write some random data into the store
-    StoreFileWriter writer = store.createWriterInTmp(Integer.MAX_VALUE,
-        HFile.DEFAULT_COMPRESSION_ALGORITHM, false, true, false, false);
+    StoreFileWriter writer = store.getStoreEngine()
+      .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(Integer.MAX_VALUE)
+        .compression(HFile.DEFAULT_COMPRESSION_ALGORITHM).isCompaction(false)
+        .includeMVCCReadpoint(true).includesTag(false).shouldDropBehind(false));
     writeStoreFile(writer);
     writer.close();
     // Verify the block types of interest were cached on write
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java
index e832c47..3784876 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java
@@ -65,9 +65,12 @@ public class TestDefaultStoreEngine {
         DummyCompactionPolicy.class.getName());
     conf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
         DummyStoreFlusher.class.getName());
+    HRegion mockRegion = Mockito.mock(HRegion.class);
     HStore mockStore = Mockito.mock(HStore.class);
     Mockito.when(mockStore.getRegionInfo()).thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO);
-    StoreEngine<?, ?, ?, ?> se = StoreEngine.create(mockStore, conf, CellComparatorImpl.COMPARATOR);
+    Mockito.when(mockStore.getHRegion()).thenReturn(mockRegion);
+    StoreEngine<?, ?, ?, ?> se =
+      StoreEngine.create(mockStore, conf, CellComparatorImpl.COMPARATOR);
     Assert.assertTrue(se instanceof DefaultStoreEngine);
     Assert.assertTrue(se.getCompactionPolicy() instanceof DummyCompactionPolicy);
     Assert.assertTrue(se.getStoreFlusher() instanceof DummyStoreFlusher);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index a5584ff..7fb5768 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -5744,7 +5744,7 @@ public class TestHRegion {
       Collection<HStoreFile> storeFiles = primaryRegion.getStore(families[0]).getStorefiles();
       primaryRegion.getRegionFileSystem().removeStoreFiles(Bytes.toString(families[0]), storeFiles);
       Collection<StoreFileInfo> storeFileInfos = primaryRegion.getRegionFileSystem()
-          .getStoreFiles(families[0]);
+          .getStoreFiles(Bytes.toString(families[0]));
       Assert.assertTrue(storeFileInfos == null || storeFileInfos.isEmpty());
 
       verifyData(secondaryRegion, 0, 1000, cq, families);
@@ -7648,7 +7648,7 @@ public class TestHRegion {
             getCacheConfig() != null? getCacheConfig().shouldEvictOnClose(): true;
         for (Path newFile : newFiles) {
           // Create storefile around what we wrote with a reader on it.
-          HStoreFile sf = createStoreFileAndReader(newFile);
+          HStoreFile sf = storeEngine.createStoreFileAndReader(newFile);
           sf.closeStoreFile(evictOnClose);
           sfs.add(sf);
         }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
index 46995d9..f6cfc7b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
@@ -305,7 +305,7 @@ public class TestHStore {
 
   /**
    * Verify that compression and data block encoding are respected by the
-   * Store.createWriterInTmp() method, used on store flush.
+   * createWriter method, used on store flush.
    */
   @Test
   public void testCreateWriter() throws Exception {
@@ -317,9 +317,11 @@ public class TestHStore {
         .build();
     init(name.getMethodName(), conf, hcd);
 
-    // Test createWriterInTmp()
-    StoreFileWriter writer =
-        store.createWriterInTmp(4, hcd.getCompressionType(), false, true, false, false);
+    // Test createWriter
+    StoreFileWriter writer = store.getStoreEngine()
+      .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(4)
+        .compression(hcd.getCompressionType()).isCompaction(false).includeMVCCReadpoint(true)
+        .includesTag(false).shouldDropBehind(false));
     Path path = writer.getPath();
     writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1)));
     writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2)));
@@ -1016,19 +1018,19 @@ public class TestHStore {
     // add one more file
     addStoreFile();
 
-    HStore spiedStore = spy(store);
+    StoreEngine<?, ?, ?, ?> spiedStoreEngine = spy(store.getStoreEngine());
 
     // call first time after files changed
-    spiedStore.refreshStoreFiles();
+    spiedStoreEngine.refreshStoreFiles();
     assertEquals(2, this.store.getStorefilesCount());
-    verify(spiedStore, times(1)).replaceStoreFiles(any(), any());
+    verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any());
 
     // call second time
-    spiedStore.refreshStoreFiles();
+    spiedStoreEngine.refreshStoreFiles();
 
     // ensure that replaceStoreFiles is not called, i.e, the times does not change, if files are not
     // refreshed,
-    verify(spiedStore, times(1)).replaceStoreFiles(any(), any());
+    verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any());
   }
 
   private long countMemStoreScanner(StoreScanner scanner) {
@@ -1639,7 +1641,7 @@ public class TestHStore {
     // Do compaction
     MyThread thread = new MyThread(storeScanner);
     thread.start();
-    store.replaceStoreFiles(actualStorefiles, actualStorefiles1);
+    store.replaceStoreFiles(actualStorefiles, actualStorefiles1, false);
     thread.join();
     KeyValueHeap heap2 = thread.getHeap();
     assertFalse(heap.equals(heap2));
@@ -1705,8 +1707,10 @@ public class TestHStore {
   @Test
   public void testHFileContextSetWithCFAndTable() throws Exception {
     init(this.name.getMethodName());
-    StoreFileWriter writer = store.createWriterInTmp(10000L,
-        Compression.Algorithm.NONE, false, true, false, true);
+    StoreFileWriter writer = store.getStoreEngine()
+      .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(10000L)
+        .compression(Compression.Algorithm.NONE).isCompaction(true).includeMVCCReadpoint(true)
+        .includesTag(false).shouldDropBehind(true));
     HFileContext hFileContext = writer.getHFileWriter().getFileContext();
     assertArrayEquals(family, hFileContext.getColumnFamily());
     assertArrayEquals(table, hFileContext.getTableName());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
index 737ba46..3087c82 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
@@ -245,7 +245,7 @@ public class TestRegionMergeTransactionOnCluster {
         TEST_UTIL.getConfiguration(), fs, tabledir, mergedRegionInfo);
       int count = 0;
       for(ColumnFamilyDescriptor colFamily : columnFamilies) {
-        count += hrfs.getStoreFiles(colFamily.getName()).size();
+        count += hrfs.getStoreFiles(colFamily.getNameAsString()).size();
       }
       ADMIN.compactRegion(mergedRegionInfo.getRegionName());
       // clean up the merged region store files
@@ -254,7 +254,7 @@ public class TestRegionMergeTransactionOnCluster {
       int newcount = 0;
       while (EnvironmentEdgeManager.currentTime() < timeout) {
         for(ColumnFamilyDescriptor colFamily : columnFamilies) {
-          newcount += hrfs.getStoreFiles(colFamily.getName()).size();
+          newcount += hrfs.getStoreFiles(colFamily.getNameAsString()).size();
         }
         if(newcount > count) {
           break;
@@ -273,7 +273,7 @@ public class TestRegionMergeTransactionOnCluster {
       while (EnvironmentEdgeManager.currentTime() < timeout) {
         int newcount1 = 0;
         for(ColumnFamilyDescriptor colFamily : columnFamilies) {
-          newcount1 += hrfs.getStoreFiles(colFamily.getName()).size();
+          newcount1 += hrfs.getStoreFiles(colFamily.getNameAsString()).size();
         }
         if(newcount1 <= 1) {
           break;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java
index 9141327..2fab050 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java
@@ -26,7 +26,6 @@ import static org.mockito.Mockito.when;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -98,7 +97,7 @@ public class TestStoreFileRefresherChore {
     }
 
     @Override
-    public Collection<StoreFileInfo> getStoreFiles(String familyName) throws IOException {
+    public List<StoreFileInfo> getStoreFiles(String familyName) throws IOException {
       if (fail) {
         throw new IOException("simulating FS failure");
       }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScannerClosure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScannerClosure.java
index 4cb023f..121f830 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScannerClosure.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScannerClosure.java
@@ -29,7 +29,6 @@ import java.util.NavigableSet;
 import java.util.Random;
 import java.util.TreeSet;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -128,13 +127,12 @@ public class TestStoreScannerClosure {
       p.addColumn(fam, Bytes.toBytes("q1"), Bytes.toBytes("val"));
       region.put(p);
       HStore store = region.getStore(fam);
-      ReentrantReadWriteLock lock = store.lock;
       // use the lock to manually get a new memstore scanner. this is what
       // HStore#notifyChangedReadersObservers does under the lock.(lock is not needed here
       //since it is just a testcase).
-      lock.readLock().lock();
+      store.getStoreEngine().readLock();
       final List<KeyValueScanner> memScanners = store.memstore.getScanners(Long.MAX_VALUE);
-      lock.readLock().unlock();
+      store.getStoreEngine().readUnlock();
       Thread closeThread = new Thread() {
         public void run() {
           // close should be completed
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java
index f5330f6..eb0b1c1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java
@@ -118,8 +118,10 @@ public class TestStripeStoreEngine {
   }
 
   private static TestStoreEngine createEngine(Configuration conf) throws Exception {
+    HRegion region = mock(HRegion.class);
     HStore store = mock(HStore.class);
     when(store.getRegionInfo()).thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO);
+    when(store.getHRegion()).thenReturn(region);
     CellComparatorImpl kvComparator = mock(CellComparatorImpl.class);
     return (TestStoreEngine) StoreEngine.create(store, conf, kvComparator);
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java
index d7b7ba7..0ea82c5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java
@@ -22,9 +22,6 @@ import static org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.cre
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -45,11 +42,13 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.HStoreFile;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.ScanInfo;
 import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.StoreEngine;
 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 import org.apache.hadoop.hbase.regionserver.StoreUtils;
 import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.Scanner;
@@ -110,10 +109,9 @@ public class TestDateTieredCompactor {
     when(store.areWritesEnabled()).thenReturn(true);
     when(store.getFileSystem()).thenReturn(mock(FileSystem.class));
     when(store.getRegionInfo()).thenReturn(RegionInfoBuilder.newBuilder(TABLE_NAME).build());
-    when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
-      anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
-    when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
-      anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyString())).thenAnswer(writers);
+    StoreEngine storeEngine = mock(StoreEngine.class);
+    when(storeEngine.createWriter(any(CreateStoreFileWriterParams.class))).thenAnswer(writers);
+    when(store.getStoreEngine()).thenReturn(storeEngine);
     when(store.getComparator()).thenReturn(CellComparatorImpl.COMPARATOR);
     OptionalLong maxSequenceId = StoreUtils.getMaxSequenceIdInList(storefiles);
     when(store.getMaxSequenceId()).thenReturn(maxSequenceId);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
index 5eb94ac..2841695 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
@@ -27,7 +27,6 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.isNull;
@@ -58,6 +57,7 @@ import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.HStoreFile;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
@@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.regionserver.ScanInfo;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
+import org.apache.hadoop.hbase.regionserver.StoreEngine;
 import org.apache.hadoop.hbase.regionserver.StoreFileReader;
 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
@@ -828,12 +829,9 @@ public class TestStripeCompactionPolicy {
     when(info.getRegionNameAsString()).thenReturn("testRegion");
     when(store.getColumnFamilyDescriptor()).thenReturn(familyDescriptor);
     when(store.getRegionInfo()).thenReturn(info);
-    when(
-      store.createWriterInTmp(anyLong(), any(), anyBoolean(),
-        anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
-    when(
-      store.createWriterInTmp(anyLong(), any(), anyBoolean(),
-        anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyString())).thenAnswer(writers);
+    StoreEngine storeEngine = mock(StoreEngine.class);
+    when(storeEngine.createWriter(any(CreateStoreFileWriterParams.class))).thenAnswer(writers);
+    when(store.getStoreEngine()).thenReturn(storeEngine);
 
     Configuration conf = HBaseConfiguration.create();
     conf.setBoolean("hbase.regionserver.compaction.private.readers", usePrivateReaders);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java
index e49174e..ae59c74 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java
@@ -21,9 +21,6 @@ import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_K
 import static org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.createDummyRequest;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -43,10 +40,12 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.ScanInfo;
 import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.StoreEngine;
 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.Scanner;
 import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.StoreFileWritersCapture;
@@ -209,10 +208,9 @@ public class TestStripeCompactor {
     when(store.areWritesEnabled()).thenReturn(true);
     when(store.getFileSystem()).thenReturn(mock(FileSystem.class));
     when(store.getRegionInfo()).thenReturn(RegionInfoBuilder.newBuilder(TABLE_NAME).build());
-    when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
-      anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
-    when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
-      anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyString())).thenAnswer(writers);
+    StoreEngine storeEngine = mock(StoreEngine.class);
+    when(storeEngine.createWriter(any(CreateStoreFileWriterParams.class))).thenAnswer(writers);
+    when(store.getStoreEngine()).thenReturn(storeEngine);
     when(store.getComparator()).thenReturn(CellComparatorImpl.COMPARATOR);
 
     return new StripeCompactor(conf, store) {