Posted to commits@hbase.apache.org by ra...@apache.org on 2015/12/04 05:51:24 UTC

[2/2] hbase git commit: HBASE-13082 Coarsen StoreScanner locks to RegionScanner (Ram)

HBASE-13082 Coarsen StoreScanner locks to RegionScanner (Ram)


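In short: this patch replaces StoreScanner's per-call ReentrantLock (which
guarded peek/seek/next/reseek/shipped/close against a concurrent flush
calling updateReaders()) with a volatile "flushed" flag. The flush thread
only sets the flag; the scan thread, already serialized at the RegionScanner
level, checks it at the start of each operation (checkResetHeap()) and
rebuilds its heap lazily. Compacted store files are no longer closed and
archived inline when a compaction finishes: they are marked compactedAway,
stay readable for scanners that already hold them (tracked by a reader ref
count), and are archived later by the new CompactedHFilesDischarger chore.
A minimal sketch of the locking change follows; the class and method names
are illustrative, not the patch itself:

    // Before: every scanner call took a lock so updateReaders() could swap
    // the heap safely. After: the flusher publishes a volatile flag and the
    // single-threaded scan path resets its own heap when it observes it.
    class FlagGuardedScanner {
      private volatile boolean flushed = false;  // written by the flush thread

      void updateReaders() {          // flush thread
        flushed = true;               // defer the heap reset to the scan thread
      }

      private void checkResetHeap() { // scan thread, start of every operation
        if (flushed) {
          // rebuild the scanner heap against the new store files here
          flushed = false;
        }
      }
    }
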
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8b3d1f14
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8b3d1f14
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8b3d1f14

Branch: refs/heads/master
Commit: 8b3d1f144408e4a7a014c5ac46418c9e91b9b0db
Parents: b326042
Author: ramkrishna <ra...@gmail.com>
Authored: Fri Dec 4 10:20:46 2015 +0530
Committer: ramkrishna <ra...@gmail.com>
Committed: Fri Dec 4 10:20:46 2015 +0530

----------------------------------------------------------------------
 .../regionserver/DefaultStoreFileManager.java   |  58 ++-
 .../hadoop/hbase/regionserver/HRegion.java      |  26 +-
 .../hadoop/hbase/regionserver/HStore.java       | 213 ++++++----
 .../regionserver/ReversedStoreScanner.java      |  19 +-
 .../apache/hadoop/hbase/regionserver/Store.java |   5 +
 .../hadoop/hbase/regionserver/StoreFile.java    |  67 +++-
 .../hbase/regionserver/StoreFileManager.java    |  25 +-
 .../hbase/regionserver/StoreFileScanner.java    |   3 +
 .../hadoop/hbase/regionserver/StoreScanner.java | 384 +++++++++---------
 .../regionserver/StripeStoreFileManager.java    |  82 +++-
 .../compactions/CompactedHFilesDischarger.java  |  74 ++++
 .../org/apache/hadoop/hbase/TestIOFencing.java  |  11 -
 .../TestZooKeeperTableArchiveClient.java        |  23 +-
 .../apache/hadoop/hbase/io/TestHeapSize.java    |   2 +-
 .../hbase/mapreduce/TestHFileOutputFormat2.java |  16 +
 .../master/cleaner/TestSnapshotFromMaster.java  |   8 +
 .../hbase/regionserver/MockStoreFile.java       |   5 +
 .../regionserver/TestEncryptionKeyRotation.java |  59 ++-
 .../regionserver/TestHRegionReplayEvents.java   |   3 +
 .../TestRegionMergeTransactionOnCluster.java    |  29 +-
 .../hbase/regionserver/TestRegionReplicas.java  |  11 +-
 .../hadoop/hbase/regionserver/TestStore.java    |  13 +
 .../TestStripeStoreFileManager.java             |  19 +
 .../TestCompactedHFilesDischarger.java          | 389 +++++++++++++++++++
 24 files changed, 1211 insertions(+), 333 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
index 9f38b9e..d38306c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
@@ -54,6 +54,13 @@ class DefaultStoreFileManager implements StoreFileManager {
    * is atomically replaced when its contents change.
    */
   private volatile ImmutableList<StoreFile> storefiles = null;
+  /**
+   * List of compacted files inside this store that need to be excluded from reads
+   * because new reads will use only the files created out of the latest compaction.
+   * These compacted files will be deleted/cleared once all the existing readers on
+   * these compacted files are done.
+   */
+  private volatile List<StoreFile> compactedfiles = null;
 
   public DefaultStoreFileManager(CellComparator kvComparator, Configuration conf,
       CompactionConfiguration comConf) {
@@ -75,6 +82,11 @@ class DefaultStoreFileManager implements StoreFileManager {
   }
 
   @Override
+  public Collection<StoreFile> getCompactedfiles() {
+    return compactedfiles;
+  }
+
+  @Override
   public void insertNewFiles(Collection<StoreFile> sfs) throws IOException {
     ArrayList<StoreFile> newFiles = new ArrayList<StoreFile>(storefiles);
     newFiles.addAll(sfs);
@@ -89,19 +101,55 @@ class DefaultStoreFileManager implements StoreFileManager {
   }
 
   @Override
+  public Collection<StoreFile> clearCompactedFiles() {
+    List<StoreFile> result = compactedfiles;
+    compactedfiles = new ArrayList<StoreFile>();
+    return result;
+  }
+
+  @Override
   public final int getStorefileCount() {
     return storefiles.size();
   }
 
   @Override
   public void addCompactionResults(
-    Collection<StoreFile> compactedFiles, Collection<StoreFile> results) {
+    Collection<StoreFile> newCompactedfiles, Collection<StoreFile> results) {
     ArrayList<StoreFile> newStoreFiles = Lists.newArrayList(storefiles);
-    newStoreFiles.removeAll(compactedFiles);
+    newStoreFiles.removeAll(newCompactedfiles);
     if (!results.isEmpty()) {
       newStoreFiles.addAll(results);
     }
     sortAndSetStoreFiles(newStoreFiles);
+    ArrayList<StoreFile> updatedCompactedfiles = null;
+    if (this.compactedfiles != null) {
+      updatedCompactedfiles = new ArrayList<StoreFile>(this.compactedfiles);
+      updatedCompactedfiles.addAll(newCompactedfiles);
+    } else {
+      updatedCompactedfiles = new ArrayList<StoreFile>(newCompactedfiles);
+    }
+    markCompactedAway(newCompactedfiles);
+    this.compactedfiles = sortCompactedfiles(updatedCompactedfiles);
+  }
+
+  // Mark the files as compactedAway once the storefiles and compactedfiles lists are finalized.
+  // Let a background thread close the actual readers on these compacted files and also
+  // ensure that their blocks are evicted from the block cache so that they are no longer
+  // cached
+  private void markCompactedAway(Collection<StoreFile> compactedFiles) {
+    for (StoreFile file : compactedFiles) {
+      file.markCompactedAway();
+    }
+  }
+
+  @Override
+  public void removeCompactedFiles(Collection<StoreFile> removedCompactedfiles) throws IOException {
+    ArrayList<StoreFile> updatedCompactedfiles = null;
+    if (this.compactedfiles != null) {
+      updatedCompactedfiles = new ArrayList<StoreFile>(this.compactedfiles);
+      updatedCompactedfiles.removeAll(removedCompactedfiles);
+      this.compactedfiles = sortCompactedfiles(updatedCompactedfiles);
+    }
   }
 
   @Override
@@ -166,6 +214,12 @@ class DefaultStoreFileManager implements StoreFileManager {
     storefiles = ImmutableList.copyOf(storeFiles);
   }
 
+  private List<StoreFile> sortCompactedfiles(List<StoreFile> storefiles) {
+    // Sorting may not really be needed here for the compacted files?
+    Collections.sort(storefiles, StoreFile.Comparators.SEQ_ID);
+    return new ArrayList<StoreFile>(storefiles);
+  }
+
   @Override
   public double getCompactionPressure() {
     int storefileCount = getStorefileCount();

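The bookkeeping above gives the store two volatile lists: the active
storefiles and the compacted-but-not-yet-archived files. A compaction
atomically moves its inputs from the first list to the second and marks
them compactedAway; the discharger later empties the second list once
archiving succeeds. A simplified sketch under assumed generic types (the
real code holds StoreFile instances and is guarded by the store's lock,
for which "synchronized" stands in here):

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;

    class CompactedFileTracker<F> {
      private volatile List<F> active = new ArrayList<>();
      private volatile List<F> compacted = new ArrayList<>();

      // Compaction commit: inputs leave the active set but stay tracked
      // until no reader references them.
      synchronized void addCompactionResults(Collection<F> in, Collection<F> out) {
        List<F> newActive = new ArrayList<>(active);
        newActive.removeAll(in);
        newActive.addAll(out);
        active = newActive;                       // atomic volatile swap
        List<F> newCompacted = new ArrayList<>(compacted);
        newCompacted.addAll(in);
        compacted = newCompacted;
      }

      // Called after the files were successfully archived.
      synchronized void removeCompactedFiles(Collection<F> archived) {
        List<F> newCompacted = new ArrayList<>(compacted);
        newCompacted.removeAll(archived);
        compacted = newCompacted;
      }
    }
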
http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 557edd9..484d5ee 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -73,6 +73,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.CompoundConfiguration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.DroppedSnapshotException;
@@ -148,6 +149,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescripto
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
 import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
 import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
@@ -297,6 +299,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   protected final Configuration conf;
   private final Configuration baseConf;
   private final int rowLockWaitDuration;
+  private CompactedHFilesDischarger compactedFileDischarger;
   static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
 
   // The internal wait duration to acquire a lock before read/update
@@ -809,6 +812,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     // Initialize all the HStores
     status.setStatus("Initializing all the Stores");
     long maxSeqId = initializeStores(reporter, status);
+    // Start the CompactedHFilesDischarger here. This chore helps to remove the compacted files
+    // that will no longer be used in reads.
+    if (this.getRegionServerServices() != null) {
+      ChoreService choreService = this.getRegionServerServices().getChoreService();
+      if (choreService != null) {
+        // Default is 2 mins; the TTLCleaner's default interval is 5 mins, so
+        // compacted files can be archived before the TTLCleaner runs
+        int cleanerInterval =
+            conf.getInt("hbase.hfile.compactions.cleaner.interval", 2 * 60 * 1000);
+        this.compactedFileDischarger =
+            new CompactedHFilesDischarger(cleanerInterval, this.getRegionServerServices(), this);
+        choreService.scheduleChore(compactedFileDischarger);
+      }
+    }
     this.mvcc.advanceTo(maxSeqId);
     if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) {
       // Recover any edits if available.
@@ -1513,6 +1530,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       if (this.metricsRegionWrapper != null) {
         Closeables.closeQuietly(this.metricsRegionWrapper);
       }
+      // Stop the CompactedHFilesDischarger chore
+      if (this.compactedFileDischarger != null) this.compactedFileDischarger.cancel(true);
+
       status.markComplete("Closed");
       LOG.info("Closed " + this);
       return result;
@@ -6658,6 +6678,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       dstRegion.getRegionFileSystem().logFileSystemState(LOG);
     }
 
+    // clear the compacted files if any
+    for (Store s : dstRegion.getStores()) {
+      s.closeAndArchiveCompactedFiles();
+    }
     if (dstRegion.getRegionFileSystem().hasReferences(dstRegion.getTableDesc())) {
       throw new IOException("Merged region " + dstRegion
           + " still has references after the compaction, is compaction canceled?");
@@ -7527,7 +7551,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   public static final long FIXED_OVERHEAD = ClassSize.align(
       ClassSize.OBJECT +
       ClassSize.ARRAY +
-      43 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
+      44 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
       (14 * Bytes.SIZEOF_LONG) +
       5 * Bytes.SIZEOF_BOOLEAN);
 

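HRegion.initialize() now schedules the discharger through the region
server's ChoreService, using the "hbase.hfile.compactions.cleaner.interval"
key (default 2 minutes, deliberately below the TTL cleaner's 5). A hedged
sketch of the chore's shape; ScheduledChore and Stoppable are the real
HBase classes, while the body here is illustrative and assumes the class
sits in the regionserver package:

    import java.io.IOException;
    import org.apache.hadoop.hbase.ScheduledChore;
    import org.apache.hadoop.hbase.Stoppable;

    class CompactedFilesChore extends ScheduledChore {
      private final HRegion region;

      CompactedFilesChore(int periodMillis, Stoppable stopper, HRegion region) {
        super("CompactedHFilesDischarger", stopper, periodMillis);
        this.region = region;
      }

      @Override
      protected void chore() {
        for (Store store : region.getStores()) {
          try {
            // new Store API introduced by this patch
            store.closeAndArchiveCompactedFiles();
          } catch (IOException e) {
            // log and retry on the next period
          }
        }
      }
    }
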
http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 184d44f..50b3de7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -39,7 +39,9 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -76,6 +78,7 @@ import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@@ -90,7 +93,7 @@ import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
-import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
 
@@ -138,6 +141,8 @@ public class HStore implements Store {
   static int closeCheckInterval = 0;
   private volatile long storeSize = 0L;
   private volatile long totalUncompressedBytes = 0L;
+  private ThreadPoolExecutor compactionCleanerthreadPoolExecutor = null;
+  private CompletionService<StoreFile> completionService = null;
 
   /**
    * RWLock for store operations.
@@ -181,7 +186,6 @@ public class HStore implements Store {
 
   private long blockingFileCount;
   private int compactionCheckMultiplier;
-
   protected Encryption.Context cryptoContext = Encryption.Context.NONE;
 
   private volatile long flushedCellsCount = 0;
@@ -272,7 +276,10 @@ public class HStore implements Store {
           "hbase.hstore.flush.retries.number must be > 0, not "
               + flushRetriesNumber);
     }
-
+    compactionCleanerthreadPoolExecutor = getThreadPoolExecutor(
+      conf.getInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 10));
+    completionService =
+        new ExecutorCompletionService<StoreFile>(compactionCleanerthreadPoolExecutor);
     // Crypto context for new store files
     String cipherName = family.getEncryptionType();
     if (cipherName != null) {
@@ -551,14 +558,15 @@ public class HStore implements Store {
         try {
           Future<StoreFile> future = completionService.take();
           StoreFile storeFile = future.get();
-          long length = storeFile.getReader().length();
-          this.storeSize += length;
-          this.totalUncompressedBytes +=
-              storeFile.getReader().getTotalUncompressedBytes();
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("loaded " + storeFile.toStringDetailed());
+          if (storeFile != null) {
+            long length = storeFile.getReader().length();
+            this.storeSize += length;
+            this.totalUncompressedBytes += storeFile.getReader().getTotalUncompressedBytes();
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("loaded " + storeFile.toStringDetailed());
+            }
+            results.add(storeFile);
           }
-          results.add(storeFile);
         } catch (InterruptedException e) {
           if (ioe == null) ioe = new InterruptedIOException(e.getMessage());
         } catch (ExecutionException e) {
@@ -656,8 +664,7 @@ public class HStore implements Store {
       region.getMVCC().advanceTo(this.getMaxSequenceId());
     }
 
-    // notify scanners, close file readers, and recompute store size
-    completeCompaction(toBeRemovedStoreFiles, false);
+    completeCompaction(toBeRemovedStoreFiles);
   }
 
   private StoreFile createStoreFileAndReader(final Path p) throws IOException {
@@ -834,7 +841,6 @@ public class HStore implements Store {
       // the lock.
       this.lock.writeLock().unlock();
     }
-    notifyChangedReadersObservers();
     LOG.info("Loaded HFile " + sf.getFileInfo() + " into store '" + getColumnFamilyName());
     if (LOG.isTraceEnabled()) {
       String traceMessage = "BULK LOAD time,size,store size,store files ["
@@ -850,7 +856,10 @@ public class HStore implements Store {
     try {
       // Clear so metrics doesn't find them.
       ImmutableCollection<StoreFile> result = storeEngine.getStoreFileManager().clearFiles();
-
+      Collection<StoreFile> compactedfiles =
+          storeEngine.getStoreFileManager().clearCompactedFiles();
+      // clear the compacted files
+      removeCompactedFiles(compactedfiles);
       if (!result.isEmpty()) {
         // initialize the thread pool for closing store files in parallel.
         ThreadPoolExecutor storeFileCloserThreadPool = this.region
@@ -865,7 +874,7 @@ public class HStore implements Store {
             @Override
             public Void call() throws IOException {
               boolean evictOnClose = 
-                  cacheConf != null? cacheConf.shouldEvictOnClose(): true; 
+                  cacheConf != null? cacheConf.shouldEvictOnClose(): true;
               f.closeReader(evictOnClose);
               return null;
             }
@@ -892,6 +901,9 @@ public class HStore implements Store {
         }
         if (ioe != null) throw ioe;
       }
+      if (compactionCleanerthreadPoolExecutor != null) {
+        compactionCleanerthreadPoolExecutor.shutdownNow();
+      }
       LOG.info("Closed " + this);
       return result;
     } finally {
@@ -1087,10 +1099,8 @@ public class HStore implements Store {
       // the lock.
       this.lock.writeLock().unlock();
     }
-
-    // Tell listeners of the change in readers.
+    // notifyChangedReadersObservers is called here only in the case of flushes
     notifyChangedReadersObservers();
-
     if (LOG.isTraceEnabled()) {
       long totalSize = 0;
       for (StoreFile sf : sfs) {
@@ -1109,7 +1119,7 @@ public class HStore implements Store {
    * @throws IOException
    */
   private void notifyChangedReadersObservers() throws IOException {
-    for (ChangedReadersObserver o: this.changedReaderObservers) {
+    for (ChangedReadersObserver o : this.changedReaderObservers) {
       o.updateReaders();
     }
   }
@@ -1268,7 +1278,7 @@ public class HStore implements Store {
         compactedCellsSize += getCompactionProgress().totalCompactedSize;
       }
       // At this point the store will use new files for all new scanners.
-      completeCompaction(filesToCompact, true); // Archive old files & update store size.
+      completeCompaction(filesToCompact); // update store size.
 
       logCompactionEndMessage(cr, sfs, compactionStartTime);
       return sfs;
@@ -1456,7 +1466,7 @@ public class HStore implements Store {
       LOG.info("Replaying compaction marker, replacing input files: " +
           inputStoreFiles + " with output files : " + outputStoreFiles);
       this.replaceStoreFiles(inputStoreFiles, outputStoreFiles);
-      this.completeCompaction(inputStoreFiles, removeFiles);
+      this.completeCompaction(inputStoreFiles);
     }
   }
 
@@ -1508,7 +1518,7 @@ public class HStore implements Store {
           this.getCoprocessorHost().postCompact(this, sf, null);
         }
         replaceStoreFiles(filesToCompact, Lists.newArrayList(sf));
-        completeCompaction(filesToCompact, true);
+        completeCompaction(filesToCompact);
       }
     } finally {
       synchronized (filesCompacting) {
@@ -1770,54 +1780,7 @@ public class HStore implements Store {
   @VisibleForTesting
   protected void completeCompaction(final Collection<StoreFile> compactedFiles)
     throws IOException {
-    completeCompaction(compactedFiles, true);
-  }
-
-
-  /**
-   * <p>It works by processing a compaction that's been written to disk.
-   *
-   * <p>It is usually invoked at the end of a compaction, but might also be
-   * invoked at HStore startup, if the prior execution died midway through.
-   *
-   * <p>Moving the compacted TreeMap into place means:
-   * <pre>
-   * 1) Unload all replaced StoreFile, close and collect list to delete.
-   * 2) Compute new store size
-   * </pre>
-   *
-   * @param compactedFiles list of files that were compacted
-   */
-  @VisibleForTesting
-  protected void completeCompaction(final Collection<StoreFile> compactedFiles, boolean removeFiles)
-      throws IOException {
-    try {
-      // Do not delete old store files until we have sent out notification of
-      // change in case old files are still being accessed by outstanding scanners.
-      // Don't do this under writeLock; see HBASE-4485 for a possible deadlock
-      // scenario that could have happened if continue to hold the lock.
-      notifyChangedReadersObservers();
-      // At this point the store will use new files for all scanners.
-
-      // let the archive util decide if we should archive or delete the files
-      LOG.debug("Removing store files after compaction...");
-      boolean evictOnClose = 
-          cacheConf != null? cacheConf.shouldEvictOnClose(): true; 
-      for (StoreFile compactedFile : compactedFiles) {
-        compactedFile.closeReader(evictOnClose);
-      }
-      if (removeFiles) {
-        this.fs.removeStoreFiles(this.getColumnFamilyName(), compactedFiles);
-      }
-    } catch (IOException e) {
-      e = e instanceof RemoteException ?
-                ((RemoteException)e).unwrapRemoteException() : e;
-      LOG.error("Failed removing compacted files in " + this +
-        ". Files we were trying to remove are " + compactedFiles.toString() +
-        "; some of them may have been already removed", e);
-    }
-
-    // 4. Compute new store size
+    LOG.debug("Completing compaction...");
     this.storeSize = 0L;
     this.totalUncompressedBytes = 0L;
     for (StoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) {
@@ -2248,7 +2211,7 @@ public class HStore implements Store {
   }
 
   public static final long FIXED_OVERHEAD =
-      ClassSize.align(ClassSize.OBJECT + (16 * ClassSize.REFERENCE) + (10 * Bytes.SIZEOF_LONG)
+      ClassSize.align(ClassSize.OBJECT + (18 * ClassSize.REFERENCE) + (10 * Bytes.SIZEOF_LONG)
               + (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));
 
   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
@@ -2365,4 +2328,112 @@ public class HStore implements Store {
   public boolean isPrimaryReplicaStore() {
 	   return getRegionInfo().getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID;
   }
+
+  @Override
+  public void closeAndArchiveCompactedFiles() throws IOException {
+    lock.readLock().lock();
+    Collection<StoreFile> copyCompactedfiles = null;
+    try {
+      Collection<StoreFile> compactedfiles =
+          this.getStoreEngine().getStoreFileManager().getCompactedfiles();
+      if (compactedfiles != null && compactedfiles.size() != 0) {
+        // Do a copy under read lock
+        copyCompactedfiles = new ArrayList<StoreFile>(compactedfiles);
+      } else {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("No compacted files to archive");
+        }
+        return;
+      }
+    } finally {
+      lock.readLock().unlock();
+    }
+    removeCompactedFiles(copyCompactedfiles);
+  }
+
+  private ThreadPoolExecutor getThreadPoolExecutor(int maxThreads) {
+    return Threads.getBoundedCachedThreadPool(maxThreads, maxThreads * 3, TimeUnit.SECONDS,
+      new ThreadFactory() {
+        private int count = 1;
+
+        @Override
+        public Thread newThread(Runnable r) {
+          return new Thread(r, "CompactedfilesArchiver-" + count++);
+        }
+      });
+  }
+
+  private void removeCompactedFiles(Collection<StoreFile> compactedfiles) throws IOException {
+    if (compactedfiles != null && !compactedfiles.isEmpty()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Removing the compacted store files " + compactedfiles);
+      }
+      for (final StoreFile file : compactedfiles) {
+        completionService.submit(new Callable<StoreFile>() {
+          @Override
+          public StoreFile call() throws IOException {
+            synchronized (file) {
+              try {
+                StoreFile.Reader r = file.getReader();
+                if (r == null) {
+                  if (LOG.isDebugEnabled()) {
+                    LOG.debug("The file " + file + " was closed but still not archived.");
+                  }
+                  return file;
+                }
+                if (r.isCompactedAway() && !r.isReferencedInReads()) {
+                  // Even if deleting fails we need not bother as any new scanners won't be
+                  // able to use the compacted file as the status is already compactedAway
+                  if (LOG.isTraceEnabled()) {
+                    LOG.trace("Closing and archiving the file " + file.getPath());
+                  }
+                  r.close(true);
+                  // Just close and return
+                  return file;
+                }
+              } catch (Exception e) {
+                LOG.error("Exception while trying to close the compacted store file "
+                    + file.getPath().getName());
+              }
+            }
+            return null;
+          }
+        });
+      }
+      final List<StoreFile> filesToRemove = new ArrayList<StoreFile>(compactedfiles.size());
+      try {
+        for (final StoreFile file : compactedfiles) {
+          Future<StoreFile> future = completionService.take();
+          StoreFile closedFile = future.get();
+          if (closedFile != null) {
+            filesToRemove.add(closedFile);
+          }
+        }
+      } catch (InterruptedException ie) {
+        LOG.error("Interrupted exception while closing the compacted files", ie);
+      } catch (Exception e) {
+        LOG.error("Exception occured while closing the compacted files", e);
+      }
+      if (isPrimaryReplicaStore()) {
+        archiveAndRemoveCompactedFiles(filesToRemove);
+      }
+
+    }
+  }
+
+  private void archiveAndRemoveCompactedFiles(List<StoreFile> filesToArchive) throws IOException {
+    if (!filesToArchive.isEmpty()) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Moving the files " + filesToArchive + " to archive");
+      }
+      // Only if archiving succeeds should the files be removed from the manager
+      this.fs.removeStoreFiles(this.getFamily().getNameAsString(), filesToArchive);
+      lock.writeLock().lock();
+      try {
+        this.getStoreEngine().getStoreFileManager().removeCompactedFiles(filesToArchive);
+      } finally {
+        lock.writeLock().unlock();
+      }
+    }
+  }
 }

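HStore.removeCompactedFiles() fans the reader-close work out to a bounded
pool through an ExecutorCompletionService and archives only those files
whose readers could be closed safely, i.e. files that are compactedAway
and have a zero ref count. A condensed sketch of the submit/drain pattern
with illustrative names:

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutionException;

    class Discharge {
      // Submit one close task per file, then drain exactly that many results.
      static <F> List<F> closeAll(Collection<F> files, CompletionService<F> cs)
          throws InterruptedException {
        for (F f : files) {
          // The real task closes the reader and returns f only when the file
          // is compactedAway and its ref count is zero, else null.
          cs.submit(() -> f);
        }
        List<F> closed = new ArrayList<>();
        for (int i = 0; i < files.size(); i++) {
          try {
            F done = cs.take().get();   // blocks until some task finishes
            if (done != null) closed.add(done);
          } catch (ExecutionException e) {
            // one failed close must not abort the remaining files
          }
        }
        return closed;
      }
    }
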
http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
index d198d7b..0e1d90f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
@@ -123,24 +123,13 @@ class ReversedStoreScanner extends StoreScanner implements KeyValueScanner {
 
   @Override
   public boolean seekToPreviousRow(Cell key) throws IOException {
-    lock.lock();
-    try {
-      checkReseek();
-      return this.heap.seekToPreviousRow(key);
-    } finally {
-      lock.unlock();
-    }
-
+    checkReseek();
+    return this.heap.seekToPreviousRow(key);
   }
   
   @Override
   public boolean backwardSeek(Cell key) throws IOException {
-    lock.lock();
-    try {
-      checkReseek();
-      return this.heap.backwardSeek(key);
-    } finally {
-      lock.unlock();
-    }
+    checkReseek();
+    return this.heap.backwardSeek(key);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 83a24a5..f137a8e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -466,4 +466,9 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
   void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException;
 
   boolean isPrimaryReplicaStore();
+
+  /**
+   * Closes and archives the compacted files under this store
+   */
+  void closeAndArchiveCompactedFiles() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index 6e5f441..2b9d101 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -29,6 +29,8 @@ import java.util.Map;
 import java.util.SortedSet;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -61,6 +63,7 @@ import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.hbase.io.hfile.HFileBlock;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
@@ -378,6 +381,19 @@ public class StoreFile {
     return bulkLoadedHFile || metadataMap.containsKey(BULKLOAD_TIME_KEY);
   }
 
+  @VisibleForTesting
+  public boolean isCompactedAway() {
+    if (this.reader != null) {
+      return this.reader.isCompactedAway();
+    }
+    return true;
+  }
+
+  @VisibleForTesting
+  public int getRefCount() {
+    return this.reader.refCount.get();
+  }
+
   /**
    * Return the timestamp at which this bulk load file was generated.
    */
@@ -553,6 +569,15 @@ public class StoreFile {
   }
 
   /**
+   * Marks the status of the file as compactedAway.
+   */
+  public void markCompactedAway() {
+    if (this.reader != null) {
+      this.reader.markCompactedAway();
+    }
+  }
+
+  /**
    * Delete this file
    * @throws IOException
    */
@@ -1137,6 +1162,12 @@ public class StoreFile {
     private boolean bulkLoadResult = false;
     private KeyValue.KeyOnlyKeyValue lastBloomKeyOnlyKV = null;
     private boolean skipResetSeqId = true;
+    // Counter that is incremented every time a scanner is created on the
+    // store file.  It is decremented when the scan on the store file is
+    // done.
+    private AtomicInteger refCount = new AtomicInteger(0);
+    // Indicates if the file got compacted
+    private volatile boolean compactedAway = false;
 
     public Reader(FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf)
         throws IOException {
@@ -1144,6 +1175,10 @@ public class StoreFile {
       bloomFilterType = BloomType.NONE;
     }
 
+    void markCompactedAway() {
+      this.compactedAway = true;
+    }
+
     public Reader(FileSystem fs, Path path, FSDataInputStreamWrapper in, long size,
         CacheConfig cacheConf, Configuration conf) throws IOException {
       reader = HFile.createReader(fs, path, in, size, cacheConf, conf);
@@ -1195,12 +1230,36 @@ public class StoreFile {
     public StoreFileScanner getStoreFileScanner(boolean cacheBlocks,
                                                boolean pread,
                                                boolean isCompaction, long readPt) {
+      // Increment the ref count
+      refCount.incrementAndGet();
       return new StoreFileScanner(this,
                                  getScanner(cacheBlocks, pread, isCompaction),
                                  !isCompaction, reader.hasMVCCInfo(), readPt);
     }
 
     /**
+     * Decrement the ref count associated with the reader whenever a scanner
+     * associated with the reader is closed
+     */
+    void decrementRefCount() {
+      refCount.decrementAndGet();
+    }
+
+    /**
+     * @return true if the file is still used in reads
+     */
+    public boolean isReferencedInReads() {
+      return refCount.get() != 0;
+    }
+ 
+    /**
+     * @return true if the file is compacted
+     */
+    public boolean isCompactedAway() {
+      return this.compactedAway;
+    }
+
+    /**
      * Warning: Do not write further code which depends on this call. Instead
      * use getStoreFileScanner() which uses the StoreFileScanner class/interface
      * which is the preferred way to scan a store with higher level concepts.
@@ -1710,7 +1769,13 @@ public class StoreFile {
     private static class GetFileSize implements Function<StoreFile, Long> {
       @Override
       public Long apply(StoreFile sf) {
-        return sf.getReader().length();
+        if (sf.getReader() != null) {
+          return sf.getReader().length();
+        } else {
+          // the reader may be null for compacted files whose archiving
+          // has failed.
+          return -1L;
+        }
       }
     }
 

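The Reader now carries an AtomicInteger ref count: getStoreFileScanner()
increments it, and StoreFileScanner.close() (the next file in this diff)
decrements it, so the discharger can tell when a compacted file has no
readers left. A self-contained sketch of the protocol with stand-in types:

    import java.util.concurrent.atomic.AtomicInteger;

    class CountedReader {
      private final AtomicInteger refCount = new AtomicInteger(0);

      final class Scanner implements AutoCloseable {
        private boolean closed;
        @Override public void close() {
          if (!closed) {                // guard against double-decrement
            closed = true;
            refCount.decrementAndGet();
          }
        }
      }

      Scanner open() {
        refCount.incrementAndGet();     // one count per live scanner
        return new Scanner();
      }

      boolean isReferencedInReads() {
        return refCount.get() != 0;     // archiving waits for this to be false
      }
    }
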
http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
index 11993db..7e70547 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
@@ -53,7 +53,7 @@ public interface StoreFileManager {
   void insertNewFiles(Collection<StoreFile> sfs) throws IOException;
 
   /**
-   * Adds compaction results into the structure.
+   * Adds only the new compaction results into the structure.
    * @param compactedFiles The input files for the compaction.
    * @param results The resulting files for the compaction.
    */
@@ -61,12 +61,26 @@ public interface StoreFileManager {
       Collection<StoreFile> compactedFiles, Collection<StoreFile> results) throws IOException;
 
   /**
+   * Remove the compacted files
+   * @param compactedFiles the list of compacted files
+   * @throws IOException
+   */
+  void removeCompactedFiles(Collection<StoreFile> compactedFiles) throws IOException;
+
+  /**
    * Clears all the files currently in use and returns them.
    * @return The files previously in use.
    */
   ImmutableCollection<StoreFile> clearFiles();
 
   /**
+   * Clears all the compacted files and returns them. This method is expected to be
+   * called from a single thread.
+   * @return The files compacted previously.
+   */
+  Collection<StoreFile> clearCompactedFiles();
+
+  /**
    * Gets the snapshot of the store files currently in use. Can be used for things like metrics
    * and checks; should not assume anything about relations between store files in the list.
    * @return The list of StoreFiles.
@@ -74,6 +88,15 @@ public interface StoreFileManager {
   Collection<StoreFile> getStorefiles();
 
   /**
+   * List of compacted files inside this store that need to be excluded from reads
+   * because new reads will use only the files created out of the latest compaction.
+   * These compacted files will be deleted/cleared once all the existing readers on
+   * these compacted files are done.
+   * @return the list of compacted files
+   */
+  Collection<StoreFile> getCompactedfiles();
+
+  /**
    * Returns the number of files currently in use.
    * @return The number of files.
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index fb154c0..c864733 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -248,6 +248,9 @@ public class StoreFileScanner implements KeyValueScanner {
   public void close() {
     cur = null;
     this.hfs.close();
+    if (this.reader != null) {
+      this.reader.decrementRefCount();
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index a4e066c..44f07f7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -27,7 +27,6 @@ import java.util.List;
 import java.util.NavigableSet;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -125,7 +124,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
 
   // A flag whether use pread for scan
   private boolean scanUsePread = false;
-  protected ReentrantLock lock = new ReentrantLock();
+  // Indicates whether there was flush during the course of the scan
+  protected volatile boolean flushed = false;
   
   protected final long readPt;
 
@@ -403,15 +403,11 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
 
   @Override
   public Cell peek() {
-    lock.lock();
-    try {
+    checkResetHeap();
     if (this.heap == null) {
       return this.lastTop;
     }
     return this.heap.peek();
-    } finally {
-      lock.unlock();
-    }
   }
 
   @Override
@@ -425,46 +421,37 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     close(true);
   }
 
-  private void close(boolean withHeapClose){
-    lock.lock();
-    try {
-      if (this.closing) {
-        return;
+  private void close(boolean withHeapClose) {
+    if (this.closing) {
+      return;
+    }
+    if (withHeapClose) this.closing = true;
+    // Under test, we don't have a this.store
+    if (this.store != null) this.store.deleteChangedReaderObserver(this);
+    if (withHeapClose) {
+      for (KeyValueHeap h : this.heapsForDelayedClose) {
+        h.close();
       }
-      if (withHeapClose) this.closing = true;
-      // Under test, we dont have a this.store
-      if (this.store != null) this.store.deleteChangedReaderObserver(this);
-      if (withHeapClose) {
-        for (KeyValueHeap h : this.heapsForDelayedClose) {
-          h.close();
-        }
-        this.heapsForDelayedClose.clear();
-        if (this.heap != null) {
-          this.heap.close();
-          this.heap = null; // CLOSED!
-        }
-      } else {
-        if (this.heap != null) {
-          this.heapsForDelayedClose.add(this.heap);
-          this.heap = null;
-        }
+      this.heapsForDelayedClose.clear();
+      if (this.heap != null) {
+        this.heap.close();
+        this.heap = null; // CLOSED!
+      }
+    } else {
+      if (this.heap != null) {
+        this.heapsForDelayedClose.add(this.heap);
+        this.heap = null;
       }
-      this.lastTop = null; // If both are null, we are closed.
-    } finally {
-      lock.unlock();
     }
+    this.lastTop = null; // If both are null, we are closed.
   }
 
   @Override
   public boolean seek(Cell key) throws IOException {
-    lock.lock();
-    try {
+    checkResetHeap();
     // reset matcher state, in case that underlying store changed
     checkReseek();
     return this.heap.seek(key);
-    } finally {
-      lock.unlock();
-    }
   }
 
   @Override
@@ -480,173 +467,168 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
    */
   @Override
   public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException {
-    lock.lock();
-    try {
-      if (scannerContext == null) {
-        throw new IllegalArgumentException("Scanner context cannot be null");
-      }
-      if (checkReseek()) {
-        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
-      }
+    if (scannerContext == null) {
+      throw new IllegalArgumentException("Scanner context cannot be null");
+    }
+    checkResetHeap();
+    if (checkReseek()) {
+      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
+    }
 
-      // if the heap was left null, then the scanners had previously run out anyways, close and
-      // return.
-      if (this.heap == null) {
-        // By this time partial close should happened because already heap is null
-        close(false);// Do all cleanup except heap.close()
-        return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
-      }
+    // if the heap was left null, then the scanners had previously run out anyways, close and
+    // return.
+    if (this.heap == null) {
+      // By this time the partial close should have happened because the heap is already null
+      close(false);// Do all cleanup except heap.close()
+      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
+    }
 
-      Cell cell = this.heap.peek();
-      if (cell == null) {
-        close(false);// Do all cleanup except heap.close()
-        return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
-      }
+    Cell cell = this.heap.peek();
+    if (cell == null) {
+      close(false);// Do all cleanup except heap.close()
+      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
+    }
 
-      // only call setRow if the row changes; avoids confusing the query matcher
-      // if scanning intra-row
+    // only call setRow if the row changes; avoids confusing the query matcher
+    // if scanning intra-row
 
-      // If no limits exists in the scope LimitScope.Between_Cells then we are sure we are changing
-      // rows. Else it is possible we are still traversing the same row so we must perform the row
-      // comparison.
-      if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.curCell == null ||
-          !CellUtil.matchingRow(cell, matcher.curCell)) {
-        this.countPerRow = 0;
-        matcher.setToNewRow(cell);
-      }
+    // If no limits exist in the scope LimitScope.Between_Cells then we are sure we are changing
+    // rows. Else it is possible we are still traversing the same row so we must perform the row
+    // comparison.
+    if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.curCell == null
+        || !CellUtil.matchingRow(cell, matcher.curCell)) {
+      this.countPerRow = 0;
+      matcher.setToNewRow(cell);
+    }
 
-      // Clear progress away unless invoker has indicated it should be kept.
-      if (!scannerContext.getKeepProgress()) scannerContext.clearProgress();
+    // Clear progress away unless invoker has indicated it should be kept.
+    if (!scannerContext.getKeepProgress()) scannerContext.clearProgress();
 
-      // Only do a sanity-check if store and comparator are available.
-      CellComparator comparator = store != null ? store.getComparator() : null;
+    // Only do a sanity-check if store and comparator are available.
+    CellComparator comparator = store != null ? store.getComparator() : null;
 
-      int count = 0;
-      long totalBytesRead = 0;
+    int count = 0;
+    long totalBytesRead = 0;
 
-      LOOP: do {
-        // Update and check the time limit based on the configured value of cellsPerTimeoutCheck
-        if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
-          scannerContext.updateTimeProgress();
-          if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
-            return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
-          }
+    LOOP: do {
+      // Update and check the time limit based on the configured value of cellsPerTimeoutCheck
+      if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
+        scannerContext.updateTimeProgress();
+        if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
+          return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
         }
+      }
 
-        if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
-        checkScanOrder(prevCell, cell, comparator);
-        prevCell = cell;
+      if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
+      checkScanOrder(prevCell, cell, comparator);
+      prevCell = cell;
 
-        ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
-        qcode = optimize(qcode, cell);
-        switch(qcode) {
-        case INCLUDE:
-        case INCLUDE_AND_SEEK_NEXT_ROW:
-        case INCLUDE_AND_SEEK_NEXT_COL:
+      ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
+      qcode = optimize(qcode, cell);
+      switch (qcode) {
+      case INCLUDE:
+      case INCLUDE_AND_SEEK_NEXT_ROW:
+      case INCLUDE_AND_SEEK_NEXT_COL:
 
-          Filter f = matcher.getFilter();
-          if (f != null) {
-            cell = f.transformCell(cell);
-          }
+        Filter f = matcher.getFilter();
+        if (f != null) {
+          cell = f.transformCell(cell);
+        }
 
-          this.countPerRow++;
-          if (storeLimit > -1 &&
-              this.countPerRow > (storeLimit + storeOffset)) {
-            // do what SEEK_NEXT_ROW does.
-            if (!matcher.moreRowsMayExistAfter(cell)) {
-              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
-            }
-            seekToNextRow(cell);
-            break LOOP;
+        this.countPerRow++;
+        if (storeLimit > -1 && this.countPerRow > (storeLimit + storeOffset)) {
+          // do what SEEK_NEXT_ROW does.
+          if (!matcher.moreRowsMayExistAfter(cell)) {
+            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
           }
+          seekToNextRow(cell);
+          break LOOP;
+        }
 
-          // add to results only if we have skipped #storeOffset kvs
-          // also update metric accordingly
-          if (this.countPerRow > storeOffset) {
-            outResult.add(cell);
+        // add to results only if we have skipped #storeOffset kvs
+        // also update metric accordingly
+        if (this.countPerRow > storeOffset) {
+          outResult.add(cell);
 
-            // Update local tracking information
-            count++;
-            totalBytesRead += CellUtil.estimatedSerializedSizeOf(cell);
+          // Update local tracking information
+          count++;
+          totalBytesRead += CellUtil.estimatedSerializedSizeOf(cell);
 
-            // Update the progress of the scanner context
-            scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOf(cell));
-            scannerContext.incrementBatchProgress(1);
+          // Update the progress of the scanner context
+          scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOf(cell));
+          scannerContext.incrementBatchProgress(1);
 
-            if (totalBytesRead > maxRowSize) {
-              throw new RowTooBigException("Max row size allowed: " + maxRowSize
-                  + ", but the row is bigger than that.");
-            }
+          if (totalBytesRead > maxRowSize) {
+            throw new RowTooBigException(
+                "Max row size allowed: " + maxRowSize + ", but the row is bigger than that.");
           }
+        }
 
-          if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
-            if (!matcher.moreRowsMayExistAfter(cell)) {
-              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
-            }
-            seekToNextRow(cell);
-          } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
-            seekAsDirection(matcher.getKeyForNextColumn(cell));
-          } else {
-            this.heap.next();
+        if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
+          if (!matcher.moreRowsMayExistAfter(cell)) {
+            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
           }
+          seekToNextRow(cell);
+        } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
+          seekAsDirection(matcher.getKeyForNextColumn(cell));
+        } else {
+          this.heap.next();
+        }
 
-          if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
-            break LOOP;
-          }
-          if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
-            break LOOP;
-          }
-          continue;
+        if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
+          break LOOP;
+        }
+        if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
+          break LOOP;
+        }
+        continue;
 
-        case DONE:
-          return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
+      case DONE:
+        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
 
-        case DONE_SCAN:
-          close(false);// Do all cleanup except heap.close()
-          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
+      case DONE_SCAN:
+        close(false);// Do all cleanup except heap.close()
+        return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
 
-        case SEEK_NEXT_ROW:
-          // This is just a relatively simple end of scan fix, to short-cut end
-          // us if there is an endKey in the scan.
-          if (!matcher.moreRowsMayExistAfter(cell)) {
-            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
-          }
+      case SEEK_NEXT_ROW:
+        // This is just a relatively simple end-of-scan fix, to short-circuit
+        // the scan if there is an endKey in the scan.
+        if (!matcher.moreRowsMayExistAfter(cell)) {
+          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
+        }
 
-          seekToNextRow(cell);
-          break;
+        seekToNextRow(cell);
+        break;
 
-        case SEEK_NEXT_COL:
-          seekAsDirection(matcher.getKeyForNextColumn(cell));
-          break;
+      case SEEK_NEXT_COL:
+        seekAsDirection(matcher.getKeyForNextColumn(cell));
+        break;
 
-        case SKIP:
-          this.heap.next();
-          break;
-
-        case SEEK_NEXT_USING_HINT:
-          Cell nextKV = matcher.getNextKeyHint(cell);
-          if (nextKV != null) {
-            seekAsDirection(nextKV);
-          } else {
-            heap.next();
-          }
-          break;
+      case SKIP:
+        this.heap.next();
+        break;
 
-        default:
-          throw new RuntimeException("UNEXPECTED");
+      case SEEK_NEXT_USING_HINT:
+        Cell nextKV = matcher.getNextKeyHint(cell);
+        if (nextKV != null) {
+          seekAsDirection(nextKV);
+        } else {
+          heap.next();
         }
-      } while((cell = this.heap.peek()) != null);
+        break;
 
-      if (count > 0) {
-        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
+      default:
+        throw new RuntimeException("UNEXPECTED");
       }
+    } while ((cell = this.heap.peek()) != null);
 
-      // No more keys
-      close(false);// Do all cleanup except heap.close()
-      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
-    } finally {
-      lock.unlock();
+    if (count > 0) {
+      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
     }
+
+    // No more keys
+    close(false);// Do all cleanup except heap.close()
+    return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
   }
 
   /*
@@ -684,30 +666,26 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   // Implementation of ChangedReadersObserver
   @Override
   public void updateReaders() throws IOException {
-    lock.lock();
-    try {
-    if (this.closing) return;
+    flushed = true;
+    // Let the next() call handle re-creating and seeking
+  }
 
+  protected void nullifyCurrentHeap() {
+    if (this.closing) return;
     // All public synchronized API calls will call 'checkReseek' which will cause
     // the scanner stack to reseek if this.heap==null && this.lastTop != null.
     // But if two calls to updateReaders() happen without a 'next' or 'peek' then we
     // will end up calling this.peek() which would cause a reseek in the middle of a updateReaders
     // which is NOT what we want, not to mention could cause an NPE. So we early out here.
     if (this.heap == null) return;
-
     // this could be null.
-    this.lastTop = this.peek();
+    this.lastTop = this.heap.peek();
 
     //DebugPrint.println("SS updateReaders, topKey = " + lastTop);
 
     // close scanners to old obsolete Store files
     this.heapsForDelayedClose.add(this.heap);// Don't close now. Delay it till StoreScanner#close
     this.heap = null; // the re-seeks could be slow (access HDFS) free up memory ASAP
-
-    // Let the next() call handle re-creating and seeking
-    } finally {
-      lock.unlock();
-    }
   }
 
   /**
@@ -793,18 +771,33 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
 
   @Override
   public boolean reseek(Cell kv) throws IOException {
-    lock.lock();
-    try {
-    //Heap will not be null, if this is called from next() which.
-    //If called from RegionScanner.reseek(...) make sure the scanner
-    //stack is reset if needed.
+    checkResetHeap();
+    // Heap will not be null if this is called from next().
+    // If called from RegionScanner.reseek(...) make sure the scanner
+    // stack is reset if needed.
     checkReseek();
     if (explicitColumnQuery && lazySeekEnabledGlobally) {
       return heap.requestSeek(kv, true, useRowColBloom);
     }
     return heap.reseek(kv);
-    } finally {
-      lock.unlock();
+  }
+
+  protected void checkResetHeap() {
+    // Check the flag without any lock. Even if we see a stale
+    // value here it is still ok to continue, because we will not be resetting
+    // the heap but will continue with the referenced memstore's snapshot. For compactions
+    // we don't need updateReaders to happen at all, as we still continue with
+    // the older files
+    if (flushed) {
+      // If 'flushed' is found to be true then the current scanner needs to
+      // update the heap that it holds and then proceed with the scan,
+      // resetting the flag once done.
+      // One thing is certain: the same store scanner cannot be in reseek and
+      // next at the same time, i.e. within the same store scanner it is always
+      // single threaded
+      nullifyCurrentHeap();
+      // reset the flag
+      flushed = false;
     }
   }
 
@@ -883,17 +876,12 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
 
   @Override
   public void shipped() throws IOException {
-    lock.lock();
-    try {
-      for (KeyValueHeap h : this.heapsForDelayedClose) {
-        h.close();// There wont be further fetch of Cells from these scanners. Just close.
-      }
-      this.heapsForDelayedClose.clear();
-      if (this.heap != null) {
-        this.heap.shipped();
-      }
-    } finally {
-      lock.unlock();
+    for (KeyValueHeap h : this.heapsForDelayedClose) {
+      h.close();// There won't be further fetches of Cells from these scanners. Just close.
+    }
+    this.heapsForDelayedClose.clear();
+    if (this.heap != null) {
+      this.heap.shipped();
     }
   }
 }
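
The shipped() hunk shows the delayed-close idea that replaces the old locking: heaps made obsolete by a flush are parked in heapsForDelayedClose and only closed at a safe point, once the client has consumed the cells that may still reference them. A hedged sketch of that lifecycle (DelayedCloser and retire() are illustrative names, not HBase API):

    import java.util.ArrayList;
    import java.util.List;

    // Obsolete resources are queued instead of closed inline, then drained
    // at a safe point (shipped()/close()) on the scanning thread.
    class DelayedCloser {
      private final List<AutoCloseable> heapsForDelayedClose = new ArrayList<>();

      void retire(AutoCloseable heap) {   // a flush made this heap obsolete
        heapsForDelayedClose.add(heap);   // cells handed out may still point into it
      }

      void shipped() throws Exception {   // client is done with the returned cells
        for (AutoCloseable h : heapsForDelayedClose) {
          h.close();                      // safe now: no further fetches from these
        }
        heapsForDelayedClose.clear();
      }
    }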

http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
index bb49aba..ef2c282 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
@@ -106,6 +106,7 @@ public class StripeStoreFileManager
 
     /** Cached list of all files in the structure, to return from some calls */
     public ImmutableList<StoreFile> allFilesCached = ImmutableList.<StoreFile>of();
+    private ImmutableList<StoreFile> allCompactedFilesCached = ImmutableList.<StoreFile>of();
   }
   private State state = null;
 
@@ -141,8 +142,14 @@ public class StripeStoreFileManager
   }
 
   @Override
+  public Collection<StoreFile> getCompactedfiles() {
+    return state.allCompactedFilesCached;
+  }
+
+  @Override
   public void insertNewFiles(Collection<StoreFile> sfs) throws IOException {
     CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(true);
+    // Note: compactedFiles is null here; mergeResults() is expected to tolerate that on the flush path.
     cmc.mergeResults(null, sfs);
     debugDumpState("Added new files");
   }
@@ -157,6 +164,13 @@ public class StripeStoreFileManager
   }
 
   @Override
+  public ImmutableCollection<StoreFile> clearCompactedFiles() {
+    ImmutableCollection<StoreFile> result = state.allCompactedFilesCached;
+    this.state = new State();
+    return result;
+  }
+
+  @Override
   public int getStorefileCount() {
     return state.allFilesCached.size();
   }
@@ -306,9 +320,31 @@ public class StripeStoreFileManager
     // copies and apply the result at the end.
     CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(false);
     cmc.mergeResults(compactedFiles, results);
+    markCompactedAway(compactedFiles);
     debugDumpState("Merged compaction results");
   }
 
+  // Mark the files as compactedAway once the storefiles and compactedfiles lists are finalized.
+  // A background thread will then close the actual readers on these compacted files,
+  // and also evict their blocks from the block cache so that they are no longer
+  // cached.
+  private void markCompactedAway(Collection<StoreFile> compactedFiles) {
+    for (StoreFile file : compactedFiles) {
+      file.markCompactedAway();
+    }
+  }
+
+  @Override
+  public void removeCompactedFiles(Collection<StoreFile> compactedFiles) throws IOException {
+    // See class comment for the assumptions we make here.
+    LOG.debug("Attempting to delete compaction results: " + compactedFiles.size());
+    // In order to be able to fail in the middle of the operation, we'll operate on lazy
+    // copies and apply the result at the end.
+    CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(false);
+    cmc.deleteResults(compactedFiles);
+    debugDumpState("Deleted compaction results");
+  }
+
   @Override
   public int getStoreCompactionPriority() {
     // If there's only L0, do what the default store does.
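
The markCompactedAway()/removeCompactedFiles() pair above splits the old completeCompaction() into two phases: compaction immediately marks its inputs as compacted away, and a background chore archives them later once no scanner still reads them. A simplified model of that lifecycle; the reference count below stands in for StoreFile's actual reader accounting, which is not part of this hunk:

    import java.util.concurrent.atomic.AtomicInteger;

    // A file is eligible for archival only when it is both marked compacted
    // away and no longer referenced by any open scanner.
    class TrackedFile {
      private volatile boolean compactedAway = false;
      private final AtomicInteger activeReaders = new AtomicInteger();

      void markCompactedAway() { compactedAway = true; } // phase 1: compaction commit
      void openReader()  { activeReaders.incrementAndGet(); }
      void closeReader() { activeReaders.decrementAndGet(); }

      boolean safeToArchive() {                          // phase 2: background chore
        return compactedAway && activeReaders.get() == 0;
      }
    }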
@@ -684,7 +720,7 @@ public class StripeStoreFileManager
       this.isFlush = isFlush;
     }
 
-    public void mergeResults(Collection<StoreFile> compactedFiles, Collection<StoreFile> results)
+    private void mergeResults(Collection<StoreFile> compactedFiles, Collection<StoreFile> results)
         throws IOException {
       assert this.compactedFiles == null && this.results == null;
       this.compactedFiles = compactedFiles;
@@ -696,12 +732,20 @@ public class StripeStoreFileManager
         processNewCandidateStripes(newStripes);
       }
       // Create new state and update parent.
-      State state = createNewState();
+      State state = createNewState(false);
       StripeStoreFileManager.this.state = state;
       updateMetadataMaps();
     }
 
-    private State createNewState() {
+    private void deleteResults(Collection<StoreFile> compactedFiles) throws IOException {
+      this.compactedFiles = compactedFiles;
+      // Create new state and update parent.
+      State state = createNewState(true);
+      StripeStoreFileManager.this.state = state;
+      updateMetadataMaps();
+    }
+
+    private State createNewState(boolean delCompactedFiles) {
       State oldState = StripeStoreFileManager.this.state;
       // Stripe count should be the same unless the end rows changed.
       assert oldState.stripeFiles.size() == this.stripeFiles.size() || this.stripeEndRows != null;
@@ -717,9 +761,21 @@ public class StripeStoreFileManager
       }
 
       List<StoreFile> newAllFiles = new ArrayList<StoreFile>(oldState.allFilesCached);
-      if (!isFlush) newAllFiles.removeAll(compactedFiles);
-      newAllFiles.addAll(results);
+      List<StoreFile> newAllCompactedFiles =
+          new ArrayList<StoreFile>(oldState.allCompactedFilesCached);
+      if (!isFlush) {
+        newAllFiles.removeAll(compactedFiles);
+        if (delCompactedFiles) {
+          newAllCompactedFiles.removeAll(compactedFiles);
+        } else {
+          newAllCompactedFiles.addAll(compactedFiles);
+        }
+      }
+      if (results != null) {
+        newAllFiles.addAll(results);
+      }
       newState.allFilesCached = ImmutableList.copyOf(newAllFiles);
+      newState.allCompactedFilesCached = ImmutableList.copyOf(newAllCompactedFiles);
       return newState;
     }
 
@@ -970,14 +1026,16 @@ public class StripeStoreFileManager
     // Order by seqnum is reversed.
     for (int i = 1; i < stripe.size(); ++i) {
       StoreFile sf = stripe.get(i);
-      long fileTs = sf.getReader().getMaxTimestamp();
-      if (fileTs < maxTs && !filesCompacting.contains(sf)) {
-        LOG.info("Found an expired store file: " + sf.getPath()
-            + " whose maxTimeStamp is " + fileTs + ", which is below " + maxTs);
-        if (expiredStoreFiles == null) {
-          expiredStoreFiles = new ArrayList<StoreFile>();
+      synchronized (sf) {
+        long fileTs = sf.getReader().getMaxTimestamp();
+        if (fileTs < maxTs && !filesCompacting.contains(sf)) {
+          LOG.info("Found an expired store file: " + sf.getPath() + " whose maxTimeStamp is "
+              + fileTs + ", which is below " + maxTs);
+          if (expiredStoreFiles == null) {
+            expiredStoreFiles = new ArrayList<StoreFile>();
+          }
+          expiredStoreFiles.add(sf);
         }
-        expiredStoreFiles.add(sf);
       }
     }
     return expiredStoreFiles;
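
createNewState(boolean) keeps the whole file inventory in immutable lists hanging off a single State object, so concurrent readers see either the old state or the new one, never a half-applied mix. A minimal sketch of that copy-on-write swap, assuming plain strings for file names (State and the two cached lists mirror the patch; everything else is illustrative):

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.List;

    // Mutate lazy copies, then publish them with a single volatile write.
    class FileManager {
      static final class State {
        final List<String> allFilesCached;
        final List<String> allCompactedFilesCached;
        State(List<String> files, List<String> compacted) {
          this.allFilesCached = Collections.unmodifiableList(files);
          this.allCompactedFilesCached = Collections.unmodifiableList(compacted);
        }
      }
      private volatile State state =
          new State(new ArrayList<String>(), new ArrayList<String>());

      void mergeResults(Collection<String> compactedFiles, Collection<String> results) {
        List<String> files = new ArrayList<>(state.allFilesCached);
        List<String> compacted = new ArrayList<>(state.allCompactedFilesCached);
        files.removeAll(compactedFiles);  // inputs leave the live set...
        compacted.addAll(compactedFiles); // ...but stay visible as compacted files
        files.addAll(results);
        state = new State(files, compacted); // one atomic publish
      }
    }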

http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactedHFilesDischarger.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactedHFilesDischarger.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactedHFilesDischarger.java
new file mode 100644
index 0000000..4cf120d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactedHFilesDischarger.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.Store;
+
+/**
+ * A chore service that periodically cleans up compacted files once there are no
+ * active readers using them, and also clears the corresponding entries from the
+ * block cache.
+ */
+@InterfaceAudience.Private
+public class CompactedHFilesDischarger extends ScheduledChore {
+  private static final Log LOG = LogFactory.getLog(CompactedHFilesDischarger.class);
+  private Region region;
+
+  /**
+   * @param period the period of time to sleep between each run
+   * @param stopper the stopper
+   * @param region the region whose compacted store files are to be discharged
+   */
+  public CompactedHFilesDischarger(final int period, final Stoppable stopper, final Region region) {
+    // Need to add the config classes
+    super("CompactedHFilesCleaner", stopper, period);
+    this.region = region;
+  }
+
+  @Override
+  public void chore() {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(
+        "Started the compacted hfiles cleaner for the region " + this.region.getRegionInfo());
+    }
+    for (Store store : region.getStores()) {
+      try {
+        store.closeAndArchiveCompactedFiles();
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Completed archiving the compacted files for the region "
+              + this.region.getRegionInfo() + " under the store " + store.getColumnFamilyName());
+        }
+      } catch (Exception e) {
+        LOG.error(
+          "Exception while trying to close and archive the comapcted store files of the store  "
+              + store.getColumnFamilyName() + " in the region " + this.region.getRegionInfo(),
+          e);
+      }
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(
+        "Completed the compacted hfiles cleaner for the region " + this.region.getRegionInfo());
+    }
+  }
+}
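
The new chore plugs into HBase's standard ChoreService scheduling. A hedged wiring sketch, reusing the constructor shown above; inside HBase the region server schedules this itself, and the two-minute period is an example, not a default:

    import org.apache.hadoop.hbase.ChoreService;
    import org.apache.hadoop.hbase.Stoppable;
    import org.apache.hadoop.hbase.regionserver.Region;
    import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger;

    class DischargerWiring {
      // Illustrative only: callers supply a ChoreService, a Stoppable and the Region.
      static void schedule(ChoreService choreService, Stoppable stopper, Region region) {
        CompactedHFilesDischarger discharger =
            new CompactedHFilesDischarger(2 * 60 * 1000, stopper, region); // 2 min period
        choreService.scheduleChore(discharger);
      }
    }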

http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
index 304638a..94a63d8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
@@ -198,17 +198,6 @@ public class TestIOFencing {
     }
 
     @Override
-    protected void completeCompaction(final Collection<StoreFile> compactedFiles,
-        boolean removeFiles) throws IOException {
-      try {
-        r.compactionsWaiting.countDown();
-        r.compactionsBlocked.await();
-      } catch (InterruptedException ex) {
-        throw new IOException(ex);
-      }
-      super.completeCompaction(compactedFiles, removeFiles);
-    }
-    @Override
     protected void completeCompaction(Collection<StoreFile> compactedFiles) throws IOException {
       try {
         r.compactionsWaiting.countDown();

http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java
index a28112d..55e43de 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java
@@ -42,8 +42,10 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -170,10 +172,11 @@ public class TestZooKeeperTableArchiveClient {
 
     // create the region
     HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAM);
-    Region region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
-
+    HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
+    final CompactedHFilesDischarger compactionCleaner =
+        new CompactedHFilesDischarger(100, stop, region);
     loadFlushAndCompact(region, TEST_FAM);
-
+    compactionCleaner.chore();
     // get the current hfiles in the archive directory
     List<Path> files = getAllFiles(fs, archiveDir);
     if (files == null) {
@@ -217,18 +220,22 @@ public class TestZooKeeperTableArchiveClient {
     HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
     List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
     final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
-
     // create the region
     HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAM);
-    Region region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
+    HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
+    final CompactedHFilesDischarger compactionCleaner =
+        new CompactedHFilesDischarger(100, stop, region);
     loadFlushAndCompact(region, TEST_FAM);
-
+    compactionCleaner.chore();
     // create the another table that we don't archive
     hcd = new HColumnDescriptor(TEST_FAM);
-    Region otherRegion = UTIL.createTestRegion(otherTable, hcd);
+    HRegion otherRegion = UTIL.createTestRegion(otherTable, hcd);
+    final CompactedHFilesDischarger compactionCleaner1 =
+        new CompactedHFilesDischarger(100, stop, otherRegion);
     loadFlushAndCompact(otherRegion, TEST_FAM);
-
+    compactionCleaner1.chore();
     // get the current hfiles in the archive directory
+    // Should be archived
     List<Path> files = getAllFiles(fs, archiveDir);
     if (files == null) {
       FSUtils.logFileSystemState(fs, archiveDir, LOG);

http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
index f6ade32..4f30960 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
@@ -76,7 +76,7 @@ public class TestHeapSize  {
     LOG.info("name=" + b.getName()); 
     LOG.info("specname=" + b.getSpecName()); 
     LOG.info("specvendor=" + b.getSpecVendor()); 
-    LOG.info("vmname=" + b.getVmName()); 
+    LOG.info("vmname=" + b.getVmName());
     LOG.info("vmversion=" + b.getVmVersion()); 
     LOG.info("vmvendor=" + b.getVmVendor()); 
     Map<String, String> p = b.getSystemProperties();

http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
index 9ff88f0..337eeac 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
@@ -29,6 +29,7 @@ import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;
@@ -74,6 +75,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -95,6 +97,8 @@ import org.junit.experimental.categories.Category;
 import org.junit.rules.TestRule;
 import org.mockito.Mockito;
 
+import com.google.common.collect.Lists;
+
 /**
  * Simple test for {@link CellSortReducer} and {@link HFileOutputFormat2}.
  * Sets up and runs a mapreduce job that writes hfile output.
@@ -1002,6 +1006,12 @@ public class TestHFileOutputFormat2  {
         quickPoll(new Callable<Boolean>() {
           @Override
           public Boolean call() throws Exception {
+            List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAME);
+            for (HRegion region : regions) {
+              for (Store store : region.getStores()) {
+                store.closeAndArchiveCompactedFiles();
+              }
+            }
             return fs.listStatus(storePath).length == 1;
           }
         }, 5000);
@@ -1015,6 +1025,12 @@ public class TestHFileOutputFormat2  {
       quickPoll(new Callable<Boolean>() {
         @Override
         public Boolean call() throws Exception {
+          List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAME);
+          for (HRegion region : regions) {
+            for (Store store : region.getStores()) {
+              store.closeAndArchiveCompactedFiles();
+            }
+          }
           return fs.listStatus(storePath).length == 1;
         }
       }, 5000);

http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java
index 20b0642..60c5473 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java
@@ -48,6 +48,9 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneReq
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
 import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger;
 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
 import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
@@ -121,6 +124,7 @@ public class TestSnapshotFromMaster {
     conf.setLong(SnapshotHFileCleaner.HFILE_CACHE_REFRESH_PERIOD_CONF_KEY, cacheRefreshPeriod);
     conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
       ConstantSizeRegionSplitPolicy.class.getName());
+    conf.setInt("hbase.hfile.compactions.cleaner.interval", 20 * 1000);
 
   }
 
@@ -320,6 +324,10 @@ public class TestSnapshotFromMaster {
       region.waitForFlushesAndCompactions(); // enable can trigger a compaction, wait for it.
       region.compactStores(); // min is 2 so will compact and archive
     }
+    for (HRegion region : regions) {
+      CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, region);
+      cleaner.chore();
+    }
     LOG.info("After compaction File-System state");
     FSUtils.logFileSystemState(fs, rootDir, LOG);
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java
index f99226f..3614846 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java
@@ -79,6 +79,11 @@ public class MockStoreFile extends StoreFile {
   }
 
   @Override
+  public boolean isCompactedAway() {
+    return false;
+  }
+
+  @Override
   public byte[] getMetadataValue(byte[] key) {
     return this.metadata.get(key);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionKeyRotation.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionKeyRotation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionKeyRotation.java
index 6c66c6d..82be1db 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionKeyRotation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionKeyRotation.java
@@ -19,9 +19,11 @@ package org.apache.hadoop.hbase.regionserver;
 
 import static org.junit.Assert.*;
 
+import java.io.IOException;
 import java.security.Key;
 import java.security.SecureRandom;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 
 import javax.crypto.spec.SecretKeySpec;
@@ -123,13 +125,14 @@ public class TestEncryptionKeyRotation {
 
     // And major compact
     TEST_UTIL.getHBaseAdmin().majorCompact(htd.getTableName());
+    final List<Path> updatePaths = findCompactedStorefilePaths(htd.getTableName());
     TEST_UTIL.waitFor(30000, 1000, true, new Predicate<Exception>() {
       @Override
       public boolean evaluate() throws Exception {
         // When compaction has finished, all of the original files will be
         // gone
         boolean found = false;
-        for (Path path: initialPaths) {
+        for (Path path: updatePaths) {
           found = TEST_UTIL.getTestFileSystem().exists(path);
           if (found) {
             LOG.info("Found " + path);
@@ -141,14 +144,20 @@ public class TestEncryptionKeyRotation {
     });
 
     // Verify we have store file(s) with only the new key
+    Thread.sleep(1000);
+    waitForCompaction(htd.getTableName());
     List<Path> pathsAfterCompaction = findStorefilePaths(htd.getTableName());
     assertTrue(pathsAfterCompaction.size() > 0);
     for (Path path: pathsAfterCompaction) {
-      assertFalse("Store file " + path + " retains initial key",
-        Bytes.equals(initialCFKey.getEncoded(), extractHFileKey(path)));
       assertTrue("Store file " + path + " has incorrect key",
         Bytes.equals(secondCFKey.getEncoded(), extractHFileKey(path)));
     }
+    List<Path> compactedPaths = findCompactedStorefilePaths(htd.getTableName());
+    assertTrue(compactedPaths.size() > 0);
+    for (Path path: compactedPaths) {
+      assertTrue("Store file " + path + " retains initial key",
+        Bytes.equals(initialCFKey.getEncoded(), extractHFileKey(path)));
+    }
   }
 
   @Test
@@ -194,6 +203,33 @@ public class TestEncryptionKeyRotation {
     }
   }
 
+  private static void waitForCompaction(TableName tableName)
+      throws IOException, InterruptedException {
+    boolean compacted = false;
+    for (Region region : TEST_UTIL.getRSForFirstRegionInTable(tableName)
+        .getOnlineRegions(tableName)) {
+      for (Store store : region.getStores()) {
+        compacted = false;
+        while (!compacted) {
+          if (store.getStorefiles() != null) {
+            while (store.getStorefilesCount() != 1) {
+              Thread.sleep(100);
+            }
+            for (StoreFile storefile : store.getStorefiles()) {
+              if (!storefile.isCompactedAway()) {
+                compacted = true;
+                break;
+              }
+              Thread.sleep(100);
+            }
+          } else {
+            break;
+          }
+        }
+      }
+    }
+  }
+  
   private static List<Path> findStorefilePaths(TableName tableName) throws Exception {
     List<Path> paths = new ArrayList<Path>();
     for (Region region:
@@ -207,6 +243,23 @@ public class TestEncryptionKeyRotation {
     return paths;
   }
 
+  private static List<Path> findCompactedStorefilePaths(TableName tableName) throws Exception {
+    List<Path> paths = new ArrayList<Path>();
+    for (Region region:
+        TEST_UTIL.getRSForFirstRegionInTable(tableName).getOnlineRegions(tableName)) {
+      for (Store store : region.getStores()) {
+        Collection<StoreFile> compactedfiles =
+            ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
+        if (compactedfiles != null) {
+          for (StoreFile storefile : compactedfiles) {
+            paths.add(storefile.getPath());
+          }
+        }
+      }
+    }
+    return paths;
+  }
+
   private void createTableAndFlush(HTableDescriptor htd) throws Exception {
     HColumnDescriptor hcd = htd.getFamilies().iterator().next();
     // Create the test table

http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
index 0c2e01c..c59d6f7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
@@ -69,6 +69,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescripto
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
 import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl;
 import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger;
 import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -1369,6 +1370,8 @@ public class TestHRegionReplayEvents {
 
     // Test case 3: compact primary files
     primaryRegion.compactStores();
+    CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, primaryRegion);
+    cleaner.chore();
     secondaryRegion.refreshStoreFiles();
     assertPathListsEqual(primaryRegion.getStoreFileList(families),
       secondaryRegion.getStoreFileList(families));

http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
index 1c99fe3..e0c1453 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
@@ -61,6 +62,7 @@ import org.apache.hadoop.hbase.master.RegionStates;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -212,7 +214,7 @@ public class TestRegionMergeTransactionOnCluster {
       List<Pair<HRegionInfo, ServerName>> tableRegions = MetaTableAccessor
           .getTableRegionsAndLocations(master.getConnection(), tableName);
       HRegionInfo mergedRegionInfo = tableRegions.get(0).getFirst();
-      HTableDescriptor tableDescritor = master.getTableDescriptors().get(
+      HTableDescriptor tableDescriptor = master.getTableDescriptors().get(
           tableName);
       Result mergedRegionResult = MetaTableAccessor.getRegionResult(
         master.getConnection(), mergedRegionInfo.getRegionName());
@@ -236,19 +238,34 @@ public class TestRegionMergeTransactionOnCluster {
       assertTrue(fs.exists(regionAdir));
       assertTrue(fs.exists(regionBdir));
 
+      HColumnDescriptor[] columnFamilies = tableDescriptor.getColumnFamilies();
+      HRegionFileSystem hrfs = new HRegionFileSystem(
+        TEST_UTIL.getConfiguration(), fs, tabledir, mergedRegionInfo);
+      int count = 0;
+      for(HColumnDescriptor colFamily : columnFamilies) {
+        count += hrfs.getStoreFiles(colFamily.getName()).size();
+      }
       admin.compactRegion(mergedRegionInfo.getRegionName());
       // wait until merged region doesn't have reference file
       long timeout = System.currentTimeMillis() + waitTime;
-      HRegionFileSystem hrfs = new HRegionFileSystem(
-          TEST_UTIL.getConfiguration(), fs, tabledir, mergedRegionInfo);
       while (System.currentTimeMillis() < timeout) {
-        if (!hrfs.hasReferences(tableDescritor)) {
+        if (!hrfs.hasReferences(tableDescriptor)) {
           break;
         }
         Thread.sleep(50);
       }
-      assertFalse(hrfs.hasReferences(tableDescritor));
-
+      int newcount = 0;
+      for(HColumnDescriptor colFamily : columnFamilies) {
+        newcount += hrfs.getStoreFiles(colFamily.getName()).size();
+      }
+      assertTrue(newcount > count);
+      // clean up the merged region store files
+      List<HRegion> regions = 
+          TEST_UTIL.getHBaseCluster().getRegions(tableDescriptor.getName());
+      for (HRegion region : regions) {
+        CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, region);
+        cleaner.chore();
+      }
       // run CatalogJanitor to clean merge references in hbase:meta and archive the
       // files of merging regions
       int cleaned = admin.runCatalogScan();