You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2022/04/17 14:54:14 UTC

[hbase] branch branch-2 updated: HBASE-26938 Compaction failures after StoreFileTracker integration (#4350)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new ae8eccd5c09 HBASE-26938 Compaction failures after StoreFileTracker integration (#4350)
ae8eccd5c09 is described below

commit ae8eccd5c09d534169987390d0b957c714c257c7
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Sun Apr 17 21:58:12 2022 +0800

    HBASE-26938 Compaction failures after StoreFileTracker integration (#4350)
    
    Introduce a StoreFileWriterCreationTracker to track the store files being written
    
    Signed-off-by: Josh Elser <el...@apache.org>
    Signed-off-by: Andrew Purtell <ap...@apache.org>
    (cherry picked from commit 48c4a4626e54724beb4ed46becbedbe292841dc4)
---
 .../hadoop/hbase/mob/DefaultMobStoreCompactor.java |  23 ++-
 .../hadoop/hbase/mob/DefaultMobStoreFlusher.java   |   5 +-
 .../regionserver/AbstractMultiFileWriter.java      |   8 +-
 .../hbase/regionserver/BrokenStoreFileCleaner.java |   2 +-
 .../regionserver/CreateStoreFileWriterParams.java  |  14 +-
 .../regionserver/DateTieredMultiFileWriter.java    |   2 +-
 .../hbase/regionserver/DefaultStoreFlusher.java    |   7 +-
 .../apache/hadoop/hbase/regionserver/HStore.java   | 180 +++++++++++++++------
 .../hadoop/hbase/regionserver/StoreEngine.java     |  17 +-
 .../hadoop/hbase/regionserver/StoreFileWriter.java |  34 +++-
 .../hadoop/hbase/regionserver/StoreFlusher.java    |  22 +--
 .../hbase/regionserver/StripeMultiFileWriter.java  |   2 +-
 .../hbase/regionserver/StripeStoreFlusher.java     |  21 ++-
 .../compactions/AbstractMultiOutputCompactor.java  |  19 +--
 .../compactions/CompactionProgress.java            |   2 +-
 .../compactions/CompactionRequestImpl.java         |  12 +-
 .../hbase/regionserver/compactions/Compactor.java  | 128 ++++++++-------
 .../compactions/DateTieredCompactor.java           |  21 ++-
 .../regionserver/compactions/DefaultCompactor.java |  19 +--
 .../regionserver/compactions/StripeCompactor.java  |  38 +++--
 .../storefiletracker/StoreFileTrackerBase.java     |  12 +-
 .../hbase/regionserver/TestCompactorMemLeak.java   |   5 +-
 .../hadoop/hbase/regionserver/TestHStore.java      |   7 +-
 .../hbase/regionserver/TestMajorCompaction.java    |  19 ---
 .../TestSplitTransactionOnCluster.java             |  17 +-
 .../regionserver/wal/AbstractTestWALReplay.java    |   8 +-
 26 files changed, 395 insertions(+), 249 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
index dd800f9b2c3..03b861be7a3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
@@ -22,12 +22,14 @@ import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+import java.util.function.Consumer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.regionserver.CellSink;
 import org.apache.hadoop.hbase.regionserver.HMobStore;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
@@ -40,6 +42,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
@@ -84,10 +87,14 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
       @Override
       public StoreFileWriter createWriter(InternalScanner scanner,
         org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
-        boolean shouldDropBehind, boolean major) throws IOException {
+        boolean shouldDropBehind, boolean major, Consumer<Path> writerCreationTracker)
+        throws IOException {
         // make this writer with tags always because of possible new cells with tags.
-        return store.getStoreEngine().createWriter(
-          createParams(fd, shouldDropBehind, major).includeMVCCReadpoint(true).includesTag(true));
+        return store.getStoreEngine()
+          .createWriter(
+            createParams(fd, shouldDropBehind, major, writerCreationTracker)
+              .includeMVCCReadpoint(true)
+              .includesTag(true));
       }
     };
 
@@ -155,18 +162,19 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
    * the scanner to filter the deleted cells.
    * @param fd File details
    * @param scanner Where to read from.
+   * @param writer Where to write to.
    * @param smallestReadPoint Smallest read point.
    * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
    * @param throughputController The compaction throughput controller.
    * @param major Is a major compaction.
    * @param numofFilesToCompact the number of files to compact
+   * @param progress Progress reporter.
    * @return Whether compaction ended; false if it was interrupted for any reason.
    */
   @Override
-  protected boolean performCompaction(FileDetails fd, InternalScanner scanner,
+  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
       long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
-      boolean major, int numofFilesToCompact) throws IOException {
-    long bytesWrittenProgressForCloseCheck = 0;
+      boolean major, int numofFilesToCompact, CompactionProgress progress) throws IOException {
     long bytesWrittenProgressForLog = 0;
     long bytesWrittenProgressForShippedCall = 0;
     // Since scanner.next() can return 'false' but still be delivering data,
@@ -370,9 +378,8 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
     return true;
   }
 
-
   @Override
-  protected List<Path> commitWriter(FileDetails fd,
+  protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
       CompactionRequestImpl request) throws IOException {
     List<Path> newFiles = Lists.newArrayList(writer.getPath());
     writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
index a52ce2b05c0..10ef4c450e7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
@@ -23,6 +23,7 @@ import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+import java.util.function.Consumer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -100,7 +101,7 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
   @Override
   public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
       MonitoredTask status, ThroughputController throughputController,
-      FlushLifeCycleTracker tracker) throws IOException {
+      FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException {
     ArrayList<Path> result = new ArrayList<>();
     long cellsCount = snapshot.getCellsCount();
     if (cellsCount == 0) return result; // don't flush if there are no entries
@@ -114,7 +115,7 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
       synchronized (flushLock) {
         status.setStatus("Flushing " + store + ": creating writer");
         // Write the map out to the disk
-        writer = createWriter(snapshot, true);
+        writer = createWriter(snapshot, true, writerCreationTracker);
         IOException e = null;
         try {
           // It's a mob store, flush the cells in a mob way. This is the difference of flushing
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
index 82c3867c103..a824b501c82 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
@@ -67,7 +67,7 @@ public abstract class AbstractMultiFileWriter implements CellSink, ShipperListen
    * comments in HBASE-15400 for more details.
    */
   public List<Path> commitWriters(long maxSeqId, boolean majorCompaction) throws IOException {
-    return commitWriters(maxSeqId, majorCompaction, Collections.EMPTY_SET);
+    return commitWriters(maxSeqId, majorCompaction, Collections.emptyList());
   }
 
   public List<Path> commitWriters(long maxSeqId, boolean majorCompaction,
@@ -110,11 +110,7 @@ public abstract class AbstractMultiFileWriter implements CellSink, ShipperListen
     return paths;
   }
 
-  /**
-   * Returns all writers. This is used to prevent deleting currently writen storefiles
-   * during cleanup.
-   */
-  public abstract Collection<StoreFileWriter> writers();
+  protected abstract Collection<StoreFileWriter> writers();
 
   /**
    * Subclasses override this method to be called at the end of a successful sequence of append; all
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java
index 0c4807d8bad..042acb0bbc9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java
@@ -152,7 +152,7 @@ public class BrokenStoreFileCleaner extends ScheduledChore {
   }
 
   private boolean isCompactionResultFile(FileStatus file, HStore store) {
-    return store.getStoreEngine().getCompactor().getCompactionTargets().contains(file.getPath());
+    return store.getStoreFilesBeingWritten().contains(file.getPath());
   }
 
   // Compacted files can still have readers and are cleaned by a separate chore, so they have to
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CreateStoreFileWriterParams.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CreateStoreFileWriterParams.java
index 10cd9f009e4..1d45e1c51c5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CreateStoreFileWriterParams.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CreateStoreFileWriterParams.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.util.function.Consumer;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -40,6 +42,8 @@ public final class CreateStoreFileWriterParams {
 
   private String fileStoragePolicy = HConstants.EMPTY_STRING;
 
+  private Consumer<Path> writerCreationTracker;
+
   private CreateStoreFileWriterParams() {
   }
 
@@ -127,8 +131,16 @@ public final class CreateStoreFileWriterParams {
     return this;
   }
 
+  public Consumer<Path> writerCreationTracker() {
+    return writerCreationTracker;
+  }
+
+  public CreateStoreFileWriterParams writerCreationTracker(Consumer<Path> writerCreationTracker) {
+    this.writerCreationTracker = writerCreationTracker;
+    return this;
+  }
+
   public static CreateStoreFileWriterParams create() {
     return new CreateStoreFileWriterParams();
   }
-
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
index 1e10eb2db23..8201cb152c0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
@@ -71,7 +71,7 @@ public class DateTieredMultiFileWriter extends AbstractMultiFileWriter {
   }
 
   @Override
-  public Collection<StoreFileWriter> writers() {
+  protected Collection<StoreFileWriter> writers() {
     return lowerBoundary2Writer.values();
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
index 306760d7ce6..0f3daa4c177 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.function.Consumer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
@@ -44,8 +45,8 @@ public class DefaultStoreFlusher extends StoreFlusher {
 
   @Override
   public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
-      MonitoredTask status, ThroughputController throughputController,
-      FlushLifeCycleTracker tracker) throws IOException {
+    MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker,
+    Consumer<Path> writerCreationTracker) throws IOException {
     ArrayList<Path> result = new ArrayList<>();
     int cellsCount = snapshot.getCellsCount();
     if (cellsCount == 0) return result; // don't flush if there are no entries
@@ -59,7 +60,7 @@ public class DefaultStoreFlusher extends StoreFlusher {
       synchronized (flushLock) {
         status.setStatus("Flushing " + store + ": creating writer");
         // Write the map out to the disk
-        writer = createWriter(snapshot, false);
+        writer = createWriter(snapshot, false, writerCreationTracker);
         IOException e = null;
         try {
           performFlush(scanner, writer, throughputController);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 41f18382073..3da95c18142 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -48,6 +48,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
 import java.util.function.ToLongFunction;
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
@@ -156,8 +158,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
   // rows that has cells from both memstore and files (or only files)
   private LongAdder mixedRowReadsCount = new LongAdder();
 
-  private boolean cacheOnWriteLogged;
-
   /**
    * Lock specific to archiving compacted store files.  This avoids races around
    * the combination of retrieving the list of compacted files and moving them to
@@ -215,14 +215,46 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
 
   private final StoreContext storeContext;
 
+  // Used to track the store files which are currently being written. For compaction, if we want to
+  // compact store file [a, b, c] to [d], then here we will record 'd'. And we will also use it to
+  // track the store files being written when flushing.
+  // Notice that the creation is in the background compaction or flush thread and we will get the
+  // files in other thread, so it needs to be thread safe.
+  private static final class StoreFileWriterCreationTracker implements Consumer<Path> {
+
+    private final Set<Path> files = Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+    @Override
+    public void accept(Path t) {
+      files.add(t);
+    }
+
+    public Set<Path> get() {
+      return Collections.unmodifiableSet(files);
+    }
+  }
+
+  // We may have multiple compaction running at the same time, and flush can also happen at the same
+  // time, so here we need to use a collection, and the collection needs to be thread safe.
+  // The implementation of StoreFileWriterCreationTracker is very simple and we are unlikely to
+  // implement hashCode or equals for it, so here we just use ConcurrentHashMap. Change to an
+  // IdentityHashMap if we later want to implement hashCode or equals.
+  private final Set<StoreFileWriterCreationTracker> storeFileWriterCreationTrackers =
+      Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+  // For SFT implementations that write tmp store files first, we do not need to clean up
+  // the broken store files under the data directory, which means we do not need to track the store
+  // file writer creation. So here we abstract a factory to return different trackers for different
+  // SFT implementations.
+  private final Supplier<StoreFileWriterCreationTracker> storeFileWriterCreationTrackerFactory;
+
   /**
    * Constructor
    * @param family HColumnDescriptor for this column
-   * @param confParam configuration object failed.  Can be null.
+   * @param confParam configuration object failed. Can be null.
    */
   protected HStore(final HRegion region, final ColumnFamilyDescriptor family,
       final Configuration confParam, boolean warmup) throws IOException {
-
     this.conf = StoreUtils.createStoreConfiguration(confParam, region.getTableDescriptor(), family);
 
     this.region = region;
@@ -267,6 +299,12 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
 
     this.storeEngine = createStoreEngine(this, this.conf, region.getCellComparator());
     storeEngine.initialize(warmup);
+    // If writing to the tmp dir first is required, then we just return null, which indicates
+    // that we do not need to track the creation of store file writers; otherwise we return a new
+    // StoreFileWriterCreationTracker.
+    this.storeFileWriterCreationTrackerFactory =
+        storeEngine.requireWritingToTmpDirFirst() ? () -> null
+            : () -> new StoreFileWriterCreationTracker();
     refreshStoreSizeAndTotalBytes();
 
     flushRetriesNumber = conf.getInt(
@@ -290,7 +328,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
         this, memstore.getClass().getSimpleName(), policyName, verifyBulkLoads,
         parallelPutCountPrintThreshold, family.getDataBlockEncoding(),
         family.getCompressionType());
-    cacheOnWriteLogged = false;
   }
 
   private StoreContext initializeStoreContext(ColumnFamilyDescriptor family) throws IOException {
@@ -795,8 +832,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
    * @throws IOException if exception occurs during process
    */
   protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot,
-      MonitoredTask status, ThroughputController throughputController,
-      FlushLifeCycleTracker tracker) throws IOException {
+    MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker,
+    Consumer<Path> writerCreationTracker) throws IOException {
     // If an exception happens flushing, we let it out without clearing
     // the memstore snapshot.  The old snapshot will be returned when we say
     // 'snapshot', the next time flush comes around.
@@ -806,8 +843,13 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
     IOException lastException = null;
     for (int i = 0; i < flushRetriesNumber; i++) {
       try {
-        List<Path> pathNames =
-            flusher.flushSnapshot(snapshot, logCacheFlushId, status, throughputController, tracker);
+        List<Path> pathNames = flusher.flushSnapshot(
+          snapshot,
+          logCacheFlushId,
+          status,
+          throughputController,
+          tracker,
+          writerCreationTracker);
         Path lastPathName = null;
         try {
           for (Path pathName : pathNames) {
@@ -1118,6 +1160,12 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
     ThroughputController throughputController, User user) throws IOException {
     assert compaction != null;
     CompactionRequestImpl cr = compaction.getRequest();
+    StoreFileWriterCreationTracker writerCreationTracker =
+        storeFileWriterCreationTrackerFactory.get();
+    if (writerCreationTracker != null) {
+      cr.setWriterCreationTracker(writerCreationTracker);
+      storeFileWriterCreationTrackers.add(writerCreationTracker);
+    }
     try {
       // Do all sanity checking in here if we have a valid CompactionRequestImpl
       // because we need to clean up after it on the way out in a finally
@@ -1157,18 +1205,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
     }
     replaceStoreFiles(filesToCompact, sfs, true);
 
-    // This step is necessary for the correctness of BrokenStoreFileCleanerChore. It lets the
-    // CleanerChore know that compaction is done and the file can be cleaned up if compaction
-    // have failed.
-    storeEngine.resetCompactionWriter();
-
-    if (cr.isMajor()) {
-      majorCompactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs());
-      majorCompactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize);
-    } else {
-      compactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs());
-      compactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize);
-    }
     long outputBytes = getTotalSize(sfs);
 
     // At this point the store will use new files for all new scanners.
@@ -1576,6 +1612,11 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
     synchronized (filesCompacting) {
       filesCompacting.removeAll(cr.getFiles());
     }
+    // The tracker could be null, for example when we do not need to track the creation of store
+    // file writers because of the SFT implementation in use, or when the compaction is canceled.
+    if (cr.getWriterCreationTracker() != null) {
+      storeFileWriterCreationTrackers.remove(cr.getWriterCreationTracker());
+    }
   }
 
   /**
@@ -1899,6 +1940,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
   private final class StoreFlusherImpl implements StoreFlushContext {
 
     private final FlushLifeCycleTracker tracker;
+    private final StoreFileWriterCreationTracker writerCreationTracker;
     private final long cacheFlushSeqNum;
     private MemStoreSnapshot snapshot;
     private List<Path> tempFiles;
@@ -1910,6 +1952,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
     private StoreFlusherImpl(long cacheFlushSeqNum, FlushLifeCycleTracker tracker) {
       this.cacheFlushSeqNum = cacheFlushSeqNum;
       this.tracker = tracker;
+      this.writerCreationTracker = storeFileWriterCreationTrackerFactory.get();
     }
 
     /**
@@ -1930,41 +1973,61 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
     public void flushCache(MonitoredTask status) throws IOException {
       RegionServerServices rsService = region.getRegionServerServices();
       ThroughputController throughputController =
-          rsService == null ? null : rsService.getFlushThroughputController();
-      tempFiles =
-          HStore.this.flushCache(cacheFlushSeqNum, snapshot, status, throughputController, tracker);
+        rsService == null ? null : rsService.getFlushThroughputController();
+      // it could be null if we do not need to track the creation of store file writer due to
+      // different SFT implementation.
+      if (writerCreationTracker != null) {
+        HStore.this.storeFileWriterCreationTrackers.add(writerCreationTracker);
+      }
+      tempFiles = HStore.this.flushCache(
+        cacheFlushSeqNum,
+        snapshot,
+        status,
+        throughputController,
+        tracker,
+        writerCreationTracker);
     }
 
     @Override
     public boolean commit(MonitoredTask status) throws IOException {
-      if (CollectionUtils.isEmpty(this.tempFiles)) {
-        return false;
-      }
-      status.setStatus("Flushing " + this + ": reopening flushed file");
-      List<HStoreFile> storeFiles = storeEngine.commitStoreFiles(tempFiles, false);
-      for (HStoreFile sf : storeFiles) {
-        StoreFileReader r = sf.getReader();
-        if (LOG.isInfoEnabled()) {
-          LOG.info("Added {}, entries={}, sequenceid={}, filesize={}", sf, r.getEntries(),
-            cacheFlushSeqNum, TraditionalBinaryPrefix.long2String(r.length(), "", 1));
+      try {
+        if (CollectionUtils.isEmpty(this.tempFiles)) {
+          return false;
+        }
+        status.setStatus("Flushing " + this + ": reopening flushed file");
+        List<HStoreFile> storeFiles = storeEngine.commitStoreFiles(tempFiles, false);
+        for (HStoreFile sf : storeFiles) {
+          StoreFileReader r = sf.getReader();
+          if (LOG.isInfoEnabled()) {
+            LOG.info(
+              "Added {}, entries={}, sequenceid={}, filesize={}",
+              sf,
+              r.getEntries(),
+              cacheFlushSeqNum,
+              TraditionalBinaryPrefix.long2String(r.length(), "", 1));
+          }
+          outputFileSize += r.length();
+          storeSize.addAndGet(r.length());
+          totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
+          committedFiles.add(sf.getPath());
         }
-        outputFileSize += r.length();
-        storeSize.addAndGet(r.length());
-        totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
-        committedFiles.add(sf.getPath());
-      }
 
-      flushedCellsCount.addAndGet(cacheFlushCount);
-      flushedCellsSize.addAndGet(cacheFlushSize);
-      flushedOutputFileSize.addAndGet(outputFileSize);
-      // call coprocessor after we have done all the accounting above
-      for (HStoreFile sf : storeFiles) {
-        if (getCoprocessorHost() != null) {
-          getCoprocessorHost().postFlush(HStore.this, sf, tracker);
+        flushedCellsCount.addAndGet(cacheFlushCount);
+        flushedCellsSize.addAndGet(cacheFlushSize);
+        flushedOutputFileSize.addAndGet(outputFileSize);
+        // call coprocessor after we have done all the accounting above
+        for (HStoreFile sf : storeFiles) {
+          if (getCoprocessorHost() != null) {
+            getCoprocessorHost().postFlush(HStore.this, sf, tracker);
+          }
+        }
+        // Add new file to store files. Clear snapshot too while we have the Store write lock.
+        return completeFlush(storeFiles, snapshot.getId());
+      } finally {
+        if (writerCreationTracker != null) {
+          HStore.this.storeFileWriterCreationTrackers.remove(writerCreationTracker);
         }
       }
-      // Add new file to store files. Clear snapshot too while we have the Store write lock.
-      return completeFlush(storeFiles, snapshot.getId());
     }
 
     @Override
@@ -2110,6 +2173,16 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
     return majorCompactedCellsSize.get();
   }
 
+  public void updateCompactedMetrics(boolean isMajor, CompactionProgress progress) {
+    if (isMajor) {
+      majorCompactedCellsCount.addAndGet(progress.getTotalCompactingKVs());
+      majorCompactedCellsSize.addAndGet(progress.totalCompactedSize);
+    } else {
+      compactedCellsCount.addAndGet(progress.getTotalCompactingKVs());
+      compactedCellsSize.addAndGet(progress.totalCompactedSize);
+    }
+  }
+
   /**
    * Returns the StoreEngine that is backing this concrete implementation of Store.
    * @return Returns the {@link StoreEngine} object used internally inside this HStore object.
@@ -2404,4 +2477,15 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
       mixedRowReadsCount.increment();
     }
   }
+
+  /**
+   * Return the storefiles which are currently being written to. Mainly used by
+   * {@link BrokenStoreFileCleaner} to prevent deleting these files, as they are not present in
+   * the SFT yet.
+   */
+  Set<Path> getStoreFilesBeingWritten() {
+    return storeFileWriterCreationTrackers.stream()
+      .flatMap(t -> t.get().stream())
+      .collect(Collectors.toSet());
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
index d85553ac808..847187074df 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
@@ -42,11 +42,9 @@ import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
 import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
 import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
 import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
-import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -94,7 +92,7 @@ import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUti
  */
 @InterfaceAudience.Private
 public abstract class StoreEngine<SF extends StoreFlusher, CP extends CompactionPolicy,
-  C extends Compactor, SFM extends StoreFileManager> {
+  C extends Compactor<?>, SFM extends StoreFileManager> {
 
   private static final Logger LOG = LoggerFactory.getLogger(StoreEngine.class);
 
@@ -157,7 +155,7 @@ public abstract class StoreEngine<SF extends StoreFlusher, CP extends Compaction
   /**
    * @return Compactor to use.
    */
-  public Compactor getCompactor() {
+  public Compactor<?> getCompactor() {
     return this.compactor;
   }
 
@@ -544,17 +542,6 @@ public abstract class StoreEngine<SF extends StoreFlusher, CP extends Compaction
     return storeFileTracker.requireWritingToTmpDirFirst();
   }
 
-  /**
-   * Resets the compaction writer when the new file is committed and used as active storefile.
-   * This step is necessary for the correctness of BrokenStoreFileCleanerChore. It lets the
-   * CleanerChore know that compaction is done and the file can be cleaned up if compaction
-   * have failed. Currently called in
-   * @see HStore#doCompaction(CompactionRequestImpl, Collection, User, long, List)
-   */
-  public void resetCompactionWriter(){
-    compactor.resetWriter();
-  }
-
   @RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
     allowedOnPath = ".*/TestHStore.java")
   ReadWriteLock getLock() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java
index a4084cbd379..829028eec63 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java
@@ -34,6 +34,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -423,9 +424,14 @@ public class StoreFileWriter implements CellSink, ShipperListener {
     private boolean shouldDropCacheBehind;
     private Supplier<Collection<HStoreFile>> compactedFilesSupplier = () -> Collections.emptySet();
     private String fileStoragePolicy;
-
-    public Builder(Configuration conf, CacheConfig cacheConf,
-        FileSystem fs) {
+    // Tracks the creation of StoreFileWriters. This matters mainly for SFT implementations that
+    // write store files directly to their final location instead of writing a tmp file first.
+    // In that scenario a background task purges the store files which are not recorded in the
+    // SFT, but files still being written are not yet recorded there either, so we need to
+    // record them here and treat them specially.
+    private Consumer<Path> writerCreationTracker;
+
+    public Builder(Configuration conf, CacheConfig cacheConf, FileSystem fs) {
       this.conf = conf;
       this.cacheConf = cacheConf;
       this.fs = fs;
@@ -509,6 +515,11 @@ public class StoreFileWriter implements CellSink, ShipperListener {
       return this;
     }
 
+    public Builder withWriterCreationTracker(Consumer<Path> writerCreationTracker) {
+      this.writerCreationTracker = writerCreationTracker;
+      return this;
+    }
+
     /**
      * Create a store file writer. Client is responsible for closing file when
      * done. If metadata, add BEFORE closing using
@@ -557,8 +568,21 @@ public class StoreFileWriter implements CellSink, ShipperListener {
           bloomType = BloomType.NONE;
         }
       }
-
-      return new StoreFileWriter(fs, filePath, conf, cacheConf, bloomType, maxKeyCount,
+      // Make sure we call this before actually creating the writer.
+      // In fact, it is not a big deal to add a nonexistent file to the tracker, as we will never
+      // try to delete it and we will eventually clean the tracker up after compaction. But if the
+      // file cleaner finds a file that we have not recorded yet, it may accidentally delete the
+      // file and cause problems.
+      if (writerCreationTracker != null) {
+        writerCreationTracker.accept(filePath);
+      }
+      return new StoreFileWriter(
+        fs,
+        filePath,
+        conf,
+        cacheConf,
+        bloomType,
+        maxKeyCount,
           favoredNodes, fileContext, shouldDropCacheBehind, compactedFilesSupplier);
     }
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
index 1095854273a..d6461f7729a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
@@ -22,7 +22,7 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.List;
-
+import java.util.function.Consumer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -55,8 +55,8 @@ abstract class StoreFlusher {
    * @return List of files written. Can be empty; must not be null.
    */
   public abstract List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum,
-      MonitoredTask status, ThroughputController throughputController,
-      FlushLifeCycleTracker tracker) throws IOException;
+    MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker,
+    Consumer<Path> writerCreationTracker) throws IOException;
 
   protected void finalizeWriter(StoreFileWriter writer, long cacheFlushSeqNum,
       MonitoredTask status) throws IOException {
@@ -69,13 +69,17 @@ abstract class StoreFlusher {
     writer.close();
   }
 
-  protected final StoreFileWriter createWriter(MemStoreSnapshot snapshot, boolean alwaysIncludesTag)
-    throws IOException {
+  protected final StoreFileWriter createWriter(MemStoreSnapshot snapshot, boolean alwaysIncludesTag,
+    Consumer<Path> writerCreationTracker) throws IOException {
     return store.getStoreEngine()
-      .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(snapshot.getCellsCount())
-        .compression(store.getColumnFamilyDescriptor().getCompressionType()).isCompaction(false)
-        .includeMVCCReadpoint(true).includesTag(alwaysIncludesTag || snapshot.isTagsPresent())
-        .shouldDropBehind(false));
+      .createWriter(
+        CreateStoreFileWriterParams.create()
+          .maxKeyCount(snapshot.getCellsCount())
+          .compression(store.getColumnFamilyDescriptor().getCompressionType())
+          .isCompaction(false)
+          .includeMVCCReadpoint(true)
+          .includesTag(alwaysIncludesTag || snapshot.isTagsPresent())
+          .shouldDropBehind(false).writerCreationTracker(writerCreationTracker));
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java
index a4e943ac8b0..fc0598d89ac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java
@@ -58,7 +58,7 @@ public abstract class StripeMultiFileWriter extends AbstractMultiFileWriter {
   }
 
   @Override
-  public Collection<StoreFileWriter> writers() {
+  protected Collection<StoreFileWriter> writers() {
     return existingWriters;
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
index f8183b7645a..fb9115e01ec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
@@ -23,7 +23,7 @@ import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_K
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-
+import java.util.function.Consumer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellComparator;
@@ -54,11 +54,14 @@ public class StripeStoreFlusher extends StoreFlusher {
 
   @Override
   public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum,
-      MonitoredTask status, ThroughputController throughputController,
-      FlushLifeCycleTracker tracker) throws IOException {
+    MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker,
+    Consumer<Path> writerCreationTracker) throws IOException {
     List<Path> result = new ArrayList<>();
     int cellsCount = snapshot.getCellsCount();
-    if (cellsCount == 0) return result; // don't flush if there are no entries
+    if (cellsCount == 0) {
+      // don't flush if there are no entries
+      return result;
+    }
 
     InternalScanner scanner = createScanner(snapshot.getScanners(), tracker);
 
@@ -70,8 +73,9 @@ public class StripeStoreFlusher extends StoreFlusher {
     StripeMultiFileWriter mw = null;
     try {
       mw = req.createWriter(); // Writer according to the policy.
-      StripeMultiFileWriter.WriterFactory factory = createWriterFactory(snapshot);
-      StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
+      StripeMultiFileWriter.WriterFactory factory =
+        createWriterFactory(snapshot, writerCreationTracker);
+      StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner) scanner : null;
       mw.init(storeScanner, factory);
 
       synchronized (flushLock) {
@@ -98,12 +102,13 @@ public class StripeStoreFlusher extends StoreFlusher {
     return result;
   }
 
-  private StripeMultiFileWriter.WriterFactory createWriterFactory(MemStoreSnapshot snapshot) {
+  private StripeMultiFileWriter.WriterFactory createWriterFactory(MemStoreSnapshot snapshot,
+    Consumer<Path> writerCreationTracker) {
     return new StripeMultiFileWriter.WriterFactory() {
       @Override
       public StoreFileWriter createWriter() throws IOException {
         // XXX: it used to always pass true for includesTag, re-consider?
-        return StripeStoreFlusher.this.createWriter(snapshot, true);
+        return StripeStoreFlusher.this.createWriter(snapshot, true, writerCreationTracker);
       }
     };
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
index 19b7a98627e..23d16934b65 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
@@ -18,7 +18,7 @@
 package org.apache.hadoop.hbase.regionserver.compactions;
 
 import java.io.IOException;
-
+import java.util.function.Consumer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -46,19 +46,21 @@ public abstract class AbstractMultiOutputCompactor<T extends AbstractMultiFileWr
     super(conf, store);
   }
 
-  protected void initMultiWriter(AbstractMultiFileWriter writer, InternalScanner scanner,
-      final FileDetails fd, final boolean shouldDropBehind, boolean major) {
+  protected final void initMultiWriter(AbstractMultiFileWriter writer, InternalScanner scanner,
+    final FileDetails fd, final boolean shouldDropBehind, boolean major,
+    Consumer<Path> writerCreationTracker) {
     WriterFactory writerFactory = new WriterFactory() {
       @Override
       public StoreFileWriter createWriter() throws IOException {
-        return AbstractMultiOutputCompactor.this.createWriter(fd, shouldDropBehind, major);
+        return AbstractMultiOutputCompactor.this
+          .createWriter(fd, shouldDropBehind, major, writerCreationTracker);
       }
 
       @Override
       public StoreFileWriter createWriterWithStoragePolicy(String fileStoragePolicy)
         throws IOException {
-        return AbstractMultiOutputCompactor.this.createWriter(fd, shouldDropBehind,
-          fileStoragePolicy, major);
+        return AbstractMultiOutputCompactor.this
+          .createWriter(fd, shouldDropBehind, fileStoragePolicy, major, writerCreationTracker);
       }
     };
     // Prepare multi-writer, and perform the compaction using scanner and writer.
@@ -68,7 +70,7 @@ public abstract class AbstractMultiOutputCompactor<T extends AbstractMultiFileWr
   }
 
   @Override
-  protected void abortWriter() throws IOException {
+  protected void abortWriter(AbstractMultiFileWriter writer) throws IOException {
     FileSystem fs = store.getFileSystem();
     for (Path leftoverFile : writer.abortWriters()) {
       try {
@@ -79,7 +81,6 @@ public abstract class AbstractMultiOutputCompactor<T extends AbstractMultiFileWr
           e);
       }
     }
-    //this step signals that the target file is no longer writen and can be cleaned up
-    writer = null;
   }
+
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java
index 942cc4f3fd6..2ccdd150cd2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java
@@ -37,7 +37,7 @@ public class CompactionProgress {
   private static final Logger LOG = LoggerFactory.getLogger(CompactionProgress.class);
 
   /** the total compacting key values in currently running compaction */
-  private long totalCompactingKVs;
+  public long totalCompactingKVs;
   /** the completed count of key values in currently running compaction */
   public long currentCompactedKVs = 0;
   /** the total size of data processed by the currently running compaction, in bytes */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequestImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequestImpl.java
index 899219d70b2..5d8285aecdb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequestImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequestImpl.java
@@ -22,8 +22,9 @@ import static org.apache.hadoop.hbase.regionserver.Store.NO_PRIORITY;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
-
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.regionserver.HStoreFile;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
@@ -51,6 +52,7 @@ public class CompactionRequestImpl implements CompactionRequest {
   private String storeName = "";
   private long totalSize = -1L;
   private CompactionLifeCycleTracker tracker = CompactionLifeCycleTracker.DUMMY;
+  private Consumer<Path> writerCreationTracker;
 
   public CompactionRequestImpl(Collection<HStoreFile> files) {
     this.selectionTime = EnvironmentEdgeManager.currentTime();
@@ -137,6 +139,14 @@ public class CompactionRequestImpl implements CompactionRequest {
     return tracker;
   }
 
+  public Consumer<Path> getWriterCreationTracker() {
+    return writerCreationTracker;
+  }
+
+  public void setWriterCreationTracker(Consumer<Path> writerCreationTracker) {
+    this.writerCreationTracker = writerCreationTracker;
+  }
+
   public boolean isAfterSplit() {
     return isAfterSplit;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index d934ecb0c16..5ac64823ae0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -25,12 +25,13 @@ import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_RETAIN_DELET
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
+import java.util.Set;
+import java.util.function.Consumer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -39,7 +40,6 @@ import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileInfo;
-import org.apache.hadoop.hbase.regionserver.AbstractMultiFileWriter;
 import org.apache.hadoop.hbase.regionserver.CellSink;
 import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
 import org.apache.hadoop.hbase.regionserver.HStore;
@@ -70,15 +70,18 @@ import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
 /**
  * A compactor is a compaction algorithm associated a given policy. Base class also contains
  * reusable parts for implementing compactors (what is common and what isn't is evolving).
+ * <p>
+ * Compactions might be concurrent against a given store and the Compactor is shared among
+ * them. Do not put mutable state into class fields. All Compactor class fields should be
+ * final or effectively final.
+ * 'keepSeqIdPeriod' is an exception to this rule because unit tests may set it.
  */
 @InterfaceAudience.Private
 public abstract class Compactor<T extends CellSink> {
   private static final Logger LOG = LoggerFactory.getLogger(Compactor.class);
   protected static final long COMPACTION_PROGRESS_LOG_INTERVAL = 60 * 1000;
-  protected volatile CompactionProgress progress;
   protected final Configuration conf;
   protected final HStore store;
-
   protected final int compactionKVMax;
   protected final Compression.Algorithm majorCompactionCompression;
   protected final Compression.Algorithm minorCompactionCompression;
@@ -92,15 +95,15 @@ public abstract class Compactor<T extends CellSink> {
   protected static final String MINOR_COMPACTION_DROP_CACHE =
       "hbase.regionserver.minorcompaction.pagecache.drop";
 
-  private final boolean dropCacheMajor;
-  private final boolean dropCacheMinor;
+  protected final boolean dropCacheMajor;
+  protected final boolean dropCacheMinor;
 
-  // In compaction process only a single thread will access and write to this field, and
-  // getCompactionTargets is the only place we will access it other than the compaction thread, so
-  // make it volatile.
-  protected volatile T writer = null;
+  // Each running compaction registers its own CompactionProgress in this identity-based set
+  // before it starts writing and removes it again once it has finished or been aborted.
+  private final Set<CompactionProgress> progressSet =
+    Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<>()));
 
-  //TODO: depending on Store is not good but, realistically, all compactors currently do.
+  // TODO: depending on Store is not good but, realistically, all compactors currently do.
   Compactor(Configuration conf, HStore store) {
     this.conf = conf;
     this.store = store;
@@ -116,15 +119,9 @@ public abstract class Compactor<T extends CellSink> {
     this.dropCacheMinor = conf.getBoolean(MINOR_COMPACTION_DROP_CACHE, true);
   }
 
-
-
   protected interface CellSinkFactory<S> {
-    S createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind, boolean major)
-        throws IOException;
-  }
-
-  public CompactionProgress getProgress() {
-    return this.progress;
+    S createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind, boolean major,
+      Consumer<Path> writerCreationTracker) throws IOException;
   }
 
   /** The sole reason this class exists is that java has no ref/out/pointer parameters. */
@@ -271,12 +268,13 @@ public abstract class Compactor<T extends CellSink> {
   };
 
   protected final CreateStoreFileWriterParams createParams(FileDetails fd, boolean shouldDropBehind,
-    boolean major) {
+    boolean major, Consumer<Path> writerCreationTracker) {
     return CreateStoreFileWriterParams.create().maxKeyCount(fd.maxKeyCount)
       .compression(major ? majorCompactionCompression : minorCompactionCompression)
       .isCompaction(true).includeMVCCReadpoint(fd.maxMVCCReadpoint > 0)
       .includesTag(fd.maxTagsLength > 0).shouldDropBehind(shouldDropBehind)
-      .totalCompactedFilesSize(fd.totalCompactedFilesSize);
+      .totalCompactedFilesSize(fd.totalCompactedFilesSize)
+      .writerCreationTracker(writerCreationTracker);
   }
 
   /**
@@ -286,16 +284,20 @@ public abstract class Compactor<T extends CellSink> {
    * @throws IOException if creation failed
    */
   protected final StoreFileWriter createWriter(FileDetails fd, boolean shouldDropBehind,
-    boolean major) throws IOException {
+    boolean major, Consumer<Path> writerCreationTracker) throws IOException {
     // When all MVCC readpoints are 0, don't write them.
     // See HBASE-8166, HBASE-12600, and HBASE-13389.
-    return store.getStoreEngine().createWriter(createParams(fd, shouldDropBehind, major));
+    return store.getStoreEngine()
+      .createWriter(createParams(fd, shouldDropBehind, major, writerCreationTracker));
   }
 
   protected final StoreFileWriter createWriter(FileDetails fd, boolean shouldDropBehind,
-    String fileStoragePolicy, boolean major) throws IOException {
+    String fileStoragePolicy, boolean major, Consumer<Path> writerCreationTracker)
+    throws IOException {
     return store.getStoreEngine()
-      .createWriter(createParams(fd, shouldDropBehind, major).fileStoragePolicy(fileStoragePolicy));
+      .createWriter(
+        createParams(fd, shouldDropBehind, major, writerCreationTracker)
+          .fileStoragePolicy(fileStoragePolicy));
   }
 
   private ScanInfo preCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
@@ -327,7 +329,6 @@ public abstract class Compactor<T extends CellSink> {
       InternalScannerFactory scannerFactory, CellSinkFactory<T> sinkFactory,
       ThroughputController throughputController, User user) throws IOException {
     FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles(), request.isMajor());
-    this.progress = new CompactionProgress(fd.maxKeyCount);
 
     // Find the smallest read point across all the Scanners.
     long smallestReadPoint = getSmallestReadPoint();
@@ -343,6 +344,9 @@ public abstract class Compactor<T extends CellSink> {
     boolean finished = false;
     List<StoreFileScanner> scanners =
       createFileScanners(request.getFiles(), smallestReadPoint, dropCache);
+    T writer = null;
+    CompactionProgress progress = new CompactionProgress(fd.maxKeyCount);
+    progressSet.add(progress);
     try {
       /* Include deletes, unless we are doing a major compaction */
       ScanType scanType = scannerFactory.getScanType(request);
@@ -355,14 +359,14 @@ public abstract class Compactor<T extends CellSink> {
         smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
         cleanSeqId = true;
       }
-      if (writer != null){
-        LOG.warn("Writer exists when it should not: " + getCompactionTargets().stream()
-          .map(n -> n.toString())
-          .collect(Collectors.joining(", ", "{ ", " }")));
-      }
-      writer = sinkFactory.createWriter(scanner, fd, dropCache, request.isMajor());
-      finished = performCompaction(fd, scanner, smallestReadPoint, cleanSeqId,
-        throughputController, request.isAllFiles(), request.getFiles().size());
+      writer = sinkFactory.createWriter(
+        scanner,
+        fd,
+        dropCache,
+        request.isMajor(),
+        request.getWriterCreationTracker());
+      finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
+        throughputController, request.isAllFiles(), request.getFiles().size(), progress);
       if (!finished) {
         throw new InterruptedIOException("Aborting compaction of store " + store + " in region "
             + store.getRegionInfo().getRegionNameAsString() + " because it was interrupted.");
@@ -380,34 +384,41 @@ public abstract class Compactor<T extends CellSink> {
       } else {
         Closeables.close(scanner, true);
       }
-      if (!finished && writer != null) {
-        abortWriter();
+      if (!finished) {
+        if (writer != null) {
+          abortWriter(writer);
+        }
+      } else {
+        store.updateCompactedMetrics(request.isMajor(), progress);
       }
+      progressSet.remove(progress);
     }
     assert finished : "We should have exited the method on all error paths";
     assert writer != null : "Writer should be non-null if no error";
-    return commitWriter(fd, request);
+    return commitWriter(writer, fd, request);
   }
 
-  protected abstract List<Path> commitWriter(FileDetails fd,
+  protected abstract List<Path> commitWriter(T writer, FileDetails fd,
       CompactionRequestImpl request) throws IOException;
 
-  protected abstract void abortWriter() throws IOException;
+  protected abstract void abortWriter(T writer) throws IOException;
 
   /**
    * Performs the compaction.
    * @param fd FileDetails of cell sink writer
    * @param scanner Where to read from.
+   * @param writer Where to write to.
    * @param smallestReadPoint Smallest read point.
    * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is &lt;=
    *          smallestReadPoint
    * @param major Is a major compaction.
    * @param numofFilesToCompact the number of files to compact
+   * @param progress Progress reporter.
    * @return Whether compaction ended; false if it was interrupted for some reason.
    */
-  protected boolean performCompaction(FileDetails fd, InternalScanner scanner,
+  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
       long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
-      boolean major, int numofFilesToCompact) throws IOException {
+      boolean major, int numofFilesToCompact, CompactionProgress progress) throws IOException {
     assert writer instanceof ShipperListener;
     long bytesWrittenProgressForLog = 0;
     long bytesWrittenProgressForShippedCall = 0;
@@ -549,22 +560,27 @@ public abstract class Compactor<T extends CellSink> {
         dropDeletesFromRow, dropDeletesToRow);
   }
 
-  public List<Path> getCompactionTargets() {
-    T writer = this.writer;
-    if (writer == null) {
-      return Collections.emptyList();
-    }
-    if (writer instanceof StoreFileWriter) {
-      return Arrays.asList(((StoreFileWriter) writer).getPath());
+  /**
+   * Return the aggregate progress for all currently active compactions.
+   */
+  public CompactionProgress getProgress() {
+    synchronized (progressSet) {
+      long totalCompactingKVs = 0;
+      long currentCompactedKVs = 0;
+      long totalCompactedSize = 0;
+      for (CompactionProgress progress: progressSet) {
+        totalCompactingKVs += progress.totalCompactingKVs;
+        currentCompactedKVs += progress.currentCompactedKVs;
+        totalCompactedSize += progress.totalCompactedSize;
+      }
+      CompactionProgress result = new CompactionProgress(totalCompactingKVs);
+      result.currentCompactedKVs = currentCompactedKVs;
+      result.totalCompactedSize = totalCompactedSize;
+      return result;
     }
-    return ((AbstractMultiFileWriter) writer).writers().stream().map(sfw -> sfw.getPath())
-      .collect(Collectors.toList());
   }
 
-  /**
-   * Reset the Writer when the new storefiles were successfully added
-   */
-  public void resetWriter(){
-    writer = null;
+  public boolean isCompacting() {
+    return !progressSet.isEmpty();
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
index 43e037c5e70..c8c10e16ff1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
@@ -21,7 +21,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.OptionalLong;
-
+import java.util.function.Consumer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.regionserver.DateTieredMultiFileWriter;
@@ -68,21 +68,26 @@ public class DateTieredCompactor extends AbstractMultiOutputCompactor<DateTiered
 
         @Override
         public DateTieredMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
-            boolean shouldDropBehind, boolean major) throws IOException {
-          DateTieredMultiFileWriter writer = new DateTieredMultiFileWriter(lowerBoundaries,
-              lowerBoundariesPolicies,
-              needEmptyFile(request));
-          initMultiWriter(writer, scanner, fd, shouldDropBehind, major);
+          boolean shouldDropBehind, boolean major, Consumer<Path> writerCreationTracker)
+          throws IOException {
+          DateTieredMultiFileWriter writer = new DateTieredMultiFileWriter(
+            lowerBoundaries,
+            lowerBoundariesPolicies,
+            needEmptyFile(request));
+          initMultiWriter(writer, scanner, fd, shouldDropBehind, major, writerCreationTracker);
           return writer;
         }
-      }, throughputController, user);
+      },
+      throughputController,
+      user);
   }
 
   @Override
-  protected List<Path> commitWriter(FileDetails fd,
+  protected List<Path> commitWriter(DateTieredMultiFileWriter writer, FileDetails fd,
       CompactionRequestImpl request) throws IOException {
     List<Path> pathList =
       writer.commitWriters(fd.maxSeqId, request.isAllFiles(), request.getFiles());
     return pathList;
   }
+
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index 03e3a1b5f39..0e91d8870b6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver.compactions;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.function.Consumer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.regionserver.HStore;
@@ -47,10 +48,11 @@ public class DefaultCompactor extends Compactor<StoreFileWriter> {
   private final CellSinkFactory<StoreFileWriter> writerFactory =
     new CellSinkFactory<StoreFileWriter>() {
       @Override
-      public StoreFileWriter createWriter(InternalScanner scanner,
-        org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
-        boolean shouldDropBehind, boolean major) throws IOException {
-        return DefaultCompactor.this.createWriter(fd, shouldDropBehind, major);
+      public StoreFileWriter createWriter(InternalScanner scanner, FileDetails fd,
+        boolean shouldDropBehind, boolean major, Consumer<Path> writerCreationTracker)
+        throws IOException {
+        return DefaultCompactor.this
+          .createWriter(fd, shouldDropBehind, major, writerCreationTracker);
       }
     };
 
@@ -63,7 +65,7 @@ public class DefaultCompactor extends Compactor<StoreFileWriter> {
   }
 
   @Override
-  protected List<Path> commitWriter(FileDetails fd,
+  protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
       CompactionRequestImpl request) throws IOException {
     List<Path> newFiles = Lists.newArrayList(writer.getPath());
     writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());
@@ -72,12 +74,6 @@ public class DefaultCompactor extends Compactor<StoreFileWriter> {
   }
 
   @Override
-  protected void abortWriter() throws IOException {
-    abortWriter(writer);
-    // this step signals that the target file is no longer written and can be cleaned up
-    writer = null;
-  }
-
   protected final void abortWriter(StoreFileWriter writer) throws IOException {
     Path leftoverFile = writer.getPath();
     try {
@@ -92,4 +88,5 @@ public class DefaultCompactor extends Compactor<StoreFileWriter> {
         leftoverFile, e);
     }
   }
+
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
index 060a11b41fe..6413a304d55 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.hbase.regionserver.compactions;
 
 import java.io.IOException;
 import java.util.List;
-
+import java.util.function.Consumer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.regionserver.HStore;
@@ -88,18 +88,26 @@ public class StripeCompactor extends AbstractMultiOutputCompactor<StripeMultiFil
       }
       LOG.debug(sb.toString());
     }
-    return compact(request, new StripeInternalScannerFactory(majorRangeFromRow, majorRangeToRow),
+    return compact(
+      request,
+      new StripeInternalScannerFactory(majorRangeFromRow, majorRangeToRow),
       new CellSinkFactory<StripeMultiFileWriter>() {
 
         @Override
         public StripeMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
-            boolean shouldDropBehind, boolean major) throws IOException {
+          boolean shouldDropBehind, boolean major, Consumer<Path> writerCreationTracker)
+          throws IOException {
           StripeMultiFileWriter writer = new StripeMultiFileWriter.BoundaryMultiWriter(
-              store.getComparator(), targetBoundaries, majorRangeFromRow, majorRangeToRow);
-          initMultiWriter(writer, scanner, fd, shouldDropBehind, major);
+            store.getComparator(),
+            targetBoundaries,
+            majorRangeFromRow,
+            majorRangeToRow);
+          initMultiWriter(writer, scanner, fd, shouldDropBehind, major, writerCreationTracker);
           return writer;
         }
-      }, throughputController, user);
+      },
+      throughputController,
+      user);
   }
 
   public List<Path> compact(CompactionRequestImpl request, final int targetCount, final long targetSize,
@@ -115,20 +123,28 @@ public class StripeCompactor extends AbstractMultiOutputCompactor<StripeMultiFil
 
         @Override
         public StripeMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
-            boolean shouldDropBehind, boolean major) throws IOException {
+          boolean shouldDropBehind, boolean major, Consumer<Path> writerCreationTracker)
+          throws IOException {
           StripeMultiFileWriter writer = new StripeMultiFileWriter.SizeMultiWriter(
-              store.getComparator(), targetCount, targetSize, left, right);
-          initMultiWriter(writer, scanner, fd, shouldDropBehind, major);
+            store.getComparator(),
+            targetCount,
+            targetSize,
+            left,
+            right);
+          initMultiWriter(writer, scanner, fd, shouldDropBehind, major, writerCreationTracker);
           return writer;
         }
-      }, throughputController, user);
+      },
+      throughputController,
+      user);
   }
 
   @Override
-  protected List<Path> commitWriter(FileDetails fd,
+  protected List<Path> commitWriter(StripeMultiFileWriter writer, FileDetails fd,
       CompactionRequestImpl request) throws IOException {
     List<Path> newFiles = writer.commitWriters(fd.maxSeqId, request.isMajor(), request.getFiles());
     assert !newFiles.isEmpty() : "Should have produced an empty file to preserve metadata.";
     return newFiles;
   }
+
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java
index 1bf354f00a0..f3e62670796 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java
@@ -177,11 +177,15 @@ abstract class StoreFileTrackerBase implements StoreFileTracker {
     }
     StoreFileWriter.Builder builder =
       new StoreFileWriter.Builder(conf, writerCacheConf, ctx.getRegionFileSystem().getFileSystem())
-        .withOutputDir(outputDir).withBloomType(ctx.getBloomFilterType())
-        .withMaxKeyCount(params.maxKeyCount()).withFavoredNodes(ctx.getFavoredNodes())
-        .withFileContext(hFileContext).withShouldDropCacheBehind(params.shouldDropBehind())
+        .withOutputDir(outputDir)
+        .withBloomType(ctx.getBloomFilterType())
+        .withMaxKeyCount(params.maxKeyCount())
+        .withFavoredNodes(ctx.getFavoredNodes())
+        .withFileContext(hFileContext)
+        .withShouldDropCacheBehind(params.shouldDropBehind())
         .withCompactedFilesSupplier(ctx.getCompactedFilesSupplier())
-        .withFileStoragePolicy(params.fileStoragePolicy());
+        .withFileStoragePolicy(params.fileStoragePolicy())
+        .withWriterCreationTracker(params.writerCreationTracker());
     return builder.build();
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactorMemLeak.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactorMemLeak.java
index 1b76c52cd6e..eaf40616711 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactorMemLeak.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactorMemLeak.java
@@ -128,13 +128,14 @@ public class TestCompactorMemLeak {
     }
 
     @Override
-    protected List<Path> commitWriter(FileDetails fd,
+    protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
         CompactionRequestImpl request) throws IOException {
       HFileWriterImpl writerImpl = (HFileWriterImpl) writer.writer;
       Cell cell = writerImpl.getLastCell();
       // The cell should be backend with an KeyOnlyKeyValue.
       IS_LAST_CELL_ON_HEAP.set(cell instanceof KeyOnlyKeyValue);
-      return super.commitWriter(fd, request);
+      return super.commitWriter(writer, fd, request);
     }
   }
+
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
index ea7ff61986b..80dbe28af3d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
@@ -54,6 +54,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
 import java.util.function.IntBinaryOperator;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -97,6 +98,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.nio.RefCnt;
 import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl;
 import org.apache.hadoop.hbase.regionserver.MemStoreCompactionStrategy.Action;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
@@ -2491,9 +2493,10 @@ public class TestHStore {
     @Override
     public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
         MonitoredTask status, ThroughputController throughputController,
-        FlushLifeCycleTracker tracker) throws IOException {
+        FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException {
       counter.incrementAndGet();
-      return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker);
+      return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker,
+        writerCreationTracker);
     }
 
     @Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
index dc699a62c0c..b9cf81f51df 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
@@ -23,7 +23,6 @@ import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
 import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
@@ -54,7 +53,6 @@ import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
 import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -211,26 +209,9 @@ public class TestMajorCompaction {
     Result result = r.get(new Get(STARTROW).addFamily(COLUMN_FAMILY_TEXT).readVersions(100));
     assertEquals(compactionThreshold, result.size());
 
-    // see if CompactionProgress is in place but null
-    for (HStore store : r.getStores()) {
-      assertNull(store.getCompactionProgress());
-    }
-
     r.flush(true);
     r.compact(true);
 
-    // see if CompactionProgress has done its thing on at least one store
-    int storeCount = 0;
-    for (HStore store : r.getStores()) {
-      CompactionProgress progress = store.getCompactionProgress();
-      if (progress != null) {
-        ++storeCount;
-        assertTrue(progress.currentCompactedKVs > 0);
-        assertTrue(progress.getTotalCompactingKVs() > 0);
-      }
-      assertTrue(storeCount > 0);
-    }
-
     // look at the second row
     // Increment the least significant character so we get to next row.
     byte[] secondRowBytes = START_KEY_BYTES.clone();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
index 8738b9a554f..afff1a68389 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
@@ -92,7 +92,6 @@ import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@@ -118,11 +117,9 @@ import org.junit.rules.TestName;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
 import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
 import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
-
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
@@ -621,17 +618,11 @@ public class TestSplitTransactionOnCluster {
     assertEquals(1, region.getStores().size());
     HStore store = region.getStores().get(0);
     while (store.hasReferences()) {
-      // Wait on any current compaction to complete first.
-      CompactionProgress progress = store.getCompactionProgress();
-      if (progress != null && progress.getProgressPct() < 1.0f) {
-        while (progress.getProgressPct() < 1.0f) {
-          LOG.info("Waiting, progress={}", progress.getProgressPct());
-          Threads.sleep(1000);
-        }
-      } else {
-        // Run new compaction. Shoudn't be any others running.
-        region.compact(true);
+      while (store.storeEngine.getCompactor().isCompacting()) {
+        Threads.sleep(100);
       }
+      // Run new compaction. Shouldn't be any others running.
+      region.compact(true);
       store.closeAndArchiveCompactedFiles();
     }
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
index fd5404c4242..0f18b16fe48 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
@@ -43,6 +43,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -67,8 +68,6 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
@@ -654,11 +653,12 @@ public abstract class AbstractTestWALReplay {
     @Override
     public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
         MonitoredTask status, ThroughputController throughputController,
-        FlushLifeCycleTracker tracker) throws IOException {
+        FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException {
       if (throwExceptionWhenFlushing.get()) {
         throw new IOException("Simulated exception by tests");
       }
-      return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker);
+      return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker,
+        writerCreationTracker);
     }
   };