Posted to commits@hbase.apache.org by zh...@apache.org on 2022/01/01 15:56:12 UTC

[hbase] 13/15: HBASE-26271 Cleanup the broken store files under data directory (#3786)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-26067-branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 951e6dabb09f6b0bfef8c9f5b1e68a3e9ba7207a
Author: BukrosSzabolcs <sz...@cloudera.com>
AuthorDate: Tue Nov 9 17:19:00 2021 +0100

    HBASE-26271 Cleanup the broken store files under data directory (#3786)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
    Signed-off-by: Josh Elser <el...@apache.org>
    Signed-off-by: Wellington Ramos Chevreuil <wc...@apache.org>
---
 .../hadoop/hbase/mob/DefaultMobStoreCompactor.java |  16 +-
 .../regionserver/AbstractMultiFileWriter.java      |   6 +-
 .../hbase/regionserver/BrokenStoreFileCleaner.java | 202 ++++++++++++++++++
 .../regionserver/DateTieredMultiFileWriter.java    |   2 +-
 .../hbase/regionserver/HRegionFileSystem.java      |   2 +-
 .../hadoop/hbase/regionserver/HRegionServer.java   |  26 +++
 .../apache/hadoop/hbase/regionserver/HStore.java   |   6 +
 .../hadoop/hbase/regionserver/StoreEngine.java     |  21 ++
 .../hbase/regionserver/StripeMultiFileWriter.java  |   2 +-
 .../compactions/AbstractMultiOutputCompactor.java  |   4 +-
 .../hbase/regionserver/compactions/Compactor.java  |  45 +++-
 .../compactions/DateTieredCompactor.java           |   6 +-
 .../regionserver/compactions/DefaultCompactor.java |   9 +-
 .../regionserver/compactions/StripeCompactor.java  |   2 +-
 .../FileBasedStoreFileTracker.java                 |   2 +-
 .../MigrationStoreFileTracker.java                 |   2 +-
 .../storefiletracker/StoreFileTracker.java         |   6 +
 .../storefiletracker/StoreFileTrackerBase.java     |   6 -
 .../hbase/snapshot/RestoreSnapshotHelper.java      |   2 +-
 .../regionserver/TestBrokenStoreFileCleaner.java   | 226 +++++++++++++++++++++
 .../hbase/regionserver/TestCompactorMemLeak.java   |   4 +-
 .../storefiletracker/TestStoreFileTracker.java     |   1 -
 22 files changed, 566 insertions(+), 32 deletions(-)
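
A minimal sketch of enabling the new chore programmatically, using only the configuration keys
and defaults introduced by BrokenStoreFileCleaner below (the values and the class name are
illustrative, not recommendations). Note that, per the StoreEngine change further down, cleanup
is skipped for stores whose store file tracker still writes to a temp directory first.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class EnableBrokenStoreFileCleanerSketch {
      public static void main(String[] args) {
        // Defaults in this patch: enabled=false, ttl=12h, delay=2h, delay.jitter=0.25, period=6h
        // (all durations in milliseconds).
        Configuration conf = HBaseConfiguration.create();
        conf.setBoolean("hbase.region.broken.storefilecleaner.enabled", true);
        conf.setLong("hbase.region.broken.storefilecleaner.ttl", 12L * 60 * 60 * 1000);
        conf.setInt("hbase.region.broken.storefilecleaner.delay", 2 * 60 * 60 * 1000);
        conf.setDouble("hbase.region.broken.storefilecleaner.delay.jitter", 0.25D);
        conf.setInt("hbase.region.broken.storefilecleaner.period", 6 * 60 * 60 * 1000);
      }
    }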

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
index 1b218b4..dd800f9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PrivateCellUtil;
-import org.apache.hadoop.hbase.regionserver.CellSink;
 import org.apache.hadoop.hbase.regionserver.HMobStore;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
@@ -52,6 +51,8 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
 /**
  * Compact passed set of files in the mob-enabled column family.
  */
@@ -154,7 +155,6 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
    * the scanner to filter the deleted cells.
    * @param fd File details
    * @param scanner Where to read from.
-   * @param writer Where to write to.
    * @param smallestReadPoint Smallest read point.
    * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
    * @param throughputController The compaction throughput controller.
@@ -163,7 +163,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
    * @return Whether compaction ended; false if it was interrupted for any reason.
    */
   @Override
-  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
+  protected boolean performCompaction(FileDetails fd, InternalScanner scanner,
       long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
       boolean major, int numofFilesToCompact) throws IOException {
     long bytesWrittenProgressForCloseCheck = 0;
@@ -369,4 +369,14 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
     progress.complete();
     return true;
   }
+
+
+  @Override
+  protected List<Path> commitWriter(FileDetails fd,
+      CompactionRequestImpl request) throws IOException {
+    List<Path> newFiles = Lists.newArrayList(writer.getPath());
+    writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());
+    writer.close();
+    return newFiles;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
index f250304..82c3867 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
@@ -110,7 +110,11 @@ public abstract class AbstractMultiFileWriter implements CellSink, ShipperListen
     return paths;
   }
 
-  protected abstract Collection<StoreFileWriter> writers();
+  /**
+   * Returns all writers. This is used to prevent deleting currently written storefiles
+   * during cleanup.
+   */
+  public abstract Collection<StoreFileWriter> writers();
 
   /**
    * Subclasses override this method to be called at the end of a successful sequence of append; all
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java
new file mode 100644
index 0000000..0c4807d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This Chore, every time it runs, will clear the unused HFiles in the data
+ * folder.
+ */
+@InterfaceAudience.Private
+public class BrokenStoreFileCleaner extends ScheduledChore {
+  private static final Logger LOG = LoggerFactory.getLogger(BrokenStoreFileCleaner.class);
+  public static final String BROKEN_STOREFILE_CLEANER_ENABLED =
+      "hbase.region.broken.storefilecleaner.enabled";
+  public static final boolean DEFAULT_BROKEN_STOREFILE_CLEANER_ENABLED = false;
+  public static final String BROKEN_STOREFILE_CLEANER_TTL =
+      "hbase.region.broken.storefilecleaner.ttl";
+  public static final long DEFAULT_BROKEN_STOREFILE_CLEANER_TTL = 1000 * 60 * 60 * 12; //12h
+  public static final String BROKEN_STOREFILE_CLEANER_DELAY =
+      "hbase.region.broken.storefilecleaner.delay";
+  public static final int DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY = 1000 * 60 * 60 * 2; //2h
+  public static final String BROKEN_STOREFILE_CLEANER_DELAY_JITTER =
+      "hbase.region.broken.storefilecleaner.delay.jitter";
+  public static final double DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER = 0.25D;
+  public static final String BROKEN_STOREFILE_CLEANER_PERIOD =
+      "hbase.region.broken.storefilecleaner.period";
+  public static final int DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD = 1000 * 60 * 60 * 6; //6h
+
+  private HRegionServer regionServer;
+  private final AtomicBoolean enabled = new AtomicBoolean(true);
+  private long fileTtl;
+
+  public BrokenStoreFileCleaner(final int delay, final int period, final Stoppable stopper,
+    Configuration conf, HRegionServer regionServer) {
+    super("BrokenStoreFileCleaner", stopper, period, delay);
+    this.regionServer = regionServer;
+    setEnabled(
+      conf.getBoolean(BROKEN_STOREFILE_CLEANER_ENABLED, DEFAULT_BROKEN_STOREFILE_CLEANER_ENABLED));
+    fileTtl = conf.getLong(BROKEN_STOREFILE_CLEANER_TTL, DEFAULT_BROKEN_STOREFILE_CLEANER_TTL);
+  }
+
+  public boolean setEnabled(final boolean enabled) {
+    return this.enabled.getAndSet(enabled);
+  }
+
+  public boolean getEnabled() {
+    return this.enabled.get();
+  }
+
+  @Override
+  public void chore() {
+    if (getEnabled()) {
+      long start = EnvironmentEdgeManager.currentTime();
+      AtomicLong deletedFiles = new AtomicLong(0);
+      AtomicLong failedDeletes = new AtomicLong(0);
+      for (HRegion region : regionServer.getRegions()) {
+        for (HStore store : region.getStores()) {
+          //only do cleanup in stores not using tmp directories
+          if (store.getStoreEngine().requireWritingToTmpDirFirst()) {
+            continue;
+          }
+          Path storePath =
+              new Path(region.getRegionFileSystem().getRegionDir(), store.getColumnFamilyName());
+
+          try {
+            List<FileStatus> fsStoreFiles =
+              Arrays.asList(region.getRegionFileSystem().fs.listStatus(storePath));
+            fsStoreFiles.forEach(
+              file -> cleanFileIfNeeded(file, store, deletedFiles, failedDeletes));
+          } catch (IOException e) {
+            LOG.warn("Failed to list files in {}, cleanup is skipped there",storePath);
+            continue;
+          }
+        }
+      }
+      LOG.debug(
+        "BrokenStoreFileCleaner on {} run for: {}ms. It deleted {} files and tried but failed "
+        + "to delete {}",
+        regionServer.getServerName().getServerName(), EnvironmentEdgeManager.currentTime() - start,
+        deletedFiles.get(), failedDeletes.get());
+    } else {
+      LOG.trace("Broken storefile Cleaner chore disabled! Not cleaning.");
+    }
+  }
+
+  private void cleanFileIfNeeded(FileStatus file, HStore store,
+    AtomicLong deletedFiles, AtomicLong failedDeletes) {
+    if(file.isDirectory()){
+      LOG.trace("This is a Directory {}, skip cleanup", file.getPath());
+      return;
+    }
+
+    if(!validate(file.getPath())){
+      LOG.trace("Invalid file {}, skip cleanup", file.getPath());
+      return;
+    }
+
+    if(!isOldEnough(file)){
+      LOG.trace("Fresh file {}, skip cleanup", file.getPath());
+      return;
+    }
+
+    if(isActiveStorefile(file, store)){
+      LOG.trace("Actively used storefile file {}, skip cleanup", file.getPath());
+      return;
+    }
+
+    // Compacted files can still have readers and are cleaned by a separate chore, so they have to
+    // be skipped here
+    if(isCompactedFile(file, store)){
+      LOG.trace("Cleanup is done by a different chore for file {}, skip cleanup", file.getPath());
+      return;
+    }
+
+    if(isCompactionResultFile(file, store)){
+      LOG.trace("The file is the result of an ongoing compaction {}, skip cleanup", file.getPath());
+      return;
+    }
+
+    deleteFile(file, store, deletedFiles, failedDeletes);
+  }
+
+  private boolean isCompactionResultFile(FileStatus file, HStore store) {
+    return store.getStoreEngine().getCompactor().getCompactionTargets().contains(file.getPath());
+  }
+
+  // Compacted files can still have readers and are cleaned by a separate chore, so they have to
+  // be skipped here
+  private boolean isCompactedFile(FileStatus file, HStore store) {
+    return store.getStoreEngine().getStoreFileManager().getCompactedfiles().stream()
+      .anyMatch(sf -> sf.getPath().equals(file.getPath()));
+  }
+
+  private boolean isActiveStorefile(FileStatus file, HStore store) {
+    return store.getStoreEngine().getStoreFileManager().getStorefiles().stream()
+      .anyMatch(sf -> sf.getPath().equals(file.getPath()));
+  }
+
+  boolean validate(Path file) {
+    if (HFileLink.isBackReferencesDir(file) || HFileLink.isBackReferencesDir(file.getParent())) {
+      return true;
+    }
+    return StoreFileInfo.validateStoreFileName(file.getName());
+  }
+
+  boolean isOldEnough(FileStatus file){
+    return file.getModificationTime() + fileTtl < EnvironmentEdgeManager.currentTime();
+  }
+
+  private void deleteFile(FileStatus file, HStore store, AtomicLong deletedFiles,
+    AtomicLong failedDeletes) {
+    Path filePath = file.getPath();
+    LOG.debug("Removing {} from store", filePath);
+    try {
+      boolean success = store.getFileSystem().delete(filePath, false);
+      if (!success) {
+        failedDeletes.incrementAndGet();
+        LOG.warn("Attempted to delete:" + filePath
+            + ", but couldn't. Attempt to delete on next pass.");
+      }
+      else{
+        deletedFiles.incrementAndGet();
+      }
+    } catch (IOException e) {
+      e = e instanceof RemoteException ?
+          ((RemoteException)e).unwrapRemoteException() : e;
+      LOG.warn("Error while deleting: " + filePath, e);
+    }
+  }
+
+}
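
The isOldEnough() check above is what keeps freshly written files out of the cleaner's reach: a
file only becomes a deletion candidate once its last modification is older than the configured
TTL. A small standalone sketch of that predicate under the default 12h TTL (the class name is
illustrative):

    import java.util.concurrent.TimeUnit;

    public class BrokenFileTtlSketch {
      public static void main(String[] args) {
        long fileTtl = TimeUnit.HOURS.toMillis(12);   // DEFAULT_BROKEN_STOREFILE_CLEANER_TTL
        long now = System.currentTimeMillis();
        long modificationTime = now - TimeUnit.HOURS.toMillis(13);   // file written 13h ago
        boolean oldEnough = modificationTime + fileTtl < now;
        System.out.println(oldEnough);   // true: 13h old is past the 12h TTL
      }
    }
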
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
index 8201cb1..1e10eb2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
@@ -71,7 +71,7 @@ public class DateTieredMultiFileWriter extends AbstractMultiFileWriter {
   }
 
   @Override
-  protected Collection<StoreFileWriter> writers() {
+  public Collection<StoreFileWriter> writers() {
     return lowerBoundary2Writer.values();
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
index 8110025..8920471 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
@@ -609,7 +609,7 @@ public class HRegionFileSystem {
       writeRegionInfoFileContent(conf, fs, regionInfoFile, regionInfoContent);
       HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
         env.getMasterConfiguration(), fs, getTableDir(), regionInfo, false);
-        insertRegionFilesIntoStoreTracker(allRegionFiles, env, regionFs);
+      insertRegionFilesIntoStoreTracker(allRegionFiles, env, regionFs);
     }
     return regionDir;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 57ab15b..3066b50 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -546,6 +546,8 @@ public class HRegionServer extends Thread implements
    */
   protected final ConfigurationManager configurationManager;
 
+  private BrokenStoreFileCleaner brokenStoreFileCleaner;
+
   @InterfaceAudience.Private
   CompactedHFilesDischarger compactedFileDischarger;
 
@@ -2156,6 +2158,9 @@ public class HRegionServer extends Thread implements
     if (this.slowLogTableOpsChore != null) {
       choreService.scheduleChore(slowLogTableOpsChore);
     }
+    if (this.brokenStoreFileCleaner != null) {
+      choreService.scheduleChore(brokenStoreFileCleaner);
+    }
 
     // Leases is not a Thread. Internally it runs a daemon thread. If it gets
     // an unhandled exception, it will just exit.
@@ -2236,6 +2241,22 @@ public class HRegionServer extends Thread implements
       this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod,
           onlyMetaRefresh, this, this);
     }
+
+    int brokenStoreFileCleanerPeriod = conf.getInt(
+      BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD,
+      BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD);
+    int brokenStoreFileCleanerDelay = conf.getInt(
+      BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY,
+      BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY);
+    double brokenStoreFileCleanerDelayJitter = conf.getDouble(
+      BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY_JITTER,
+      BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER);
+    double jitterRate = (RandomUtils.nextDouble() - 0.5D) * brokenStoreFileCleanerDelayJitter;
+    long jitterValue = Math.round(brokenStoreFileCleanerDelay * jitterRate);
+    this.brokenStoreFileCleaner =
+      new BrokenStoreFileCleaner((int) (brokenStoreFileCleanerDelay + jitterValue),
+        brokenStoreFileCleanerPeriod, this, conf, this);
+
     registerConfigurationObservers();
   }
 
@@ -4027,4 +4048,9 @@ public class HRegionServer extends Thread implements
   public MetaRegionLocationCache getMetaRegionLocationCache() {
     return this.metaRegionLocationCache;
   }
+
+  @InterfaceAudience.Private
+  public BrokenStoreFileCleaner getBrokenStoreFileCleaner(){
+    return brokenStoreFileCleaner;
+  }
 }
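
For reference, the jitter applied to brokenStoreFileCleanerDelay above spreads the chore's
initial delay around the configured value. A standalone sketch of the same arithmetic, assuming
a random source uniform in [0, 1) (the class name and the ThreadLocalRandom stand-in are
illustrative):

    import java.util.concurrent.ThreadLocalRandom;

    public class CleanerDelayJitterSketch {
      public static void main(String[] args) {
        int delay = 2 * 60 * 60 * 1000;   // default delay: 2h in ms
        double jitter = 0.25D;            // default delay jitter
        // For jitter = 0.25 the rate is uniform in [-0.125, +0.125), so the effective
        // initial delay lands roughly between 1h45m and 2h15m.
        double jitterRate = (ThreadLocalRandom.current().nextDouble() - 0.5D) * jitter;
        long jitterValue = Math.round(delay * jitterRate);
        System.out.println("effective delay ms: " + (delay + jitterValue));
      }
    }
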
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 5309305..e910f3c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1156,6 +1156,12 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
       }
     }
     replaceStoreFiles(filesToCompact, sfs, true);
+
+    // This step is necessary for the correctness of the BrokenStoreFileCleaner chore. It lets the
+    // chore know that compaction is done and the file can be cleaned up if the compaction
+    // has failed.
+    storeEngine.resetCompactionWriter();
+
     if (cr.isMajor()) {
       majorCompactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs());
       majorCompactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
index 0486729..ddb52d1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
@@ -42,9 +42,11 @@ import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
 import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
 import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
 import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -532,6 +534,25 @@ public abstract class StoreEngine<SF extends StoreFlusher, CP extends Compaction
     }
   }
 
+  /**
+   * Whether the implementation of the storefile tracker in use requires writing to a temp
+   * directory first, i.e., does not allow broken store files under the actual data directory.
+   */
+  public boolean requireWritingToTmpDirFirst() {
+    return storeFileTracker.requireWritingToTmpDirFirst();
+  }
+
+  /**
+   * Resets the compaction writer when the new file is committed and used as an active storefile.
+   * This step is necessary for the correctness of the BrokenStoreFileCleaner chore. It lets the
+   * chore know that compaction is done and the file can be cleaned up if the compaction
+   * has failed.
+   * @see HStore#doCompaction(CompactionRequestImpl, Collection, User, long, List)
+   */
+  public void resetCompactionWriter(){
+    compactor.resetWriter();
+  }
+
   @RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
     allowedOnPath = ".*/TestHStore.java")
   ReadWriteLock getLock() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java
index fc0598d..a4e943a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java
@@ -58,7 +58,7 @@ public abstract class StripeMultiFileWriter extends AbstractMultiFileWriter {
   }
 
   @Override
-  protected Collection<StoreFileWriter> writers() {
+  public Collection<StoreFileWriter> writers() {
     return existingWriters;
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
index 533be17..19b7a98 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
@@ -68,7 +68,7 @@ public abstract class AbstractMultiOutputCompactor<T extends AbstractMultiFileWr
   }
 
   @Override
-  protected void abortWriter(T writer) throws IOException {
+  protected void abortWriter() throws IOException {
     FileSystem fs = store.getFileSystem();
     for (Path leftoverFile : writer.abortWriters()) {
       try {
@@ -79,5 +79,7 @@ public abstract class AbstractMultiOutputCompactor<T extends AbstractMultiFileWr
           e);
       }
     }
+    //this step signals that the target file is no longer written and can be cleaned up
+    writer = null;
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 8178fb1..a821a90 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -25,9 +25,12 @@ import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_RETAIN_DELET
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -36,6 +39,7 @@ import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileInfo;
+import org.apache.hadoop.hbase.regionserver.AbstractMultiFileWriter;
 import org.apache.hadoop.hbase.regionserver.CellSink;
 import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
 import org.apache.hadoop.hbase.regionserver.HStore;
@@ -91,6 +95,8 @@ public abstract class Compactor<T extends CellSink> {
   private final boolean dropCacheMajor;
   private final boolean dropCacheMinor;
 
+  protected T writer = null;
+
   //TODO: depending on Store is not good but, realistically, all compactors currently do.
   Compactor(Configuration conf, HStore store) {
     this.conf = conf;
@@ -323,7 +329,6 @@ public abstract class Compactor<T extends CellSink> {
     // Find the smallest read point across all the Scanners.
     long smallestReadPoint = getSmallestReadPoint();
 
-    T writer = null;
     boolean dropCache;
     if (request.isMajor() || request.isAllFiles()) {
       dropCache = this.dropCacheMajor;
@@ -347,8 +352,13 @@ public abstract class Compactor<T extends CellSink> {
         smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
         cleanSeqId = true;
       }
+      if (writer != null){
+        LOG.warn("Writer exists when it should not: " + getCompactionTargets().stream()
+          .map(n -> n.toString())
+          .collect(Collectors.joining(", ", "{ ", " }")));
+      }
       writer = sinkFactory.createWriter(scanner, fd, dropCache, request.isMajor());
-      finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
+      finished = performCompaction(fd, scanner, smallestReadPoint, cleanSeqId,
         throughputController, request.isAllFiles(), request.getFiles().size());
       if (!finished) {
         throw new InterruptedIOException("Aborting compaction of store " + store + " in region "
@@ -368,24 +378,23 @@ public abstract class Compactor<T extends CellSink> {
         Closeables.close(scanner, true);
       }
       if (!finished && writer != null) {
-        abortWriter(writer);
+        abortWriter();
       }
     }
     assert finished : "We should have exited the method on all error paths";
     assert writer != null : "Writer should be non-null if no error";
-    return commitWriter(writer, fd, request);
+    return commitWriter(fd, request);
   }
 
-  protected abstract List<Path> commitWriter(T writer, FileDetails fd,
+  protected abstract List<Path> commitWriter(FileDetails fd,
       CompactionRequestImpl request) throws IOException;
 
-  protected abstract void abortWriter(T writer) throws IOException;
+  protected abstract void abortWriter() throws IOException;
 
   /**
    * Performs the compaction.
    * @param fd FileDetails of cell sink writer
    * @param scanner Where to read from.
-   * @param writer Where to write to.
    * @param smallestReadPoint Smallest read point.
    * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is &lt;=
    *          smallestReadPoint
@@ -393,7 +402,7 @@ public abstract class Compactor<T extends CellSink> {
    * @param numofFilesToCompact the number of files to compact
    * @return Whether compaction ended; false if it was interrupted for some reason.
    */
-  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
+  protected boolean performCompaction(FileDetails fd, InternalScanner scanner,
       long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
       boolean major, int numofFilesToCompact) throws IOException {
     assert writer instanceof ShipperListener;
@@ -536,4 +545,24 @@ public abstract class Compactor<T extends CellSink> {
     return new StoreScanner(store, scanInfo, scanners, smallestReadPoint, earliestPutTs,
         dropDeletesFromRow, dropDeletesToRow);
   }
+
+  public List<Path> getCompactionTargets(){
+    if (writer == null){
+      return Collections.emptyList();
+    }
+    synchronized (writer){
+      if (writer instanceof StoreFileWriter){
+        return Arrays.asList(((StoreFileWriter)writer).getPath());
+      }
+      return ((AbstractMultiFileWriter)writer).writers().stream().map(sfw -> sfw.getPath()).collect(
+        Collectors.toList());
+    }
+  }
+
+  /**
+   * Reset the writer after the new storefiles have been successfully added.
+   */
+  public void resetWriter(){
+    writer = null;
+  }
 }
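
The net effect of the Compactor changes is that the in-flight output paths stay observable for
the whole compaction: the writer field is set in compact(), published through
getCompactionTargets() for BrokenStoreFileCleaner, and cleared either by abortWriter() on
failure or via StoreEngine.resetCompactionWriter() once the new files are committed. A minimal
standalone sketch of that publish/clear pattern (names are illustrative, not the HBase classes):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    // Tracks the output files of an in-flight operation so a cleaner chore can skip them.
    class InFlightOutputTracker {
      private volatile List<String> targets = Collections.emptyList();

      void begin(List<String> outputPaths) {   // analogous to creating the compaction writer
        targets = Collections.unmodifiableList(new ArrayList<>(outputPaths));
      }

      List<String> currentTargets() {          // analogous to Compactor#getCompactionTargets()
        return targets;
      }

      void clear() {                           // analogous to resetWriter()/abortWriter()
        targets = Collections.emptyList();
      }
    }
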
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
index fd54330..43e037c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
@@ -79,8 +79,10 @@ public class DateTieredCompactor extends AbstractMultiOutputCompactor<DateTiered
   }
 
   @Override
-  protected List<Path> commitWriter(DateTieredMultiFileWriter writer, FileDetails fd,
+  protected List<Path> commitWriter(FileDetails fd,
       CompactionRequestImpl request) throws IOException {
-    return writer.commitWriters(fd.maxSeqId, request.isAllFiles(), request.getFiles());
+    List<Path> pathList =
+      writer.commitWriters(fd.maxSeqId, request.isAllFiles(), request.getFiles());
+    return pathList;
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index afa2429..ad2384a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@ -63,7 +63,7 @@ public class DefaultCompactor extends Compactor<StoreFileWriter> {
   }
 
   @Override
-  protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
+  protected List<Path> commitWriter(FileDetails fd,
       CompactionRequestImpl request) throws IOException {
     List<Path> newFiles = Lists.newArrayList(writer.getPath());
     writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());
@@ -72,12 +72,19 @@ public class DefaultCompactor extends Compactor<StoreFileWriter> {
   }
 
   @Override
+  protected void abortWriter() throws IOException {
+    abortWriter(writer);
+  }
+
   protected void abortWriter(StoreFileWriter writer) throws IOException {
     Path leftoverFile = writer.getPath();
     try {
       writer.close();
     } catch (IOException e) {
       LOG.warn("Failed to close the writer after an unfinished compaction.", e);
+    } finally {
+      //this step signals that the target file is no longer written and can be cleaned up
+      this.writer = null;
     }
     try {
       store.getFileSystem().delete(leftoverFile, false);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
index 547555e..060a11b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
@@ -125,7 +125,7 @@ public class StripeCompactor extends AbstractMultiOutputCompactor<StripeMultiFil
   }
 
   @Override
-  protected List<Path> commitWriter(StripeMultiFileWriter writer, FileDetails fd,
+  protected List<Path> commitWriter(FileDetails fd,
       CompactionRequestImpl request) throws IOException {
     List<Path> newFiles = writer.commitWriters(fd.maxSeqId, request.isMajor(), request.getFiles());
     assert !newFiles.isEmpty() : "Should have produced an empty file to preserve metadata.";
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/FileBasedStoreFileTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/FileBasedStoreFileTracker.java
index 079b59b..8d9b66e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/FileBasedStoreFileTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/FileBasedStoreFileTracker.java
@@ -95,7 +95,7 @@ class FileBasedStoreFileTracker extends StoreFileTrackerBase {
   }
 
   @Override
-  protected boolean requireWritingToTmpDirFirst() {
+  public boolean requireWritingToTmpDirFirst() {
     return false;
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/MigrationStoreFileTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/MigrationStoreFileTracker.java
index a6648f2..53a474d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/MigrationStoreFileTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/MigrationStoreFileTracker.java
@@ -57,7 +57,7 @@ class MigrationStoreFileTracker extends StoreFileTrackerBase {
   }
 
   @Override
-  protected boolean requireWritingToTmpDirFirst() {
+  public boolean requireWritingToTmpDirFirst() {
     // Returns true if either of the two StoreFileTracker returns true.
     // For example, if we want to migrate from a tracker implementation which can ignore the broken
     // files under data directory to a tracker implementation which can not, if we still allow
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTracker.java
index f56a0dd..aabbe8d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTracker.java
@@ -88,4 +88,10 @@ public interface StoreFileTracker {
    * @param builder The table descriptor builder for the given table.
    */
   TableDescriptorBuilder updateWithTrackerConfigs(TableDescriptorBuilder builder);
+
+  /**
+   * Whether the implementation of this tracker requires you to write to temp directory first, i.e.,
+   * does not allow broken store files under the actual data directory.
+   */
+  boolean requireWritingToTmpDirFirst();
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java
index b6de32b..db10f4d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java
@@ -173,12 +173,6 @@ abstract class StoreFileTrackerBase implements StoreFileTracker {
     return builder.build();
   }
 
-  /**
-   * Whether the implementation of this tracker requires you to write to temp directory first, i.e,
-   * does not allow broken store files under the actual data directory.
-   */
-  protected abstract boolean requireWritingToTmpDirFirst();
-
   protected abstract void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException;
 
   protected abstract void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java
index 7c75e46..0f8a95f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java
@@ -549,7 +549,7 @@ public class RestoreSnapshotHelper {
                   " of snapshot=" + snapshotName+
                   " to region=" + regionInfo.getEncodedName() + " table=" + tableName);
           String fileName = restoreStoreFile(familyDir, regionInfo, storeFile, createBackRefs);
-          //mark the reference file to be added to tracker
+          // mark the reference file to be added to tracker
           filesToTrack.add(new StoreFileInfo(conf, fs,
             new Path(familyDir, fileName), true));
         }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBrokenStoreFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBrokenStoreFileCleaner.java
new file mode 100644
index 0000000..eb5e6c7
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBrokenStoreFileCleaner.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class, RegionServerTests.class })
+public class TestBrokenStoreFileCleaner {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestBrokenStoreFileCleaner.class);
+
+  private final HBaseTestingUtility testUtil = new HBaseTestingUtility();
+  private final static byte[] fam = Bytes.toBytes("cf_1");
+  private final static byte[] qual1 = Bytes.toBytes("qf_1");
+  private final static byte[] val = Bytes.toBytes("val");
+  private final static String junkFileName = "409fad9a751c4e8c86d7f32581bdc156";
+  TableName tableName;
+
+
+  @Before
+  public void setUp() throws Exception {
+    testUtil.getConfiguration().set(StoreFileTrackerFactory.TRACKER_IMPL,
+      "org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker");
+    testUtil.getConfiguration()
+      .set(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_ENABLED, "true");
+    testUtil.getConfiguration().set(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_TTL, "0");
+    testUtil.getConfiguration()
+      .set(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD, "15000000");
+    testUtil.getConfiguration().set(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY, "0");
+    testUtil.startMiniCluster(1);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    testUtil.deleteTable(tableName);
+    testUtil.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testDeletingJunkFile() throws Exception {
+    tableName = TableName.valueOf(getClass().getSimpleName() + "testDeletingJunkFile");
+    createTableWithData(tableName);
+
+    HRegion region = testUtil.getMiniHBaseCluster().getRegions(tableName).get(0);
+    ServerName sn = testUtil.getMiniHBaseCluster()
+      .getServerHoldingRegion(tableName, region.getRegionInfo().getRegionName());
+    HRegionServer rs = testUtil.getMiniHBaseCluster().getRegionServer(sn);
+    BrokenStoreFileCleaner cleaner = rs.getBrokenStoreFileCleaner();
+
+    //create junk file
+    HStore store = region.getStore(fam);
+    Path cfPath = store.getRegionFileSystem().getStoreDir(store.getColumnFamilyName());
+    Path junkFilePath = new Path(cfPath, junkFileName);
+
+    FSDataOutputStream junkFileOS = store.getFileSystem().create(junkFilePath);
+    junkFileOS.writeUTF("hello");
+    junkFileOS.close();
+
+    int storeFiles = store.getStorefilesCount();
+    assertTrue(storeFiles > 0);
+
+    //verify the file exists before the chore and is missing afterwards
+    assertTrue(store.getFileSystem().exists(junkFilePath));
+    cleaner.chore();
+    assertFalse(store.getFileSystem().exists(junkFilePath));
+
+    //verify no storefile got deleted
+    int currentStoreFiles = store.getStorefilesCount();
+    assertEquals(currentStoreFiles, storeFiles);
+
+  }
+
+  @Test
+  public void testSkippingCompactedFiles() throws Exception {
+    tableName = TableName.valueOf(getClass().getSimpleName() + "testSkippingCompactedFiles");
+    createTableWithData(tableName);
+
+    HRegion region = testUtil.getMiniHBaseCluster().getRegions(tableName).get(0);
+
+    ServerName sn = testUtil.getMiniHBaseCluster()
+      .getServerHoldingRegion(tableName, region.getRegionInfo().getRegionName());
+    HRegionServer rs = testUtil.getMiniHBaseCluster().getRegionServer(sn);
+    BrokenStoreFileCleaner cleaner = rs.getBrokenStoreFileCleaner();
+
+    //run major compaction to generate compacted files
+    region.compact(true);
+
+    //make sure there are compacted files
+    HStore store = region.getStore(fam);
+    int compactedFiles = store.getCompactedFilesCount();
+    assertTrue(compactedFiles > 0);
+
+    cleaner.chore();
+
+    //verify none of the compacted files were deleted
+    int existingCompactedFiles = store.getCompactedFilesCount();
+    assertEquals(compactedFiles, existingCompactedFiles);
+
+    //verify adding a junk file does not break anything
+    Path cfPath = store.getRegionFileSystem().getStoreDir(store.getColumnFamilyName());
+    Path junkFilePath = new Path(cfPath, junkFileName);
+
+    FSDataOutputStream junkFileOS = store.getFileSystem().create(junkFilePath);
+    junkFileOS.writeUTF("hello");
+    junkFileOS.close();
+
+    assertTrue(store.getFileSystem().exists(junkFilePath));
+    cleaner.setEnabled(true);
+    cleaner.chore();
+    assertFalse(store.getFileSystem().exists(junkFilePath));
+
+    //verify compacted files are still intact
+    existingCompactedFiles = store.getCompactedFilesCount();
+    assertEquals(compactedFiles, existingCompactedFiles);
+  }
+
+  @Test
+  public void testJunkFileTTL() throws Exception {
+    tableName = TableName.valueOf(getClass().getSimpleName() + "testDeletingJunkFile");
+    createTableWithData(tableName);
+
+    HRegion region = testUtil.getMiniHBaseCluster().getRegions(tableName).get(0);
+    ServerName sn = testUtil.getMiniHBaseCluster()
+      .getServerHoldingRegion(tableName, region.getRegionInfo().getRegionName());
+    HRegionServer rs = testUtil.getMiniHBaseCluster().getRegionServer(sn);
+
+    //create junk file
+    HStore store = region.getStore(fam);
+    Path cfPath = store.getRegionFileSystem().getStoreDir(store.getColumnFamilyName());
+    Path junkFilePath = new Path(cfPath, junkFileName);
+
+    FSDataOutputStream junkFileOS = store.getFileSystem().create(junkFilePath);
+    junkFileOS.writeUTF("hello");
+    junkFileOS.close();
+
+    int storeFiles = store.getStorefilesCount();
+    assertTrue(storeFiles > 0);
+
+    //verify the file exists before the chore
+    assertTrue(store.getFileSystem().exists(junkFilePath));
+
+    //set a 5 sec ttl
+    rs.getConfiguration().set(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_TTL, "5000");
+    BrokenStoreFileCleaner cleaner = new BrokenStoreFileCleaner(15000000,
+      0, rs, rs.getConfiguration(), rs);
+    cleaner.chore();
+    //file is still present after chore run
+    assertTrue(store.getFileSystem().exists(junkFilePath));
+    Thread.sleep(5000);
+    cleaner.chore();
+    assertFalse(store.getFileSystem().exists(junkFilePath));
+
+    //verify no storefile got deleted
+    int currentStoreFiles = store.getStorefilesCount();
+    assertEquals(currentStoreFiles, storeFiles);
+  }
+
+  private Table createTableWithData(TableName tableName) throws IOException {
+    Table table = testUtil.createTable(tableName, fam);
+    try {
+      for (int i = 1; i < 10; i++) {
+        Put p = new Put(Bytes.toBytes("row" + i));
+        p.addColumn(fam, qual1, val);
+        table.put(p);
+      }
+      // flush them
+      testUtil.getAdmin().flush(tableName);
+      for (int i = 11; i < 20; i++) {
+        Put p = new Put(Bytes.toBytes("row" + i));
+        p.addColumn(fam, qual1, val);
+        table.put(p);
+      }
+      // flush them
+      testUtil.getAdmin().flush(tableName);
+      for (int i = 21; i < 30; i++) {
+        Put p = new Put(Bytes.toBytes("row" + i));
+        p.addColumn(fam, qual1, val);
+        table.put(p);
+      }
+      // flush them
+      testUtil.getAdmin().flush(tableName);
+    } catch (IOException e) {
+      table.close();
+      throw e;
+    }
+    return table;
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactorMemLeak.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactorMemLeak.java
index 9a6e96a..1b76c52 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactorMemLeak.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactorMemLeak.java
@@ -128,13 +128,13 @@ public class TestCompactorMemLeak {
     }
 
     @Override
-    protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
+    protected List<Path> commitWriter(FileDetails fd,
         CompactionRequestImpl request) throws IOException {
       HFileWriterImpl writerImpl = (HFileWriterImpl) writer.writer;
       Cell cell = writerImpl.getLastCell();
       // The cell should be backend with an KeyOnlyKeyValue.
       IS_LAST_CELL_ON_HEAP.set(cell instanceof KeyOnlyKeyValue);
-      return super.commitWriter(writer, fd, request);
+      return super.commitWriter(fd, request);
     }
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestStoreFileTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestStoreFileTracker.java
index b30ca47..9818972 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestStoreFileTracker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestStoreFileTracker.java
@@ -47,7 +47,6 @@ public class TestStoreFileTracker extends DefaultStoreFileTracker {
     } else {
       LOG.info("ctx.getRegionFileSystem() returned null. Leaving storeId null.");
     }
-
   }
 
   @Override