You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jm...@apache.org on 2016/01/14 18:08:50 UTC

[36/50] [abbrv] hbase git commit: HBASE-15027 Refactor the way the CompactedHFileDischarger threads are created (Ram)

HBASE-15027 Refactor the way the CompactedHFileDischarger threads are
created (Ram)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/28c2b18d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/28c2b18d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/28c2b18d

Branch: refs/heads/trunk
Commit: 28c2b18d30de4ce9564e328e5fdf42188e83fb63
Parents: f3ee6df
Author: ramkrishna <ra...@gmail.com>
Authored: Fri Jan 8 11:18:39 2016 +0530
Committer: ramkrishna <ra...@gmail.com>
Committed: Fri Jan 8 11:18:39 2016 +0530

----------------------------------------------------------------------
 .../apache/hadoop/hbase/executor/EventType.java |   9 +-
 .../hadoop/hbase/executor/ExecutorType.java     |   3 +-
 .../hadoop/hbase/executor/ExecutorService.java  |   4 +-
 .../CompactedHFilesDischargeHandler.java        |  45 ++++++
 .../regionserver/CompactedHFilesDischarger.java | 112 ++++++++++++++
 .../hadoop/hbase/regionserver/HRegion.java      |  16 --
 .../hbase/regionserver/HRegionServer.java       |  26 +++-
 .../hadoop/hbase/regionserver/HStore.java       | 149 ++++++++-----------
 .../hbase/regionserver/OnlineRegions.java       |   6 +
 .../compactions/CompactedHFilesDischarger.java  |  74 ---------
 .../compactions/CompactionConfiguration.java    |   3 +
 .../hadoop/hbase/MockRegionServerServices.java  |   5 +
 .../TestZooKeeperTableArchiveClient.java        |  24 ++-
 .../hadoop/hbase/master/MockRegionServer.java   |   5 +
 .../master/cleaner/TestSnapshotFromMaster.java  |  19 ++-
 .../regionserver/TestHRegionReplayEvents.java   |  17 ++-
 .../TestRegionMergeTransactionOnCluster.java    |  36 +++--
 .../hbase/regionserver/TestRegionReplicas.java  |  15 +-
 .../TestCompactedHFilesDischarger.java          |  15 +-
 19 files changed, 368 insertions(+), 215 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/28c2b18d/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
index ac76edb..a7759c5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
@@ -265,7 +265,14 @@ public enum EventType {
    *
    * RS_REGION_REPLICA_FLUSH
    */
-  RS_REGION_REPLICA_FLUSH   (82, ExecutorType.RS_REGION_REPLICA_FLUSH_OPS);
+  RS_REGION_REPLICA_FLUSH   (82, ExecutorType.RS_REGION_REPLICA_FLUSH_OPS),
+
+  /**
+   * RS compacted files discharger <br>
+   *
+   * RS_COMPACTED_FILES_DISCHARGER
+   */
+  RS_COMPACTED_FILES_DISCHARGER (83, ExecutorType.RS_COMPACTED_FILES_DISCHARGER);
 
   private final int code;
   private final ExecutorType executor;

http://git-wip-us.apache.org/repos/asf/hbase/blob/28c2b18d/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
index d0f6bee..5a16149 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
@@ -46,7 +46,8 @@ public enum ExecutorType {
   RS_CLOSE_META              (25),
   RS_PARALLEL_SEEK           (26),
   RS_LOG_REPLAY_OPS          (27),
-  RS_REGION_REPLICA_FLUSH_OPS  (28);
+  RS_REGION_REPLICA_FLUSH_OPS  (28),
+  RS_COMPACTED_FILES_DISCHARGER (29);
 
   ExecutorType(int value) {}
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/28c2b18d/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
index 410fb39..335b672 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
@@ -37,6 +37,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.monitoring.ThreadMonitoring;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -78,7 +79,8 @@ public class ExecutorService {
    * started with the same name, this throws a RuntimeException.
    * @param name Name of the service to start.
    */
-  void startExecutorService(String name, int maxThreads) {
+  @VisibleForTesting
+  public void startExecutorService(String name, int maxThreads) {
     if (this.executorMap.get(name) != null) {
       throw new RuntimeException("An executor service with the name " + name +
         " is already running!");

http://git-wip-us.apache.org/repos/asf/hbase/blob/28c2b18d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischargeHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischargeHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischargeHandler.java
new file mode 100644
index 0000000..02160d8
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischargeHandler.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.executor.EventHandler;
+import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.regionserver.HStore;
+
+/**
+ * Event handler that handles the removal and archival of the compacted hfiles
+ */
+@InterfaceAudience.Private
+public class CompactedHFilesDischargeHandler extends EventHandler {
+  private final HStore store; // the store whose compacted files this event archives
+
+  public CompactedHFilesDischargeHandler(Server server, EventType eventType, HStore store) {
+    super(server, eventType);
+    this.store = store;
+  }
+
+  /** Delegates to {@link HStore#closeAndArchiveCompactedFiles()} for the wrapped store. */
+  @Override
+  public void process() throws IOException {
+    this.store.closeAndArchiveCompactedFiles();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/28c2b18d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischarger.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischarger.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischarger.java
new file mode 100644
index 0000000..c4974cf
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischarger.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.Store;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A chore service that periodically cleans up the compacted files when there are no active readers
+ * using those compacted files and also helps in clearing the block cache with these compacted
+ * file entries
+ */
+@InterfaceAudience.Private
+public class CompactedHFilesDischarger extends ScheduledChore {
+  private static final Log LOG = LogFactory.getLog(CompactedHFilesDischarger.class);
+  private final RegionServerServices regionServerServices;
+  // When true (the default) each store is handed to the region server's executor service;
+  // when false the chore thread archives the compacted files itself.
+  private final boolean useExecutor;
+
+  /**
+   * @param period the period of time to sleep between each run
+   * @param stopper the stopper
+   * @param regionServerServices the region server that starts this chore
+   */
+  public CompactedHFilesDischarger(final int period, final Stoppable stopper,
+      final RegionServerServices regionServerServices) {
+    this(period, stopper, regionServerServices, true);
+  }
+
+  /**
+   * @param period the period of time to sleep between each run
+   * @param stopper the stopper
+   * @param regionServerServices the region server that starts this chore
+   * @param useExecutor true if to use the region server's executor service, false otherwise
+   */
+  @VisibleForTesting
+  public CompactedHFilesDischarger(final int period, final Stoppable stopper,
+      final RegionServerServices regionServerServices, boolean useExecutor) {
+    // Need to add the config classes
+    super("CompactedHFilesCleaner", stopper, period);
+    this.regionServerServices = regionServerServices;
+    this.useExecutor = useExecutor;
+  }
+
+  @Override
+  public void chore() {
+    // Null-check the services before the first dereference, not after it
+    List<Region> onlineRegions =
+        regionServerServices == null ? null : regionServerServices.getOnlineRegions();
+    if (onlineRegions != null) {
+      for (Region region : onlineRegions) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(
+              "Started the compacted hfiles cleaner for the region " + region.getRegionInfo());
+        }
+        for (Store store : region.getStores()) {
+          try {
+            if (useExecutor) {
+              CompactedHFilesDischargeHandler handler = new CompactedHFilesDischargeHandler(
+                  (Server) regionServerServices, EventType.RS_COMPACTED_FILES_DISCHARGER,
+                  (HStore) store);
+              regionServerServices.getExecutorService().submit(handler);
+            } else {
+              // executor disabled: archive synchronously on the chore thread
+              store.closeAndArchiveCompactedFiles();
+            }
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("Completed archiving the compacted files for the region "
+                  + region.getRegionInfo() + " under the store " + store.getColumnFamilyName());
+            }
+          } catch (Exception e) {
+            LOG.error("Exception while trying to close and archive the compacted store "
+                + "files of the store  " + store.getColumnFamilyName() + " in the" + " region "
+                + region.getRegionInfo(), e);
+          }
+        }
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(
+              "Completed the compacted hfiles cleaner for the region " + region.getRegionInfo());
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/28c2b18d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 38404c7..d059cd0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -74,7 +74,6 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.CompoundConfiguration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.DroppedSnapshotException;
@@ -151,7 +150,6 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescripto
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
 import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
 import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
@@ -814,20 +812,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     // Initialize all the HStores
     status.setStatus("Initializing all the Stores");
     long maxSeqId = initializeStores(reporter, status);
-    // Start the CompactedHFilesDischarger here. This chore helps to remove the compacted files
-    // that will no longer be used in reads.
-    if (this.getRegionServerServices() != null) {
-      ChoreService choreService = this.getRegionServerServices().getChoreService();
-      if (choreService != null) {
-        // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to
-        // 2 mins so that compacted files can be archived before the TTLCleaner runs
-        int cleanerInterval =
-            conf.getInt("hbase.hfile.compactions.cleaner.interval", 2 * 60 * 1000);
-        this.compactedFileDischarger =
-            new CompactedHFilesDischarger(cleanerInterval, this.getRegionServerServices(), this);
-        choreService.scheduleChore(compactedFileDischarger);
-      }
-    }
     this.mvcc.advanceTo(maxSeqId);
     if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) {
       // Recover any edits if available.

http://git-wip-us.apache.org/repos/asf/hbase/blob/28c2b18d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 00046ba..b2cc78a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -134,6 +134,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Repor
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
 import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
 import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
@@ -484,6 +485,8 @@ public class HRegionServer extends HasThread implements RegionServerServices, La
    */
   protected final ConfigurationManager configurationManager;
 
+  private CompactedHFilesDischarger compactedFileDischarger;
+
   /**
    * Starts a HRegionServer at the default location.
    * @param conf
@@ -615,6 +618,16 @@ public class HRegionServer extends HasThread implements RegionServerServices, La
         }
       });
     }
+    // Create the CompactedHFilesDischarger chore and schedule it on the chore
+    // service. This chore helps to remove the compacted files that will no
+    // longer be used in reads.
+    // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to
+    // 2 mins so that compacted files can be archived before the TTLCleaner runs
+    int cleanerInterval =
+        conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
+    this.compactedFileDischarger =
+        new CompactedHFilesDischarger(cleanerInterval, (Stoppable)this, (RegionServerServices)this);
+    choreService.scheduleChore(compactedFileDischarger);
   }
 
   protected TableDescriptors getFsTableDescriptors() throws IOException {
@@ -1716,7 +1729,9 @@ public class HRegionServer extends HasThread implements RegionServerServices, La
     }
     this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
        "hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS));
-
+    // Start the threads for compacted files discharger
+    this.service.startExecutorService(ExecutorType.RS_COMPACTED_FILES_DISCHARGER,
+      conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10));
     if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
       this.service.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS,
         conf.getInt("hbase.regionserver.region.replica.flusher.threads",
@@ -2725,6 +2740,15 @@ public class HRegionServer extends HasThread implements RegionServerServices, La
      return tableRegions;
    }
 
+  /** Returns a copy of the online regions, taken while synchronized on onlineRegions. */
+  @Override
+  public List<Region> getOnlineRegions() {
+    synchronized (this.onlineRegions) {
+      // copy the values so callers never iterate over the live collection
+      return new ArrayList<Region>(this.onlineRegions.values());
+    }
+  }
+
   /**
    * Gets the online tables in this RS.
    * This method looks at the in-memory onlineRegions.

http://git-wip-us.apache.org/repos/asf/hbase/blob/28c2b18d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 8d66696..9ebdaee 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -37,9 +37,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -76,7 +74,6 @@ import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@@ -91,7 +88,6 @@ import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
-import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
 
@@ -139,8 +135,6 @@ public class HStore implements Store {
   static int closeCheckInterval = 0;
   private volatile long storeSize = 0L;
   private volatile long totalUncompressedBytes = 0L;
-  private ThreadPoolExecutor compactionCleanerthreadPoolExecutor = null;
-  private CompletionService<StoreFile> completionService = null;
 
   /**
    * RWLock for store operations.
@@ -274,10 +268,6 @@ public class HStore implements Store {
           "hbase.hstore.flush.retries.number must be > 0, not "
               + flushRetriesNumber);
     }
-    compactionCleanerthreadPoolExecutor = getThreadPoolExecutor(
-      conf.getInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 10));
-    completionService =
-        new ExecutorCompletionService<StoreFile>(compactionCleanerthreadPoolExecutor);
     cryptoContext = EncryptionUtil.createEncryptionContext(conf, family);
   }
 
@@ -802,7 +792,9 @@ public class HStore implements Store {
       Collection<StoreFile> compactedfiles =
           storeEngine.getStoreFileManager().clearCompactedFiles();
       // clear the compacted files
-      removeCompactedFiles(compactedfiles);
+      if (compactedfiles != null && !compactedfiles.isEmpty()) {
+        removeCompactedfiles(compactedfiles);
+      }
       if (!result.isEmpty()) {
         // initialize the thread pool for closing store files in parallel.
         ThreadPoolExecutor storeFileCloserThreadPool = this.region
@@ -844,9 +836,6 @@ public class HStore implements Store {
         }
         if (ioe != null) throw ioe;
       }
-      if (compactionCleanerthreadPoolExecutor != null) {
-        compactionCleanerthreadPoolExecutor.shutdownNow();
-      }
       LOG.info("Closed " + this);
       return result;
     } finally {
@@ -2174,7 +2163,7 @@ public class HStore implements Store {
   }
 
   public static final long FIXED_OVERHEAD =
-      ClassSize.align(ClassSize.OBJECT + (18 * ClassSize.REFERENCE) + (10 * Bytes.SIZEOF_LONG)
+      ClassSize.align(ClassSize.OBJECT + (16 * ClassSize.REFERENCE) + (10 * Bytes.SIZEOF_LONG)
               + (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));
 
   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
@@ -2311,92 +2300,72 @@ public class HStore implements Store {
     } finally {
       lock.readLock().unlock();
     }
-    removeCompactedFiles(copyCompactedfiles);
-  }
-
-  private ThreadPoolExecutor getThreadPoolExecutor(int maxThreads) {
-    return Threads.getBoundedCachedThreadPool(maxThreads, maxThreads * 3, TimeUnit.SECONDS,
-      new ThreadFactory() {
-        private int count = 1;
-
-        @Override
-        public Thread newThread(Runnable r) {
-          return new Thread(r, "CompactedfilesArchiver-" + count++);
-        }
-      });
+    if (copyCompactedfiles != null && !copyCompactedfiles.isEmpty()) {
+      removeCompactedfiles(copyCompactedfiles);
+    }
   }
 
-  private void removeCompactedFiles(Collection<StoreFile> compactedfiles) throws IOException {
-    if (compactedfiles != null && !compactedfiles.isEmpty()) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Removing the compacted store files " + compactedfiles);
-      }
-      for (final StoreFile file : compactedfiles) {
-        completionService.submit(new Callable<StoreFile>() {
-          @Override
-          public StoreFile call() throws IOException {
-            synchronized (file) {
-              try {
-                StoreFile.Reader r = file.getReader();
-                if (r == null) {
-                  if (LOG.isDebugEnabled()) {
-                    LOG.debug("The file " + file + " was closed but still not archived.");
-                  }
-                  return file;
-                }
-                if (r != null && r.isCompactedAway() && !r.isReferencedInReads()) {
-                  // Even if deleting fails we need not bother as any new scanners won't be
-                  // able to use the compacted file as the status is already compactedAway
-                  if (LOG.isTraceEnabled()) {
-                    LOG.trace("Closing and archiving the file " + file.getPath());
-                  }
-                  r.close(true);
-                  // Just close and return
-                  return file;
-                }
-              } catch (Exception e) {
-                LOG.error("Exception while trying to close the compacted store file "
-                    + file.getPath().getName());
-              }
+  /**
+   * Archives and removes the compacted files
+   * @param compactedfiles The compacted files in this store that are not active in reads
+   * @throws IOException
+   */
+  private void removeCompactedfiles(Collection<StoreFile> compactedfiles)
+      throws IOException {
+    final List<StoreFile> filesToRemove = new ArrayList<StoreFile>(compactedfiles.size());
+    for (final StoreFile file : compactedfiles) {
+      synchronized (file) {
+        try {
+          StoreFile.Reader r = file.getReader();
+          if (r == null) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("The file " + file + " was closed but still not archived.");
             }
-            return null;
+            filesToRemove.add(file);
           }
-        });
-      }
-      final List<StoreFile> filesToRemove = new ArrayList<StoreFile>(compactedfiles.size());
-      try {
-        for (final StoreFile file : compactedfiles) {
-          Future<StoreFile> future = completionService.take();
-          StoreFile closedFile = future.get();
-          if (closedFile != null) {
-            filesToRemove.add(closedFile);
+          if (r != null && r.isCompactedAway() && !r.isReferencedInReads()) {
+            // Even if deleting fails we need not bother as any new scanners won't be
+            // able to use the compacted file as the status is already compactedAway
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("Closing and archiving the file " + file.getPath());
+            }
+            r.close(true);
+            // Just close and return
+            filesToRemove.add(file);
           }
+        } catch (Exception e) {
+          LOG.error("Exception while trying to close the compacted store file "
+              + file.getPath().getName(), e); // pass e so the cause and stack trace are logged
         }
-      } catch (InterruptedException ie) {
-        LOG.error("Interrupted exception while closing the compacted files", ie);
-      } catch (Exception e) {
-        LOG.error("Exception occured while closing the compacted files", e);
       }
-      if (isPrimaryReplicaStore()) {
-        archiveAndRemoveCompactedFiles(filesToRemove);
+    }
+    if (this.isPrimaryReplicaStore()) {
+      // Only the primary region is allowed to move the file to archive.
+      // The secondary region does not move the files to archive. Any active reads from
+      // the secondary region will still work because the file as such has active readers on it.
+      if (!filesToRemove.isEmpty()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Moving the files " + filesToRemove + " to archive");
+        }
+        // Only if this is successful it has to be removed
+        this.fs.removeStoreFiles(this.getFamily().getNameAsString(), filesToRemove);
       }
-
+    }
+    if (!filesToRemove.isEmpty()) {
+      // Clear the compactedfiles from the store file manager
+      clearCompactedfiles(filesToRemove);
     }
   }
 
-  private void archiveAndRemoveCompactedFiles(List<StoreFile> filesToArchive) throws IOException {
-    if (!filesToArchive.isEmpty()) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Moving the files " + filesToArchive + " to archive");
-      }
-      // Only if this is successful it has to be removed
-      this.fs.removeStoreFiles(this.getFamily().getNameAsString(), filesToArchive);
-      try {
-        lock.writeLock().lock();
-        this.getStoreEngine().getStoreFileManager().removeCompactedFiles(filesToArchive);
-      } finally {
-        lock.writeLock().unlock();
-      }
+  private void clearCompactedfiles(final List<StoreFile> filesToRemove) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Clearing the compacted file " + filesToRemove + " from this store");
+    }
+    lock.writeLock().lock(); // acquire outside try: finally must not unlock an unheld lock
+    try {
+      this.getStoreEngine().getStoreFileManager().removeCompactedFiles(filesToRemove);
+    } finally {
+      lock.writeLock().unlock();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/28c2b18d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java
index 60fc9fb..310108c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java
@@ -67,4 +67,10 @@ public interface OnlineRegions extends Server {
     * @throws java.io.IOException
     */
    List<Region> getOnlineRegions(TableName tableName) throws IOException;
+
+   /**
+    * Get all online regions in this RS.
+    * @return List of online Region
+    */
+   List<Region> getOnlineRegions();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/28c2b18d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactedHFilesDischarger.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactedHFilesDischarger.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactedHFilesDischarger.java
deleted file mode 100644
index 4cf120d..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactedHFilesDischarger.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver.compactions;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.ScheduledChore;
-import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.Store;
-
-/**
- * A chore service that periodically cleans up the compacted files when there are no active readers
- * using those compacted files and also helps in clearing the block cache with these compacted
- * file entries
- */
-@InterfaceAudience.Private
-public class CompactedHFilesDischarger extends ScheduledChore {
-  private static final Log LOG = LogFactory.getLog(CompactedHFilesDischarger.class);
-  private Region region;
-
-  /**
-   * @param period the period of time to sleep between each run
-   * @param stopper the stopper
-   * @param region the store to identify the family name
-   */
-  public CompactedHFilesDischarger(final int period, final Stoppable stopper, final Region region) {
-    // Need to add the config classes
-    super("CompactedHFilesCleaner", stopper, period);
-    this.region = region;
-  }
-
-  @Override
-  public void chore() {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(
-        "Started the compacted hfiles cleaner for the region " + this.region.getRegionInfo());
-    }
-    for (Store store : region.getStores()) {
-      try {
-        store.closeAndArchiveCompactedFiles();
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Completed archiving the compacted files for the region "
-              + this.region.getRegionInfo() + " under the store " + store.getColumnFamilyName());
-        }
-      } catch (Exception e) {
-        LOG.error(
-          "Exception while trying to close and archive the comapcted store files of the store  "
-              + store.getColumnFamilyName() + " in the region " + this.region.getRegionInfo(),
-          e);
-      }
-    }
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(
-        "Completed the compacted hfiles cleaner for the region " + this.region.getRegionInfo());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/28c2b18d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
index 62e7c7c..633477e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
@@ -64,6 +64,9 @@ public class CompactionConfiguration {
   public static final String HBASE_HSTORE_MIN_LOCALITY_TO_SKIP_MAJOR_COMPACT =
       "hbase.hstore.min.locality.to.skip.major.compact";
 
+  public static final String HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT =
+      "hbase.hfile.compaction.discharger.thread.count";
+
   Configuration conf;
   StoreConfigInformation storeConfigInfo;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/28c2b18d/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
index a7fc75b..0986ad7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
@@ -110,6 +110,11 @@ public class MockRegionServerServices implements RegionServerServices {
   }
 
   @Override
+  public List<Region> getOnlineRegions() {
+    return null;
+  }
+
+  @Override
   public void addToOnlineRegions(Region r) {
     this.regions.put(r.getRegionInfo().getEncodedName(), r);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/28c2b18d/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java
index 55e43de..64139ee 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.backup.example;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -42,10 +44,11 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
+import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -79,6 +82,7 @@ public class TestZooKeeperTableArchiveClient {
   private static ZKTableArchiveClient archivingClient;
   private final List<Path> toCleanup = new ArrayList<Path>();
   private static ClusterConnection CONNECTION;
+  private static RegionServerServices rss;
 
   /**
    * Setup the config for the cluster
@@ -93,6 +97,7 @@ public class TestZooKeeperTableArchiveClient {
     ZooKeeperWatcher watcher = UTIL.getZooKeeperWatcher();
     String archivingZNode = ZKTableArchiveClient.getArchiveZNode(UTIL.getConfiguration(), watcher);
     ZKUtil.createWithParents(watcher, archivingZNode);
+    rss = mock(RegionServerServices.class);
   }
 
   private static void setupConf(Configuration conf) {
@@ -173,8 +178,11 @@ public class TestZooKeeperTableArchiveClient {
     // create the region
     HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAM);
     HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
+    List<Region> regions = new ArrayList<Region>();
+    regions.add(region);
+    when(rss.getOnlineRegions()).thenReturn(regions);
     final CompactedHFilesDischarger compactionCleaner =
-        new CompactedHFilesDischarger(100, stop, region);
+        new CompactedHFilesDischarger(100, stop, rss, false);
     loadFlushAndCompact(region, TEST_FAM);
     compactionCleaner.chore();
     // get the current hfiles in the archive directory
@@ -223,15 +231,21 @@ public class TestZooKeeperTableArchiveClient {
     // create the region
     HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAM);
     HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
+    List<Region> regions = new ArrayList<Region>();
+    regions.add(region);
+    when(rss.getOnlineRegions()).thenReturn(regions);
     final CompactedHFilesDischarger compactionCleaner =
-        new CompactedHFilesDischarger(100, stop, region);
+        new CompactedHFilesDischarger(100, stop, rss, false);
     loadFlushAndCompact(region, TEST_FAM);
     compactionCleaner.chore();
     // create the another table that we don't archive
     hcd = new HColumnDescriptor(TEST_FAM);
     HRegion otherRegion = UTIL.createTestRegion(otherTable, hcd);
-    final CompactedHFilesDischarger compactionCleaner1 =
-        new CompactedHFilesDischarger(100, stop, otherRegion);
+    regions = new ArrayList<Region>();
+    regions.add(otherRegion);
+    when(rss.getOnlineRegions()).thenReturn(regions);
+    final CompactedHFilesDischarger compactionCleaner1 = new CompactedHFilesDischarger(100, stop,
+        rss, false);
     loadFlushAndCompact(otherRegion, TEST_FAM);
     compactionCleaner1.chore();
     // get the current hfiles in the archive directory

http://git-wip-us.apache.org/repos/asf/hbase/blob/28c2b18d/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index 234ad20..32f644b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -462,6 +462,11 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
   }
 
   @Override
+  public List<Region> getOnlineRegions() {
+    return null;
+  }
+
+  @Override
   public OpenRegionResponse openRegion(RpcController controller,
       OpenRegionRequest request) throws ServiceException {
     // TODO Auto-generated method stub

http://git-wip-us.apache.org/repos/asf/hbase/blob/28c2b18d/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java
index 60c5473..a6b6e4c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java
@@ -46,11 +46,10 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnaps
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
+import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger;
 import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
 import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.HStore;
-import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
 import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
@@ -60,6 +59,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -324,10 +324,17 @@ public class TestSnapshotFromMaster {
       region.waitForFlushesAndCompactions(); // enable can trigger a compaction, wait for it.
       region.compactStores(); // min is 2 so will compact and archive
     }
-    for (HRegion region : regions) {
-      CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, region);
-      cleaner.chore();
+    List<RegionServerThread> regionServerThreads = UTIL.getMiniHBaseCluster()
+        .getRegionServerThreads();
+    HRegionServer hrs = null;
+    for (RegionServerThread rs : regionServerThreads) {
+      if (!rs.getRegionServer().getOnlineRegions(TABLE_NAME).isEmpty()) {
+        hrs = rs.getRegionServer();
+        break;
+      }
     }
+    CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, hrs, false);
+    cleaner.chore();
     LOG.info("After compaction File-System state");
     FSUtils.logFileSystemState(fs, rootDir, LOG);
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/28c2b18d/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
index c59d6f7..382193b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -69,7 +70,6 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescripto
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
 import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl;
 import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger;
 import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -167,9 +167,17 @@ public class TestHRegionReplayEvents {
     when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1));
     when(rss.getConfiguration()).thenReturn(CONF);
     when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting());
-
+    String string = org.apache.hadoop.hbase.executor.EventType.RS_COMPACTED_FILES_DISCHARGER
+        .toString();
+    ExecutorService es = new ExecutorService(string);
+    es.startExecutorService(
+      string+"-"+string, 1);
+    when(rss.getExecutorService()).thenReturn(es);
     primaryRegion = HRegion.createHRegion(primaryHri, rootDir, CONF, htd, walPrimary);
     primaryRegion.close();
+    List<Region> regions = new ArrayList<Region>();
+    regions.add(primaryRegion);
+    when(rss.getOnlineRegions()).thenReturn(regions);
 
     primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
     secondaryRegion = HRegion.openHRegion(secondaryHri, htd, null, CONF, rss, null);
@@ -1370,7 +1378,10 @@ public class TestHRegionReplayEvents {
 
     // Test case 3: compact primary files
     primaryRegion.compactStores();
-    CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, primaryRegion);
+    List<Region> regions = new ArrayList<Region>();
+    regions.add(primaryRegion);
+    when(rss.getOnlineRegions()).thenReturn(regions);
+    CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, rss, false);
     cleaner.chore();
     secondaryRegion.refreshStoreFiles();
     assertPathListsEqual(primaryRegion.getStoreFileList(families),

http://git-wip-us.apache.org/repos/asf/hbase/blob/28c2b18d/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
index e0c1453..44b24ce 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
@@ -62,12 +62,12 @@ import org.apache.hadoop.hbase.master.RegionStates;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.PairOfSameType;
 import org.apache.hadoop.util.StringUtils;
@@ -246,25 +246,37 @@ public class TestRegionMergeTransactionOnCluster {
         count += hrfs.getStoreFiles(colFamily.getName()).size();
       }
       admin.compactRegion(mergedRegionInfo.getRegionName());
-      // wait until merged region doesn't have reference file
+      // clean up the merged region store files
+      // wait until merged region have reference file
       long timeout = System.currentTimeMillis() + waitTime;
+      int newcount = 0;
       while (System.currentTimeMillis() < timeout) {
-        if (!hrfs.hasReferences(tableDescriptor)) {
+        for(HColumnDescriptor colFamily : columnFamilies) {
+          newcount += hrfs.getStoreFiles(colFamily.getName()).size();
+        }
+        if(newcount > count) {
           break;
         }
         Thread.sleep(50);
       }
-      int newcount = 0;
-      for(HColumnDescriptor colFamily : columnFamilies) {
-        newcount += hrfs.getStoreFiles(colFamily.getName()).size();
-      }
       assertTrue(newcount > count);
-      // clean up the merged region store files
-      List<HRegion> regions = 
-          TEST_UTIL.getHBaseCluster().getRegions(tableDescriptor.getName());
-      for (HRegion region : regions) {
-        CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, region);
+      List<RegionServerThread> regionServerThreads = TEST_UTIL.getHBaseCluster()
+          .getRegionServerThreads();
+      for (RegionServerThread rs : regionServerThreads) {
+        CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null,
+            rs.getRegionServer(), false);
         cleaner.chore();
+        Thread.sleep(1000);
+      }
+      int newcount1 = 0;
+      while (System.currentTimeMillis() < timeout) {
+        for(HColumnDescriptor colFamily : columnFamilies) {
+          newcount1 += hrfs.getStoreFiles(colFamily.getName()).size();
+        }
+        if(newcount1 <= 1) {
+          break;
+        }
+        Thread.sleep(50);
       }
       // run CatalogJanitor to clean merge references in hbase:meta and archive the
       // files of merging regions

http://git-wip-us.apache.org/repos/asf/hbase/blob/28c2b18d/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
index 67258aa..99f5801 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
 
 import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.*;
 import java.io.IOException;
+import java.util.List;
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -47,9 +48,9 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.AfterClass;
@@ -454,8 +455,18 @@ public class TestRegionReplicas {
       LOG.info("Force Major compaction on primary region " + hriPrimary);
       primaryRegion.compact(true);
       Assert.assertEquals(1, primaryRegion.getStore(f).getStorefilesCount());
+      List<RegionServerThread> regionServerThreads = HTU.getMiniHBaseCluster()
+          .getRegionServerThreads();
+      HRegionServer hrs = null;
+      for (RegionServerThread rs : regionServerThreads) {
+        if (rs.getRegionServer()
+            .getOnlineRegion(primaryRegion.getRegionInfo().getRegionName()) != null) {
+          hrs = rs.getRegionServer();
+          break;
+        }
+      }
       CompactedHFilesDischarger cleaner =
-          new CompactedHFilesDischarger(100, null, (HRegion) primaryRegion);
+          new CompactedHFilesDischarger(100, null, hrs, false);
       cleaner.chore();
       // scan all the hfiles on the secondary.
       // since there are no read on the secondary when we ask locations to

http://git-wip-us.apache.org/repos/asf/hbase/blob/28c2b18d/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java
index 40539c4..c23e794 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.regionserver.compactions;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -38,10 +40,12 @@ import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -62,6 +66,7 @@ public class TestCompactedHFilesDischarger {
   private static CountDownLatch latch = new CountDownLatch(3);
   private static AtomicInteger counter = new AtomicInteger(0);
   private static AtomicInteger scanCompletedCounter = new AtomicInteger(0);
+  private RegionServerServices rss;
 
   @Before
   public void setUp() throws Exception {
@@ -71,6 +76,10 @@ public class TestCompactedHFilesDischarger {
     HRegionInfo info = new HRegionInfo(tableName, null, null, false);
     Path path = testUtil.getDataTestDir(getClass().getSimpleName());
     region = HBaseTestingUtility.createRegionAndWAL(info, path, testUtil.getConfiguration(), htd);
+    rss = mock(RegionServerServices.class);
+    List<Region> regions = new ArrayList<Region>();
+    regions.add(region);
+    when(rss.getOnlineRegions()).thenReturn(regions);
   }
 
   @After
@@ -86,7 +95,7 @@ public class TestCompactedHFilesDischarger {
   public void testCompactedHFilesCleaner() throws Exception {
     // Create the cleaner object
     CompactedHFilesDischarger cleaner =
-        new CompactedHFilesDischarger(1000, (Stoppable) null, (HRegion) region);
+        new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
     // Add some data to the region and do some flushes
     for (int i = 1; i < 10; i++) {
       Put p = new Put(Bytes.toBytes("row" + i));
@@ -152,7 +161,7 @@ public class TestCompactedHFilesDischarger {
   public void testCleanerWithParallelScannersAfterCompaction() throws Exception {
     // Create the cleaner object
     CompactedHFilesDischarger cleaner =
-        new CompactedHFilesDischarger(1000, (Stoppable) null, (HRegion) region);
+        new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
     // Add some data to the region and do some flushes
     for (int i = 1; i < 10; i++) {
       Put p = new Put(Bytes.toBytes("row" + i));
@@ -223,7 +232,7 @@ public class TestCompactedHFilesDischarger {
   public void testCleanerWithParallelScanners() throws Exception {
     // Create the cleaner object
     CompactedHFilesDischarger cleaner =
-        new CompactedHFilesDischarger(1000, (Stoppable) null, (HRegion) region);
+        new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
     // Add some data to the region and do some flushes
     for (int i = 1; i < 10; i++) {
       Put p = new Put(Bytes.toBytes("row" + i));