Posted to commits@hbase.apache.org by zh...@apache.org on 2017/09/14 12:54:21 UTC

[2/3] hbase git commit: HBASE-18453 CompactionRequest should not be exposed to user directly
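
In short, the patch stops exposing CompactionRequest through user-facing
signatures: HRegion now works against HStore rather than the Store interface,
Guava's Optional is replaced by java.util.Optional, and requestCompaction()
returns Optional<CompactionContext> instead of a nullable reference. Below is a
minimal sketch of the resulting call pattern, using only signatures visible in
this diff (the helper class and method names are hypothetical):

    import java.io.IOException;
    import java.util.Optional;

    import org.apache.hadoop.hbase.regionserver.HRegion;
    import org.apache.hadoop.hbase.regionserver.HStore;
    import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
    import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;

    public final class CompactIfSelected {
      // Before this patch, callers null-checked the CompactionContext returned
      // by Store.requestCompaction(); now they unwrap an Optional from HStore.
      static boolean compactIfSelected(HRegion region, HStore store) throws IOException {
        Optional<CompactionContext> compaction = store.requestCompaction();
        if (!compaction.isPresent()) {
          return false; // no files selected, nothing to compact
        }
        return region.compact(compaction.get(), store, NoLimitThroughputController.INSTANCE, null);
      }
    }

Returning an Optional moves the "no compaction selected" case into the type
signature, which is why the call sites in the diff change from null checks to
isPresent()/get().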

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 6edf006..86a24ad 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -17,6 +17,59 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
+import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
+
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.lang.reflect.Constructor;
+import java.nio.ByteBuffer;
+import java.text.ParseException;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Optional;
+import java.util.RandomAccess;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -90,6 +143,7 @@ import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.Write
 import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
 import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
 import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
@@ -97,7 +151,6 @@ import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Optional;
 import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.Maps;
@@ -143,58 +196,6 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
 
-import java.io.EOFException;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.lang.reflect.Constructor;
-import java.nio.ByteBuffer;
-import java.text.ParseException;
-import java.util.AbstractList;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.NavigableMap;
-import java.util.NavigableSet;
-import java.util.RandomAccess;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.LongAdder;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Function;
-
-import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
-import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
-
 @SuppressWarnings("deprecation")
 @InterfaceAudience.Private
 public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region {
@@ -254,9 +255,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   // - the thread that owns the lock (allow reentrancy)
   // - reference count of (reentrant) locks held by the thread
   // - the row itself
-  private final ConcurrentHashMap<HashedBytes, RowLockContext> lockedRows = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<HashedBytes, RowLockContext> lockedRows =
+      new ConcurrentHashMap<>();
 
-  protected final Map<byte[], Store> stores = new ConcurrentSkipListMap<>(Bytes.BYTES_RAWCOMPARATOR);
+  protected final Map<byte[], HStore> stores =
+      new ConcurrentSkipListMap<>(Bytes.BYTES_RAWCOMPARATOR);
 
   // TODO: account for each registered handler in HeapSize computation
   private Map<String, com.google.protobuf.Service> coprocessorServiceHandlers = Maps.newHashMap();
@@ -513,7 +516,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   /** A result object from prepare flush cache stage */
   @VisibleForTesting
   static class PrepareFlushResult {
-    final FlushResult result; // indicating a failure result from prepare
+    final FlushResultImpl result; // indicating a failure result from prepare
     final TreeMap<byte[], StoreFlushContext> storeFlushCtxs;
     final TreeMap<byte[], List<Path>> committedFiles;
     final TreeMap<byte[], MemstoreSize> storeFlushableSize;
@@ -523,7 +526,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     final MemstoreSize totalFlushableSize;
 
     /** Constructs an early exit case */
-    PrepareFlushResult(FlushResult result, long flushSeqId) {
+    PrepareFlushResult(FlushResultImpl result, long flushSeqId) {
       this(result, null, null, null, Math.max(0, flushSeqId), 0, 0, new MemstoreSize());
     }
 
@@ -538,7 +541,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
 
     private PrepareFlushResult(
-      FlushResult result,
+        FlushResultImpl result,
       TreeMap<byte[], StoreFlushContext> storeFlushCtxs,
       TreeMap<byte[], List<Path>> committedFiles,
       TreeMap<byte[], MemstoreSize> storeFlushableSize, long startTime, long flushSeqId,
@@ -616,7 +619,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   final long rowProcessorTimeout;
 
   // Last flush time for each Store. Useful when we are flushing for each column
-  private final ConcurrentMap<Store, Long> lastStoreFlushTimeMap = new ConcurrentHashMap<>();
+  private final ConcurrentMap<HStore, Long> lastStoreFlushTimeMap = new ConcurrentHashMap<>();
 
   final RegionServerServices rsServices;
   private RegionServerAccounting rsAccounting;
@@ -802,7 +805,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     this.disallowWritesInRecovering =
         conf.getBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING,
           HConstants.DEFAULT_DISALLOW_WRITES_IN_RECOVERING_CONFIG);
-    configurationManager = Optional.absent();
+    configurationManager = Optional.empty();
 
     // disable stats tracking system tables, but check the config for everything else
     this.regionStatsEnabled = htd.getTableName().getNamespaceAsString().equals(
@@ -902,22 +905,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     long maxSeqId = initializeStores(reporter, status);
     this.mvcc.advanceTo(maxSeqId);
     if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) {
-      List<Store> stores = this.getStores();  // update the stores that we are replaying
+      Collection<HStore> stores = this.stores.values();
       try {
-        for (Store store : stores) {
-          ((HStore) store).startReplayingFromWAL();
-        }
+        // update the stores that we are replaying
+        stores.forEach(HStore::startReplayingFromWAL);
         // Recover any edits if available.
         maxSeqId = Math.max(maxSeqId,
-            replayRecoveredEditsIfAny(this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
+          replayRecoveredEditsIfAny(this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
         // Make sure mvcc is up to max.
         this.mvcc.advanceTo(maxSeqId);
       } finally {
-        for (Store store : stores) {            // update the stores that we are done replaying
-          ((HStore)store).stopReplayingFromWAL();
-        }
+        // update the stores that we are done replaying
+        stores.forEach(HStore::stopReplayingFromWAL);
       }
-
     }
     this.lastReplayedOpenRegionSeqId = maxSeqId;
 
@@ -947,7 +947,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     this.flushPolicy = FlushPolicyFactory.create(this, conf);
 
     long lastFlushTime = EnvironmentEdgeManager.currentTime();
-    for (Store store: stores.values()) {
+    for (HStore store: stores.values()) {
       this.lastStoreFlushTimeMap.put(store, lastFlushTime);
     }
 
@@ -988,10 +988,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @return Highest sequenceId found out in a Store.
    * @throws IOException
    */
-  private long initializeStores(final CancelableProgressable reporter, MonitoredTask status)
-  throws IOException {
+  private long initializeStores(CancelableProgressable reporter, MonitoredTask status)
+      throws IOException {
     // Load in all the HStores.
-
     long maxSeqId = -1;
     // initialized to -1 so that we pick up MemstoreTS from column families
     long maxMemstoreTS = -1;
@@ -1050,11 +1049,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         if (!allStoresOpened) {
           // something went wrong, close all opened stores
           LOG.error("Could not initialize all stores for the region=" + this);
-          for (Store store : this.stores.values()) {
+          for (HStore store : this.stores.values()) {
             try {
               store.close();
             } catch (IOException e) {
-              LOG.warn(e.getMessage());
+              LOG.warn("close store failed", e);
             }
           }
         }
@@ -1079,11 +1078,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    */
   private NavigableMap<byte[], List<Path>> getStoreFiles() {
     NavigableMap<byte[], List<Path>> allStoreFiles = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-    for (Store store: getStores()) {
+    for (HStore store : stores.values()) {
       Collection<StoreFile> storeFiles = store.getStorefiles();
-      if (storeFiles == null) continue;
+      if (storeFiles == null) {
+        continue;
+      }
       List<Path> storeFileNames = new ArrayList<>();
-      for (StoreFile storeFile: storeFiles) {
+      for (StoreFile storeFile : storeFiles) {
         storeFileNames.add(storeFile.getPath());
       }
       allStoreFiles.put(store.getColumnFamilyDescriptor().getName(), storeFileNames);
@@ -1121,10 +1122,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @return True if this region has references.
    */
   public boolean hasReferences() {
-    for (Store store : this.stores.values()) {
-      if (store.hasReferences()) return true;
-    }
-    return false;
+    return stores.values().stream().anyMatch(HStore::hasReferences);
   }
 
   public void blockUpdates() {
@@ -1137,19 +1135,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   @Override
   public HDFSBlocksDistribution getHDFSBlocksDistribution() {
-    HDFSBlocksDistribution hdfsBlocksDistribution =
-      new HDFSBlocksDistribution();
-    synchronized (this.stores) {
-      for (Store store : this.stores.values()) {
-        Collection<StoreFile> storeFiles = store.getStorefiles();
-        if (storeFiles == null) continue;
-        for (StoreFile sf : storeFiles) {
-          HDFSBlocksDistribution storeFileBlocksDistribution =
-            sf.getHDFSBlockDistribution();
-          hdfsBlocksDistribution.add(storeFileBlocksDistribution);
-        }
-      }
-    }
+    HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
+    stores.values().stream().filter(s -> s.getStorefiles() != null)
+        .flatMap(s -> s.getStorefiles().stream()).map(StoreFile::getHDFSBlockDistribution)
+        .forEachOrdered(hdfsBlocksDistribution::add);
     return hdfsBlocksDistribution;
   }
 
@@ -1161,8 +1150,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @return The HDFS blocks distribution for the given region.
    * @throws IOException
    */
-  public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
-      final TableDescriptor tableDescriptor, final HRegionInfo regionInfo) throws IOException {
+  public static HDFSBlocksDistribution computeHDFSBlocksDistribution(Configuration conf,
+      TableDescriptor tableDescriptor, HRegionInfo regionInfo) throws IOException {
     Path tablePath = FSUtils.getTableDir(FSUtils.getRootDir(conf), tableDescriptor.getTableName());
     return computeHDFSBlocksDistribution(conf, tableDescriptor, regionInfo, tablePath);
   }
@@ -1176,9 +1165,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @return The HDFS blocks distribution for the given region.
    * @throws IOException
    */
-  public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
-      final TableDescriptor tableDescriptor, final HRegionInfo regionInfo,  Path tablePath)
-      throws IOException {
+  public static HDFSBlocksDistribution computeHDFSBlocksDistribution(Configuration conf,
+      TableDescriptor tableDescriptor, HRegionInfo regionInfo, Path tablePath) throws IOException {
     HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
     FileSystem fs = tablePath.getFileSystem(conf);
 
@@ -1407,9 +1395,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       new Throwable("LOGGING: REMOVE"));
     // REMOVE BELOW!!!!
     LOG.info("DEBUG LIST ALL FILES");
-    for (Store store: this.stores.values()) {
+    for (HStore store : this.stores.values()) {
       LOG.info("store " + store.getColumnFamilyName());
-      for (StoreFile sf: store.getStorefiles()) {
+      for (StoreFile sf : store.getStorefiles()) {
         LOG.info(sf.toStringDetailed());
       }
     }
@@ -1667,7 +1655,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           new ExecutorCompletionService<>(storeCloserThreadPool);
 
         // close each store in parallel
-        for (final Store store : stores.values()) {
+        for (HStore store : stores.values()) {
           MemstoreSize flushableSize = store.getSizeToFlush();
           if (!(abort || flushableSize.getDataSize() == 0 || writestate.readOnly)) {
             if (getRegionServerServices() != null) {
@@ -1740,11 +1728,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   }
 
   private long getMemstoreHeapSize() {
-    long size = 0;
-    for (Store s : this.stores.values()) {
-      size += s.getSizeOfMemStore().getHeapSize();
-    }
-    return size;
+    return stores.values().stream().mapToLong(s -> s.getSizeOfMemStore().getHeapSize()).sum();
   }
 
   @Override
@@ -1902,17 +1886,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   @Override
   public long getOldestHfileTs(boolean majorCompactionOnly) throws IOException {
     long result = Long.MAX_VALUE;
-    for (Store store : getStores()) {
+    for (HStore store : stores.values()) {
       Collection<StoreFile> storeFiles = store.getStorefiles();
-      if (storeFiles == null) continue;
+      if (storeFiles == null) {
+        continue;
+      }
       for (StoreFile file : storeFiles) {
         StoreFileReader sfReader = file.getReader();
-        if (sfReader == null) continue;
+        if (sfReader == null) {
+          continue;
+        }
         HFile.Reader reader = sfReader.getHFileReader();
-        if (reader == null) continue;
+        if (reader == null) {
+          continue;
+        }
         if (majorCompactionOnly) {
           byte[] val = reader.loadFileInfo().get(StoreFile.MAJOR_COMPACTION_KEY);
-          if (val == null || !Bytes.toBoolean(val)) continue;
+          if (val == null || !Bytes.toBoolean(val)) {
+            continue;
+          }
         }
         result = Math.min(result, reader.getFileContext().getFileCreateTime());
       }
@@ -1942,20 +1934,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   // These methods are meant to be called periodically by the HRegionServer for
   // upkeep.
   //////////////////////////////////////////////////////////////////////////////
-
-  /** @return returns size of largest HStore. */
+  /**
+   * @return returns size of largest HStore.
+   */
   public long getLargestHStoreSize() {
-    long size = 0;
-    for (Store h : stores.values()) {
-      long storeSize = h.getSize();
-      if (storeSize > size) {
-        size = storeSize;
-      }
-    }
-    return size;
+    return stores.values().stream().mapToLong(HStore::getSize).max().orElse(0L);
   }
 
-  /*
+  /**
    * Do preparation for pending compaction.
    * @throws IOException
    */
@@ -1964,19 +1950,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   @Override
   public void triggerMajorCompaction() throws IOException {
-    for (Store s : getStores()) {
-      s.triggerMajorCompaction();
-    }
+    stores.values().forEach(HStore::triggerMajorCompaction);
   }
 
   @Override
-  public void compact(final boolean majorCompaction) throws IOException {
+  public void compact(boolean majorCompaction) throws IOException {
     if (majorCompaction) {
       triggerMajorCompaction();
     }
-    for (Store s : getStores()) {
-      CompactionContext compaction = s.requestCompaction();
-      if (compaction != null) {
+    for (HStore s : stores.values()) {
+      Optional<CompactionContext> compaction = s.requestCompaction();
+      if (compaction.isPresent()) {
         ThroughputController controller = null;
         if (rsServices != null) {
           controller = CompactionThroughputControllerFactory.create(rsServices, conf);
@@ -1984,43 +1968,41 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         if (controller == null) {
           controller = NoLimitThroughputController.INSTANCE;
         }
-        compact(compaction, s, controller, null);
+        compact(compaction.get(), s, controller, null);
       }
     }
   }
 
   /**
-   * This is a helper function that compact all the stores synchronously
+   * This is a helper function that compacts all the stores synchronously.
+   * <p>
    * It is used by utilities and testing
-   *
-   * @throws IOException e
    */
+  @VisibleForTesting
   public void compactStores() throws IOException {
-    for (Store s : getStores()) {
-      CompactionContext compaction = s.requestCompaction();
-      if (compaction != null) {
-        compact(compaction, s, NoLimitThroughputController.INSTANCE, null);
+    for (HStore s : stores.values()) {
+      Optional<CompactionContext> compaction = s.requestCompaction();
+      if (compaction.isPresent()) {
+        compact(compaction.get(), s, NoLimitThroughputController.INSTANCE, null);
       }
     }
   }
 
   /**
-   * This is a helper function that compact the given store
+   * This is a helper function that compacts the given store.
+   * <p>
    * It is used by utilities and testing
-   *
-   * @throws IOException e
    */
   @VisibleForTesting
-  void compactStore(byte[] family, ThroughputController throughputController)
-      throws IOException {
-    Store s = getStore(family);
-    CompactionContext compaction = s.requestCompaction();
-    if (compaction != null) {
-      compact(compaction, s, throughputController, null);
+  void compactStore(byte[] family, ThroughputController throughputController) throws IOException {
+    HStore s = getStore(family);
+    Optional<CompactionContext> compaction = s.requestCompaction();
+    if (compaction.isPresent()) {
+      compact(compaction.get(), s, throughputController, null);
     }
   }
 
-  /*
+  /**
    * Called by compaction thread and after region is opened to compact the
    * HStores if necessary.
    *
@@ -2035,12 +2017,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @param throughputController
    * @return whether the compaction completed
    */
-  public boolean compact(CompactionContext compaction, Store store,
+  public boolean compact(CompactionContext compaction, HStore store,
       ThroughputController throughputController) throws IOException {
     return compact(compaction, store, throughputController, null);
   }
 
-  public boolean compact(CompactionContext compaction, Store store,
+  public boolean compact(CompactionContext compaction, HStore store,
       ThroughputController throughputController, User user) throws IOException {
     assert compaction != null && compaction.hasSelection();
     assert !compaction.getRequest().getFiles().isEmpty();
@@ -2214,7 +2196,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * because a Snapshot was not properly persisted. The region is put in closing mode, and the
    * caller MUST abort after this.
    */
-  public FlushResult flushcache(boolean forceFlushAllStores, boolean writeFlushRequestWalMarker)
+  public FlushResultImpl flushcache(boolean forceFlushAllStores, boolean writeFlushRequestWalMarker)
       throws IOException {
     // fail-fast instead of waiting on the lock
     if (this.closing.get()) {
@@ -2261,10 +2243,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
 
       try {
-        Collection<Store> specificStoresToFlush =
+        Collection<HStore> specificStoresToFlush =
             forceFlushAllStores ? stores.values() : flushPolicy.selectStoresToFlush();
-        FlushResult fs = internalFlushcache(specificStoresToFlush,
-          status, writeFlushRequestWalMarker);
+        FlushResultImpl fs =
+            internalFlushcache(specificStoresToFlush, status, writeFlushRequestWalMarker);
 
         if (coprocessorHost != null) {
           status.setStatus("Running post-flush coprocessor hooks");
@@ -2297,7 +2279,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * that you always flush all stores). Otherwise the method will always
    * returns true which will make a lot of flush requests.
    */
-  boolean shouldFlushStore(Store store) {
+  boolean shouldFlushStore(HStore store) {
     long earliest = this.wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(),
       store.getColumnFamilyDescriptor().getName()) - 1;
     if (earliest > 0 && earliest + flushPerChanges < mvcc.getReadPoint()) {
@@ -2349,7 +2331,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
     //since we didn't flush in the recent past, flush now if certain conditions
     //are met. Return true on first such memstore hit.
-    for (Store s : getStores()) {
+    for (Store s : stores.values()) {
       if (s.timeOfOldestEdit() < now - modifiedFlushCheckInterval) {
         // we have an old enough edit in the memstore, flush
         whyFlush.append(s.toString() + " has an old edit so flush to free WALs");
@@ -2361,39 +2343,33 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   /**
    * Flushing all stores.
-   *
    * @see #internalFlushcache(Collection, MonitoredTask, boolean)
    */
-  private FlushResult internalFlushcache(MonitoredTask status)
-      throws IOException {
+  private FlushResult internalFlushcache(MonitoredTask status) throws IOException {
     return internalFlushcache(stores.values(), status, false);
   }
 
   /**
    * Flushing given stores.
-   *
    * @see #internalFlushcache(WAL, long, Collection, MonitoredTask, boolean)
    */
-  private FlushResult internalFlushcache(final Collection<Store> storesToFlush,
-      MonitoredTask status, boolean writeFlushWalMarker) throws IOException {
-    return internalFlushcache(this.wal, HConstants.NO_SEQNUM, storesToFlush,
-        status, writeFlushWalMarker);
+  private FlushResultImpl internalFlushcache(Collection<HStore> storesToFlush, MonitoredTask status,
+      boolean writeFlushWalMarker) throws IOException {
+    return internalFlushcache(this.wal, HConstants.NO_SEQNUM, storesToFlush, status,
+      writeFlushWalMarker);
   }
 
   /**
-   * Flush the memstore. Flushing the memstore is a little tricky. We have a lot
-   * of updates in the memstore, all of which have also been written to the wal.
-   * We need to write those updates in the memstore out to disk, while being
-   * able to process reads/writes as much as possible during the flush
-   * operation.
+   * Flush the memstore. Flushing the memstore is a little tricky. We have a lot of updates in the
+   * memstore, all of which have also been written to the wal. We need to write those updates in the
+   * memstore out to disk, while being able to process reads/writes as much as possible during the
+   * flush operation.
    * <p>
-   * This method may block for some time. Every time you call it, we up the
-   * regions sequence id even if we don't flush; i.e. the returned region id
-   * will be at least one larger than the last edit applied to this region. The
-   * returned id does not refer to an actual edit. The returned id can be used
-   * for say installing a bulk loaded file just ahead of the last hfile that was
-   * the result of this flush, etc.
-   *
+   * This method may block for some time. Every time you call it, we up the region's sequence id even
+   * if we don't flush; i.e. the returned region id will be at least one larger than the last edit
+   * applied to this region. The returned id does not refer to an actual edit. The returned id can
+   * be used for say installing a bulk loaded file just ahead of the last hfile that was the result
+   * of this flush, etc.
    * @param wal Null if we're NOT to go via wal.
    * @param myseqid The seqid to use if <code>wal</code> is null writing out flush file.
    * @param storesToFlush The list of stores to flush.
@@ -2401,9 +2377,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @throws IOException general io exceptions
    * @throws DroppedSnapshotException Thrown when replay of WAL is required.
    */
-  protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
-      final Collection<Store> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker)
-          throws IOException {
+  protected FlushResultImpl internalFlushcache(WAL wal, long myseqid, Collection<HStore> storesToFlush,
+      MonitoredTask status, boolean writeFlushWalMarker) throws IOException {
     PrepareFlushResult result
       = internalPrepareFlushCache(wal, myseqid, storesToFlush, status, writeFlushWalMarker);
     if (result.result == null) {
@@ -2415,9 +2390,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DLS_DEAD_LOCAL_STORE",
       justification="FindBugs seems confused about trxId")
-  protected PrepareFlushResult internalPrepareFlushCache(final WAL wal, final long myseqid,
-      final Collection<Store> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker)
-  throws IOException {
+  protected PrepareFlushResult internalPrepareFlushCache(WAL wal, long myseqid,
+      Collection<HStore> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker)
+      throws IOException {
     if (this.rsServices != null && this.rsServices.isAborted()) {
       // Don't flush when server aborting, it's unsafe
       throw new IOException("Aborting flush because server is aborted...");
@@ -2439,11 +2414,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           // edits in the WAL sub-system. Up the sequence number so the resulting flush id is for
           // sure just beyond the last appended region edit and not associated with any edit
           // (useful as marker when bulk loading, etc.).
-          FlushResult flushResult = null;
           if (wal != null) {
             writeEntry = mvcc.begin();
             long flushOpSeqId = writeEntry.getWriteNumber();
-            flushResult = new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
+            FlushResultImpl flushResult = new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
               flushOpSeqId, "Nothing to flush",
             writeFlushRequestMarkerToWAL(wal, writeFlushWalMarker));
             mvcc.completeAndWait(writeEntry);
@@ -2479,9 +2453,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     MemstoreSize totalSizeOfFlushableStores = new MemstoreSize();
 
     Map<byte[], Long> flushedFamilyNamesToSeq = new HashMap<>();
-    for (Store store: storesToFlush) {
+    for (HStore store : storesToFlush) {
       flushedFamilyNamesToSeq.put(store.getColumnFamilyDescriptor().getName(),
-          ((HStore) store).preFlushSeqIDEstimation());
+        store.preFlushSeqIDEstimation());
     }
 
     TreeMap<byte[], StoreFlushContext> storeFlushCtxs = new TreeMap<>(Bytes.BYTES_COMPARATOR);
@@ -2517,7 +2491,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         flushedSeqId = flushOpSeqId = myseqid;
       }
 
-      for (Store s : storesToFlush) {
+      for (HStore s : storesToFlush) {
         MemstoreSize flushableSize = s.getSizeToFlush();
         totalSizeOfFlushableStores.incMemstoreSize(flushableSize);
         storeFlushCtxs.put(s.getColumnFamilyDescriptor().getName(), s.createFlushContext(flushOpSeqId));
@@ -2555,7 +2529,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   /**
    * Utility method broken out of internalPrepareFlushCache so that method is smaller.
    */
-  private void logFatLineOnFlush(final Collection<Store> storesToFlush, final long sequenceId) {
+  private void logFatLineOnFlush(Collection<HStore> storesToFlush, long sequenceId) {
     if (!LOG.isInfoEnabled()) {
       return;
     }
@@ -2563,7 +2537,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     StringBuilder perCfExtras = null;
     if (!isAllFamilies(storesToFlush)) {
       perCfExtras = new StringBuilder();
-      for (Store store: storesToFlush) {
+      for (HStore store: storesToFlush) {
         perCfExtras.append("; ").append(store.getColumnFamilyName());
         perCfExtras.append("=")
             .append(StringUtils.byteDesc(store.getSizeToFlush().getDataSize()));
@@ -2611,7 +2585,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   /**
    * @return True if passed Set is all families in the region.
    */
-  private boolean isAllFamilies(final Collection<Store> families) {
+  private boolean isAllFamilies(Collection<HStore> families) {
     return families == null || this.stores.size() == families.size();
   }
 
@@ -2639,11 +2613,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
       justification="Intentional; notify is about completed flush")
-  protected FlushResult internalFlushCacheAndCommit(
-        final WAL wal, MonitoredTask status, final PrepareFlushResult prepareResult,
-        final Collection<Store> storesToFlush)
-    throws IOException {
-
+  protected FlushResultImpl internalFlushCacheAndCommit(WAL wal, MonitoredTask status,
+      PrepareFlushResult prepareResult, Collection<HStore> storesToFlush) throws IOException {
     // prepare flush context is carried via PrepareFlushResult
     TreeMap<byte[], StoreFlushContext> storeFlushCtxs = prepareResult.storeFlushCtxs;
     TreeMap<byte[], List<Path>> committedFiles = prepareResult.committedFiles;
@@ -2673,7 +2644,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
       // Switch snapshot (in memstore) -> new hfile (thus causing
       // all the store scanners to reset/reseek).
-      Iterator<Store> it = storesToFlush.iterator();
+      Iterator<HStore> it = storesToFlush.iterator();
       // stores.values() and storeFlushCtxs have same order
       for (StoreFlushContext flush : storeFlushCtxs.values()) {
         boolean needsCompaction = flush.commit(status);
@@ -2746,7 +2717,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
 
     // Record latest flush time
-    for (Store store: storesToFlush) {
+    for (HStore store: storesToFlush) {
       this.lastStoreFlushTimeMap.put(store, startTime);
     }
 
@@ -4002,34 +3973,34 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }
 
-  /*
+  /**
    * @param delta If we are doing delta changes -- e.g. increment/append -- then this flag will be
-   *  set; when set we will run operations that make sense in the increment/append scenario but
-   *  that do not make sense otherwise.
-   * @see #applyToMemstore(Store, Cell, long)
+   *          set; when set we will run operations that make sense in the increment/append scenario
+   *          but that do not make sense otherwise.
+   * @see #applyToMemstore(HStore, Cell, long)
    */
-  private void applyToMemstore(final Store store, final List<Cell> cells, final boolean delta,
+  private void applyToMemstore(HStore store, List<Cell> cells, boolean delta,
       MemstoreSize memstoreSize) throws IOException {
     // Any change in how we update Store/MemStore needs to also be done in other applyToMemstore!!!!
     boolean upsert = delta && store.getColumnFamilyDescriptor().getMaxVersions() == 1;
     if (upsert) {
-      ((HStore) store).upsert(cells, getSmallestReadPoint(), memstoreSize);
+      store.upsert(cells, getSmallestReadPoint(), memstoreSize);
     } else {
-      ((HStore) store).add(cells, memstoreSize);
+      store.add(cells, memstoreSize);
     }
   }
 
-  /*
-   * @see #applyToMemstore(Store, List, boolean, boolean, long)
+  /**
+   * @see #applyToMemstore(HStore, List, boolean, boolean, long)
    */
-  private void applyToMemstore(final Store store, final Cell cell, MemstoreSize memstoreSize)
-  throws IOException {
+  private void applyToMemstore(HStore store, Cell cell, MemstoreSize memstoreSize)
+      throws IOException {
     // Any change in how we update Store/MemStore needs to also be done in other applyToMemstore!!!!
     if (store == null) {
       checkFamily(CellUtil.cloneFamily(cell));
       // Unreachable because checkFamily will throw exception
     }
-    ((HStore) store).add(cell, memstoreSize);
+    store.add(cell, memstoreSize);
   }
 
   @Override
@@ -4368,7 +4339,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             // Figure which store the edit is meant for.
             if (store == null || !CellUtil.matchingFamily(cell,
                 store.getColumnFamilyDescriptor().getName())) {
-              store = getHStore(cell);
+              store = getStore(cell);
             }
             if (store == null) {
               // This should never happen.  Perhaps schema was changed between
@@ -4497,7 +4468,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
       startRegionOperation(Operation.REPLAY_EVENT);
       try {
-        HStore store = this.getHStore(compaction.getFamilyName().toByteArray());
+        HStore store = this.getStore(compaction.getFamilyName().toByteArray());
         if (store == null) {
           LOG.warn(getRegionInfo().getEncodedName() + " : "
               + "Found Compaction WAL edit for deleted family:"
@@ -4567,10 +4538,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   PrepareFlushResult replayWALFlushStartMarker(FlushDescriptor flush) throws IOException {
     long flushSeqId = flush.getFlushSequenceNumber();
 
-    HashSet<Store> storesToFlush = new HashSet<>();
+    HashSet<HStore> storesToFlush = new HashSet<>();
     for (StoreFlushDescriptor storeFlush : flush.getStoreFlushesList()) {
       byte[] family = storeFlush.getFamilyName().toByteArray();
-      Store store = getStore(family);
+      HStore store = getStore(family);
       if (store == null) {
         LOG.warn(getRegionInfo().getEncodedName() + " : "
           + "Received a flush start marker from primary, but the family is not found. Ignoring"
@@ -4807,7 +4778,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       throws IOException {
     for (StoreFlushDescriptor storeFlush : flush.getStoreFlushesList()) {
       byte[] family = storeFlush.getFamilyName().toByteArray();
-      Store store = getStore(family);
+      HStore store = getStore(family);
       if (store == null) {
         LOG.warn(getRegionInfo().getEncodedName() + " : "
             + "Received a flush commit marker from primary, but the family is not found."
@@ -4843,7 +4814,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * if the memstore edits have seqNums smaller than the given seq id
    * @throws IOException
    */
-  private MemstoreSize dropMemstoreContentsForSeqId(long seqId, Store store) throws IOException {
+  private MemstoreSize dropMemstoreContentsForSeqId(long seqId, HStore store) throws IOException {
     MemstoreSize totalFreedSize = new MemstoreSize();
     this.updatesLock.writeLock().lock();
     try {
@@ -4857,7 +4828,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
         // Prepare flush (take a snapshot) and then abort (drop the snapshot)
         if (store == null) {
-          for (Store s : stores.values()) {
+          for (HStore s : stores.values()) {
             totalFreedSize.incMemstoreSize(doDropStoreMemstoreContentsForSeqId(s, currentSeqId));
           }
         } else {
@@ -4874,7 +4845,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     return totalFreedSize;
   }
 
-  private MemstoreSize doDropStoreMemstoreContentsForSeqId(Store s, long currentSeqId)
+  private MemstoreSize doDropStoreMemstoreContentsForSeqId(HStore s, long currentSeqId)
       throws IOException {
     MemstoreSize flushableSize = s.getSizeToFlush();
     this.decrMemstoreSize(flushableSize);
@@ -4965,7 +4936,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         for (StoreDescriptor storeDescriptor : regionEvent.getStoresList()) {
           // stores of primary may be different now
           byte[] family = storeDescriptor.getFamilyName().toByteArray();
-          Store store = getStore(family);
+          HStore store = getStore(family);
           if (store == null) {
             LOG.warn(getRegionInfo().getEncodedName() + " : "
                 + "Received a region open marker from primary, but the family is not found. "
@@ -5081,7 +5052,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         for (StoreDescriptor storeDescriptor : bulkLoadEvent.getStoresList()) {
           // stores of primary may be different now
           family = storeDescriptor.getFamilyName().toByteArray();
-          HStore store = getHStore(family);
+          HStore store = getStore(family);
           if (store == null) {
             LOG.warn(getRegionInfo().getEncodedName() + " : "
                     + "Received a bulk load marker from primary, but the family is not found. "
@@ -5119,9 +5090,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     if (writestate.flushing) {
       boolean canDrop = true;
       if (prepareFlushResult.storeFlushCtxs != null) {
-        for (Entry<byte[], StoreFlushContext> entry
-            : prepareFlushResult.storeFlushCtxs.entrySet()) {
-          Store store = getStore(entry.getKey());
+        for (Entry<byte[], StoreFlushContext> entry : prepareFlushResult.storeFlushCtxs
+            .entrySet()) {
+          HStore store = getStore(entry.getKey());
           if (store == null) {
             continue;
           }
@@ -5164,9 +5135,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
     startRegionOperation(); // obtain region close lock
     try {
-      Map<Store, Long> map = new HashMap<>();
+      Map<HStore, Long> map = new HashMap<>();
       synchronized (writestate) {
-        for (Store store : getStores()) {
+        for (HStore store : stores.values()) {
           // TODO: some stores might see new data from flush, while others do not which
           // MIGHT break atomic edits across column families.
           long maxSeqIdBefore = store.getMaxSequenceId();
@@ -5207,10 +5178,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         dropPrepareFlushIfPossible();
 
         // advance the mvcc read point so that the new flushed files are visible.
-          // either greater than flush seq number or they were already picked up via flush.
-          for (Store s : getStores()) {
-            mvcc.advanceTo(s.getMaxMemstoreTS());
-          }
+        // either greater than flush seq number or they were already picked up via flush.
+        for (HStore s : stores.values()) {
+          mvcc.advanceTo(s.getMaxMemstoreTS());
+        }
 
 
         // smallestSeqIdInStores is the seqId that we have a corresponding hfile for. We can safely
@@ -5222,7 +5193,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         }
       }
       if (!map.isEmpty()) {
-        for (Map.Entry<Store, Long> entry : map.entrySet()) {
+        for (Map.Entry<HStore, Long> entry : map.entrySet()) {
           // Drop the memstore contents if they are now smaller than the latest seen flushed file
           totalFreedDataSize += dropMemstoreContentsForSeqId(entry.getValue(), entry.getKey())
               .getDataSize();
@@ -5242,13 +5213,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   private void logRegionFiles() {
     if (LOG.isTraceEnabled()) {
       LOG.trace(getRegionInfo().getEncodedName() + " : Store files for region: ");
-      for (Store s : stores.values()) {
-        Collection<StoreFile> storeFiles = s.getStorefiles();
-        if (storeFiles == null) continue;
-        for (StoreFile sf : storeFiles) {
-          LOG.trace(getRegionInfo().getEncodedName() + " : " + sf);
-        }
-      }
+      stores.values().stream().filter(s -> s.getStorefiles() != null)
+          .flatMap(s -> s.getStorefiles().stream())
+          .forEachOrdered(sf -> LOG.trace(getRegionInfo().getEncodedName() + " : " + sf));
     }
   }
 
@@ -5272,17 +5239,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       + " does not match this region: " + this.getRegionInfo());
   }
 
-  /*
+  /**
    * Used by tests
    * @param s Store to add edit too.
    * @param cell Cell to add.
    * @param memstoreSize
    */
-  protected void restoreEdit(final HStore s, final Cell cell, MemstoreSize memstoreSize) {
+  @VisibleForTesting
+  protected void restoreEdit(HStore s, Cell cell, MemstoreSize memstoreSize) {
     s.add(cell, memstoreSize);
   }
 
-  /*
+  /**
    * @param fs
    * @param p File to check.
    * @return True if file was zero-length (and if so, we'll delete it in here).
@@ -5291,7 +5259,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   private static boolean isZeroLengthThenDelete(final FileSystem fs, final Path p)
       throws IOException {
     FileStatus stat = fs.getFileStatus(p);
-    if (stat.getLen() > 0) return false;
+    if (stat.getLen() > 0) {
+      return false;
+    }
     LOG.warn("File " + p + " is zero-length, deleting.");
     fs.delete(p, false);
     return true;
@@ -5311,49 +5281,39 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   }
 
   @Override
-  public Store getStore(final byte[] column) {
-    return getHStore(column);
-  }
-
-  public HStore getHStore(final byte[] column) {
-    return (HStore) this.stores.get(column);
+  public HStore getStore(byte[] column) {
+    return this.stores.get(column);
   }
 
   /**
-   * Return HStore instance. Does not do any copy: as the number of store is limited, we
-   *  iterate on the list.
+   * Return HStore instance. Does not do any copy: as the number of store is limited, we iterate on
+   * the list.
    */
-  private HStore getHStore(Cell cell) {
-    for (Map.Entry<byte[], Store> famStore : stores.entrySet()) {
-      if (CellUtil.matchingFamily(cell, famStore.getKey(), 0, famStore.getKey().length)) {
-        return (HStore) famStore.getValue();
-      }
-    }
-
-    return null;
+  private HStore getStore(Cell cell) {
+    return stores.entrySet().stream().filter(e -> CellUtil.matchingFamily(cell, e.getKey()))
+        .map(e -> e.getValue()).findFirst().orElse(null);
   }
 
   @Override
-  public List<Store> getStores() {
-    List<Store> list = new ArrayList<>(stores.size());
-    list.addAll(stores.values());
-    return list;
+  public List<HStore> getStores() {
+    return new ArrayList<>(stores.values());
   }
 
   @Override
-  public List<String> getStoreFileList(final byte [][] columns)
-    throws IllegalArgumentException {
+  public List<String> getStoreFileList(byte[][] columns) throws IllegalArgumentException {
     List<String> storeFileNames = new ArrayList<>();
-    synchronized(closeLock) {
-      for(byte[] column : columns) {
-        Store store = this.stores.get(column);
+    synchronized (closeLock) {
+      for (byte[] column : columns) {
+        HStore store = this.stores.get(column);
         if (store == null) {
-          throw new IllegalArgumentException("No column family : " +
-              new String(column) + " available");
+          throw new IllegalArgumentException(
+              "No column family : " + new String(column) + " available");
         }
         Collection<StoreFile> storeFiles = store.getStorefiles();
-        if (storeFiles == null) continue;
-        for (StoreFile storeFile: storeFiles) {
+        if (storeFiles == null) {
+          continue;
+        }
+        for (StoreFile storeFile : storeFiles) {
           storeFileNames.add(storeFile.getPath().toString());
         }
 
@@ -5368,7 +5328,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   //////////////////////////////////////////////////////////////////////////////
 
   /** Make sure this is a valid row for the HRegion */
-  void checkRow(final byte [] row, String op) throws IOException {
+  void checkRow(byte[] row, String op) throws IOException {
     if (!rowIsInRange(getRegionInfo(), row)) {
       throw new WrongRegionException("Requested row out of range for " +
           op + " on HRegion " + this + ", startKey='" +
@@ -5637,7 +5597,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         byte[] familyName = p.getFirst();
         String path = p.getSecond();
 
-        HStore store = getHStore(familyName);
+        HStore store = getStore(familyName);
         if (store == null) {
           IOException ioe = new org.apache.hadoop.hbase.DoNotRetryIOException(
               "No such column family " + Bytes.toStringBinary(familyName));
@@ -5697,7 +5657,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       for (Pair<byte[], String> p : familyPaths) {
         byte[] familyName = p.getFirst();
         String path = p.getSecond();
-        HStore store = getHStore(familyName);
+        HStore store = getStore(familyName);
         if (!familyWithFinalPath.containsKey(familyName)) {
           familyWithFinalPath.put(familyName, new ArrayList<>());
         }
@@ -5737,7 +5697,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         for (Pair<Path, Path> p : entry.getValue()) {
           String path = p.getFirst().toString();
           Path commitedStoreFile = p.getSecond();
-          HStore store = getHStore(familyName);
+          HStore store = getStore(familyName);
           try {
             store.bulkLoadHFile(familyName, path, commitedStoreFile);
             // Note the size of the store file
@@ -5912,7 +5872,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
       try {
         for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
-          Store store = stores.get(entry.getKey());
+          HStore store = stores.get(entry.getKey());
           KeyValueScanner scanner;
           try {
             scanner = store.getScanner(scan, entry.getValue(), this.readPt);
@@ -7145,7 +7105,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
                 // If no WAL, need to stamp it here.
                 CellUtil.setSequenceId(cell, sequenceId);
               }
-              applyToMemstore(getHStore(cell), cell, memstoreSize);
+              applyToMemstore(getStore(cell), cell, memstoreSize);
             }
           }
 
@@ -7296,7 +7256,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           return returnResults? cpResult: null;
         }
         Durability effectiveDurability = getEffectiveDurability(mutation.getDurability());
-        Map<Store, List<Cell>> forMemStore = new HashMap<>(mutation.getFamilyCellMap().size());
+        Map<HStore, List<Cell>> forMemStore = new HashMap<>(mutation.getFamilyCellMap().size());
         // Reckon Cells to apply to WAL --  in returned walEdit -- and what to add to memstore and
         // what to return back to the client (in 'forMemStore' and 'results' respectively).
         WALEdit walEdit = reckonDeltas(op, mutation, effectiveDurability, forMemStore, results);
@@ -7311,7 +7271,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           updateSequenceId(forMemStore.values(), writeEntry.getWriteNumber());
         }
         // Now write to MemStore. Do it a column family at a time.
-        for (Map.Entry<Store, List<Cell>> e : forMemStore.entrySet()) {
+        for (Map.Entry<HStore, List<Cell>> e : forMemStore.entrySet()) {
           applyToMemstore(e.getKey(), e.getValue(), true, memstoreSize);
         }
         mvcc.completeAndWait(writeEntry);
@@ -7419,18 +7379,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @param forMemStore Fill in here what to apply to the MemStore (by Store).
    * @return A WALEdit to apply to WAL or null if we are to skip the WAL.
    */
-  private WALEdit reckonDeltas(final Operation op, final Mutation mutation,
-      final Durability effectiveDurability, final Map<Store, List<Cell>> forMemStore,
-      final List<Cell> results)
-  throws IOException {
+  private WALEdit reckonDeltas(Operation op, Mutation mutation, Durability effectiveDurability,
+      Map<HStore, List<Cell>> forMemStore, List<Cell> results) throws IOException {
     WALEdit walEdit = null;
     long now = EnvironmentEdgeManager.currentTime();
     final boolean writeToWAL = effectiveDurability != Durability.SKIP_WAL;
     // Process a Store/family at a time.
     for (Map.Entry<byte [], List<Cell>> entry: mutation.getFamilyCellMap().entrySet()) {
-      final byte [] columnFamilyName = entry.getKey();
+      final byte[] columnFamilyName = entry.getKey();
       List<Cell> deltas = entry.getValue();
-      Store store = this.stores.get(columnFamilyName);
+      HStore store = this.stores.get(columnFamilyName);
       // Reckon for the Store what to apply to WAL and MemStore.
       List<Cell> toApply =
         reckonDeltasByStore(store, op, mutation, effectiveDurability, now, deltas, results);
@@ -7462,11 +7420,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @return Resulting Cells after <code>deltas</code> have been applied to current
    *  values. Side effect is our filling out of the <code>results</code> List.
    */
-  private List<Cell> reckonDeltasByStore(final Store store, final Operation op,
-      final Mutation mutation, final Durability effectiveDurability, final long now,
-      final List<Cell> deltas, final List<Cell> results)
-  throws IOException {
-    byte [] columnFamily = store.getColumnFamilyDescriptor().getName();
+  private List<Cell> reckonDeltasByStore(HStore store, Operation op, Mutation mutation,
+      Durability effectiveDurability, long now, List<Cell> deltas, List<Cell> results)
+      throws IOException {
+    byte[] columnFamily = store.getColumnFamilyDescriptor().getName();
     List<Cell> toApply = new ArrayList<>(deltas.size());
     // Get previous values for all columns in this family.
     List<Cell> currentValues = get(mutation, store, deltas,
@@ -7576,9 +7533,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @param coordinates Cells from <code>mutation</code> used as coordinates applied to Get.
    * @return Return list of Cells found.
    */
-  private List<Cell> get(final Mutation mutation, final Store store,
-          final List<Cell> coordinates, final IsolationLevel isolation, final TimeRange tr)
-  throws IOException {
+  private List<Cell> get(Mutation mutation, HStore store, List<Cell> coordinates,
+      IsolationLevel isolation, TimeRange tr) throws IOException {
     // Sort the cells so that they match the order that they appear in the Get results. Otherwise,
     // we won't be able to find the existing values if the cells are not specified in order by the
     // client since cells are in an array list.
@@ -7653,12 +7609,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   @Override
   public long heapSize() {
-    long heapSize = DEEP_OVERHEAD;
-    for (Store store : this.stores.values()) {
-      heapSize += store.heapSize();
-    }
     // this does not take into account row locks, recent flushes, mvcc entries, and more
-    return heapSize;
+    return DEEP_OVERHEAD + stores.values().stream().mapToLong(HStore::heapSize).sum();
   }
 
   @Override
@@ -7813,14 +7765,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @return The priority that this region should have in the compaction queue
    */
   public int getCompactPriority() {
-    int count = Integer.MAX_VALUE;
-    for (Store store : stores.values()) {
-      count = Math.min(count, store.getCompactPriority());
-    }
-    return count;
+    return stores.values().stream().mapToInt(HStore::getCompactPriority).min()
+        .orElse(Store.NO_PRIORITY);
   }
 
-
   /** @return the coprocessor host */
   @Override
   public RegionCoprocessorHost getCoprocessorHost() {
@@ -7881,11 +7829,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     // The unit for snapshot is a region. So, all stores for this region must be
     // prepared for snapshot operation before proceeding.
     if (op == Operation.SNAPSHOT) {
-      for (Store store : stores.values()) {
-        if (store instanceof HStore) {
-          ((HStore)store).preSnapshotOperation();
-        }
-      }
+      stores.values().forEach(HStore::preSnapshotOperation);
     }
     try {
       if (coprocessorHost != null) {
@@ -7905,11 +7849,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   @Override
   public void closeRegionOperation(Operation operation) throws IOException {
     if (operation == Operation.SNAPSHOT) {
-      for (Store store: stores.values()) {
-        if (store instanceof HStore) {
-          ((HStore)store).postSnapshotOperation();
-        }
-      }
+      stores.values().forEach(HStore::postSnapshotOperation);
     }
     lock.readLock().unlock();
     if (coprocessorHost != null) {
@@ -8142,9 +8082,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   @Override
   public void registerChildren(ConfigurationManager manager) {
     configurationManager = Optional.of(manager);
-    for (Store s : this.stores.values()) {
-      configurationManager.get().registerObserver(s);
-    }
+    stores.values().forEach(manager::registerObserver);
   }
 
   /**
@@ -8152,9 +8090,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    */
   @Override
   public void deregisterChildren(ConfigurationManager manager) {
-    for (Store s : this.stores.values()) {
-      configurationManager.get().deregisterObserver(s);
-    }
+    stores.values().forEach(configurationManager.get()::deregisterObserver);
   }
 
   @Override
@@ -8175,7 +8111,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     buf.append(getRegionInfo().isMetaRegion() ? " meta region " : " ");
     buf.append(getRegionInfo().isMetaTable() ? " meta table " : " ");
     buf.append("stores: ");
-    for (Store s : getStores()) {
+    for (HStore s : stores.values()) {
       buf.append(s.getColumnFamilyDescriptor().getNameAsString());
       buf.append(" size: ");
       buf.append(s.getSizeOfMemStore().getDataSize());
@@ -8188,4 +8124,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       throw new RuntimeException(buf.toString());
     }
   }
+
+  @Override
+  public void requestCompaction(String why, int priority, CompactionLifeCycleTracker tracker,
+      User user) throws IOException {
+    ((HRegionServer) rsServices).compactSplitThread.requestCompaction(this, why, priority, tracker,
+      user);
+  }
+
+  @Override
+  public void requestCompaction(byte[] family, String why, int priority,
+      CompactionLifeCycleTracker tracker, User user) throws IOException {
+    ((HRegionServer) rsServices).compactSplitThread.requestCompaction(this,
+      Preconditions.checkNotNull(stores.get(family)), why, priority, tracker, user);
+  }
 }
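
A minimal sketch (not part of this commit) of the two Region-level entry points added
above; "region" and "family" are assumed to be in scope, and null is passed for the
User, as the server-side compaction chore later in this patch also does:

    // Compact one column family at user priority.
    region.requestCompaction(family, "manual trigger", Store.PRIORITY_USER,
      CompactionLifeCycleTracker.DUMMY, null);
    // Or compact every store in the region.
    region.requestCompaction("manual trigger", Store.PRIORITY_USER,
      CompactionLifeCycleTracker.DUMMY, null);

Both overloads throw IOException and simply delegate to the region server's
compactSplitThread, which queues the request rather than running it inline.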

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 6bbff36..62987c0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -56,8 +56,8 @@ import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 import javax.servlet.http.HttpServlet;
 
-import org.apache.commons.lang3.SystemUtils;
 import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.SystemUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -127,6 +127,7 @@ import org.apache.hadoop.hbase.quotas.QuotaUtil;
 import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
 import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
 import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
@@ -140,6 +141,9 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
 import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Maps;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingRpcChannel;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
@@ -210,10 +214,6 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.data.Stat;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Maps;
-
 import sun.misc.Signal;
 import sun.misc.SignalHandler;
 
@@ -1686,7 +1686,7 @@ public class HRegionServer extends HasThread implements
     int totalStaticBloomSizeKB = 0;
     long totalCompactingKVs = 0;
     long currentCompactedKVs = 0;
-    List<Store> storeList = r.getStores();
+    List<? extends Store> storeList = r.getStores();
     stores += storeList.size();
     for (Store store : storeList) {
       storefiles += store.getStorefilesCount();
@@ -1772,27 +1772,32 @@ public class HRegionServer extends HasThread implements
     @Override
     protected void chore() {
       for (Region r : this.instance.onlineRegions.values()) {
-        if (r == null)
+        if (r == null) {
           continue;
-        for (Store s : r.getStores()) {
+        }
+        HRegion hr = (HRegion) r;
+        for (HStore s : hr.stores.values()) {
           try {
             long multiplier = s.getCompactionCheckMultiplier();
             assert multiplier > 0;
-            if (iteration % multiplier != 0) continue;
+            if (iteration % multiplier != 0) {
+              continue;
+            }
             if (s.needsCompaction()) {
               // Queue a compaction. Will recognize if major is needed.
-              this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()
-                  + " requests compaction");
+              this.instance.compactSplitThread.requestSystemCompaction(hr, s,
+                getName() + " requests compaction");
             } else if (s.isMajorCompaction()) {
               s.triggerMajorCompaction();
-              if (majorCompactPriority == DEFAULT_PRIORITY
-                  || majorCompactPriority > ((HRegion)r).getCompactPriority()) {
-                this.instance.compactSplitThread.requestCompaction(r, s, getName()
-                    + " requests major compaction; use default priority", null);
+              if (majorCompactPriority == DEFAULT_PRIORITY ||
+                  majorCompactPriority > hr.getCompactPriority()) {
+                this.instance.compactSplitThread.requestCompaction(hr, s,
+                  getName() + " requests major compaction; use default priority", Store.NO_PRIORITY,
+                  CompactionLifeCycleTracker.DUMMY, null);
               } else {
-                this.instance.compactSplitThread.requestCompaction(r, s, getName()
-                    + " requests major compaction; use configured priority",
-                  this.majorCompactPriority, null, null);
+                this.instance.compactSplitThread.requestCompaction(hr, s,
+                  getName() + " requests major compaction; use configured priority",
+                  this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null);
               }
             }
           } catch (IOException e) {
@@ -2146,15 +2151,14 @@ public class HRegionServer extends HasThread implements
   @Override
   public void postOpenDeployTasks(final PostOpenDeployContext context)
       throws KeeperException, IOException {
-    Region r = context.getRegion();
+    HRegion r = (HRegion) context.getRegion();
     long masterSystemTime = context.getMasterSystemTime();
-    Preconditions.checkArgument(r instanceof HRegion, "r must be an HRegion");
     rpcServices.checkOpen();
     LOG.info("Post open deploy tasks for " + r.getRegionInfo().getRegionNameAsString());
     // Do checks to see if we need to compact (references or too many files)
-    for (Store s : r.getStores()) {
+    for (HStore s : r.stores.values()) {
       if (s.hasReferences() || s.needsCompaction()) {
-       this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
+        this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
       }
     }
     long openSeqNum = r.getOpenSeqNum();
@@ -2863,11 +2867,6 @@ public class HRegionServer extends HasThread implements
     return serverName;
   }
 
-  @Override
-  public CompactionRequestor getCompactionRequester() {
-    return this.compactSplitThread;
-  }
-
   public RegionServerCoprocessorHost getRegionServerCoprocessorHost(){
     return this.rsHost;
   }
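
A note on the List<? extends Store> change above (the metrics wrappers later in this
patch get the same treatment): Java generics are invariant, so once getStores()
returns the concrete List<HStore> it can no longer be assigned to List<Store>. A
minimal sketch of the pattern, using hypothetical local names:

    List<HStore> concrete = new ArrayList<>(hregion.stores.values());
    // List<Store> broken = concrete;         // does not compile: generics are invariant
    List<? extends Store> stores = concrete;  // fine: covariant, read-only view
    for (Store s : stores) {
      // Store-level reads still work through the interface
    }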

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index f011c18..daad241 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -30,6 +30,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
@@ -52,13 +53,12 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CompoundConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.MemoryCompactionPolicy;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.FailedArchiveException;
-import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.conf.ConfigurationManager;
 import org.apache.hadoop.hbase.io.compress.Compression;
@@ -73,6 +73,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
@@ -82,8 +83,6 @@ import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.security.EncryptionUtil;
 import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -92,14 +91,16 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
+import org.apache.yetus.audience.InterfaceAudience;
 
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableCollection;
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
 
 /**
  * A Store holds a column family in a Region.  It's a memstore and a set of zero
@@ -477,7 +478,7 @@ public class HStore implements Store {
   /**
    * @param tabledir {@link Path} to where the table is being stored
    * @param hri {@link HRegionInfo} for the region.
-   * @param family {@link HColumnDescriptor} describing the column family
+   * @param family {@link ColumnFamilyDescriptor} describing the column family
    * @return Path to family/Store home directory.
    */
   @Deprecated
@@ -489,7 +490,7 @@ public class HStore implements Store {
   /**
    * @param tabledir {@link Path} to where the table is being stored
    * @param encodedName Encoded region name.
-   * @param family {@link HColumnDescriptor} describing the column family
+   * @param family {@link ColumnFamilyDescriptor} describing the column family
    * @return Path to family/Store home directory.
    */
   @Deprecated
@@ -1386,15 +1387,14 @@ public class HStore implements Store {
     }
   }
 
-  private List<StoreFile> moveCompatedFilesIntoPlace(
-      final CompactionRequest cr, List<Path> newFiles, User user) throws IOException {
+  private List<StoreFile> moveCompatedFilesIntoPlace(CompactionRequest cr, List<Path> newFiles,
+      User user) throws IOException {
     List<StoreFile> sfs = new ArrayList<>(newFiles.size());
     for (Path newFile : newFiles) {
       assert newFile != null;
-      final StoreFile sf = moveFileIntoPlace(newFile);
+      StoreFile sf = moveFileIntoPlace(newFile);
       if (this.getCoprocessorHost() != null) {
-        final Store thisStore = this;
-        getCoprocessorHost().postCompact(thisStore, sf, cr, user);
+        getCoprocessorHost().postCompact(this, sf, cr.getTracker(), user);
       }
       assert sf != null;
       sfs.add(sf);
@@ -1636,23 +1636,12 @@ public class HStore implements Store {
   }
 
   @Override
-  public CompactionContext requestCompaction() throws IOException {
-    return requestCompaction(Store.NO_PRIORITY, null);
-  }
-
-  @Override
-  public CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
-      throws IOException {
-    return requestCompaction(priority, baseRequest, null);
-  }
-  @Override
-  public CompactionContext requestCompaction(int priority, final CompactionRequest baseRequest,
-      User user) throws IOException {
+  public Optional<CompactionContext> requestCompaction(int priority,
+      CompactionLifeCycleTracker tracker, User user) throws IOException {
     // don't even select for compaction if writes are disabled
     if (!this.areWritesEnabled()) {
-      return null;
+      return Optional.empty();
     }
-
     // Before we do compaction, try to get rid of unneeded files to simplify things.
     removeUnneededFiles();
 
@@ -1666,7 +1655,7 @@ public class HStore implements Store {
           final List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
           boolean override = false;
           override = getCoprocessorHost().preCompactSelection(this, candidatesForCoproc,
-              baseRequest, user);
+            tracker, user);
           if (override) {
             // Coprocessor is overriding normal file selection.
             compaction.forceSelect(new CompactionRequest(candidatesForCoproc));
@@ -1695,21 +1684,13 @@ public class HStore implements Store {
         }
         if (this.getCoprocessorHost() != null) {
           this.getCoprocessorHost().postCompactSelection(
-              this, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest, user);
-        }
-
-        // Selected files; see if we have a compaction with some custom base request.
-        if (baseRequest != null) {
-          // Update the request with what the system thinks the request should be;
-          // its up to the request if it wants to listen.
-          compaction.forceSelect(
-              baseRequest.combineWith(compaction.getRequest()));
+              this, ImmutableList.copyOf(compaction.getRequest().getFiles()), tracker, user);
         }
         // Finally, we have the resulting files list. Check if we have any files at all.
         request = compaction.getRequest();
-        final Collection<StoreFile> selectedFiles = request.getFiles();
+        Collection<StoreFile> selectedFiles = request.getFiles();
         if (selectedFiles.isEmpty()) {
-          return null;
+          return Optional.empty();
         }
 
         addToCompactingFiles(selectedFiles);
@@ -1721,6 +1702,7 @@ public class HStore implements Store {
         // Set priority, either override value supplied by caller or from store.
         request.setPriority((priority != Store.NO_PRIORITY) ? priority : getCompactPriority());
         request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
+        request.setTracker(tracker);
       }
     } finally {
       this.lock.readLock().unlock();
@@ -1730,7 +1712,7 @@ public class HStore implements Store {
         + ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction"
         + (request.isAllFiles() ? " (all files)" : ""));
     this.region.reportCompactionRequestStart(request.isMajor());
-    return compaction;
+    return Optional.of(compaction);
   }
 
   /** Adds the files to compacting files. filesCompacting must be locked. */
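
Since HStore.requestCompaction now returns Optional<CompactionContext> instead of a
nullable reference, callers unwrap the result explicitly. A minimal sketch under the
new signature ("store" is assumed to be in scope; an empty Optional means writes are
disabled or no files were selected):

    Optional<CompactionContext> compaction =
        store.requestCompaction(Store.NO_PRIORITY, CompactionLifeCycleTracker.DUMMY, null);
    if (!compaction.isPresent()) {
      return;    // nothing to compact right now
    }
    CompactionContext context = compaction.get();
    // hand the selected context to the compaction threads as before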

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index 020142d..8fa686c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -20,11 +20,8 @@ package org.apache.hadoop.hbase.regionserver;
 
 import static org.apache.hadoop.util.StringUtils.humanReadableInt;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
-
 import java.io.IOException;
 import java.lang.Thread.UncaughtExceptionHandler;
-import java.lang.management.MemoryType;
 import java.util.ArrayList;
 import java.util.ConcurrentModificationException;
 import java.util.HashMap;
@@ -50,6 +47,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.HasThread;
@@ -448,8 +446,8 @@ class MemStoreFlusher implements FlushRequester {
             "store files; delaying flush up to " + this.blockingWaitTime + "ms");
           if (!this.server.compactSplitThread.requestSplit(region)) {
             try {
-              this.server.compactSplitThread.requestSystemCompaction(
-                  region, Thread.currentThread().getName());
+              this.server.compactSplitThread.requestSystemCompaction((HRegion) region,
+                Thread.currentThread().getName());
             } catch (IOException e) {
               e = e instanceof RemoteException ?
                       ((RemoteException)e).unwrapRemoteException() : e;
@@ -503,8 +501,8 @@ class MemStoreFlusher implements FlushRequester {
       if (shouldSplit) {
         this.server.compactSplitThread.requestSplit(region);
       } else if (shouldCompact) {
-        server.compactSplitThread.requestSystemCompaction(
-            region, Thread.currentThread().getName());
+        server.compactSplitThread.requestSystemCompaction((HRegion) region,
+          Thread.currentThread().getName());
       }
     } catch (DroppedSnapshotException ex) {
       // Cache flush can fail in a few places. If it fails in a critical

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
index 2611f69..e30ed8e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
@@ -761,7 +761,7 @@ class MetricsRegionServerWrapperImpl
           tempCheckAndMutateChecksFailed += r.getCheckAndMutateChecksFailed();
           tempCheckAndMutateChecksPassed += r.getCheckAndMutateChecksPassed();
           tempBlockedRequestsCount += r.getBlockedRequestsCount();
-          List<Store> storeList = r.getStores();
+          List<? extends Store> storeList = r.getStores();
           tempNumStores += storeList.size();
           for (Store store : storeList) {
             tempNumStoreFiles += store.getStorefilesCount();

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
index 667b46c..dc7d3cb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
@@ -95,7 +95,7 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable
 
   @Override
   public long getNumStores() {
-    Map<byte[],Store> stores = this.region.stores;
+    Map<byte[], HStore> stores = this.region.stores;
     if (stores == null) {
       return 0;
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 02662c4..61c725b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -84,6 +84,7 @@ import org.apache.hadoop.hbase.regionserver.Leases.Lease;
 import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
 import org.apache.hadoop.hbase.regionserver.Region.Operation;
 import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
 import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
 import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
@@ -1538,7 +1539,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     try {
       checkOpen();
       requestCount.increment();
-      Region region = getRegion(request.getRegion());
+      HRegion region = (HRegion) getRegion(request.getRegion());
       // Quota support is enabled, the requesting user is not system/super user
       // and a quota policy is enforced that disables compactions.
       if (QuotaUtil.isQuotaEnabled(getConfiguration()) &&
@@ -1552,7 +1553,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString());
       boolean major = false;
       byte [] family = null;
-      Store store = null;
+      HStore store = null;
       if (request.hasFamily()) {
         family = request.getFamily().toByteArray();
         store = region.getStore(family);
@@ -1579,12 +1580,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           + region.getRegionInfo().getRegionNameAsString() + familyLogMsg);
       }
       String log = "User-triggered " + (major ? "major " : "") + "compaction" + familyLogMsg;
-      if(family != null) {
-        regionServer.compactSplitThread.requestCompaction(region, store, log,
-          Store.PRIORITY_USER, null, RpcServer.getRequestUser());
+      if (family != null) {
+        regionServer.compactSplitThread.requestCompaction(region, store, log, Store.PRIORITY_USER,
+          CompactionLifeCycleTracker.DUMMY, RpcServer.getRequestUser());
       } else {
-        regionServer.compactSplitThread.requestCompaction(region, log,
-          Store.PRIORITY_USER, null, RpcServer.getRequestUser());
+        regionServer.compactSplitThread.requestCompaction(region, log, Store.PRIORITY_USER,
+          CompactionLifeCycleTracker.DUMMY, RpcServer.getRequestUser());
       }
       return CompactRegionResponse.newBuilder().build();
     } catch (IOException ie) {
@@ -1606,7 +1607,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     try {
       checkOpen();
       requestCount.increment();
-      Region region = getRegion(request.getRegion());
+      HRegion region = (HRegion) getRegion(request.getRegion());
       LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString());
       boolean shouldFlush = true;
       if (request.hasIfOlderThanTs()) {
@@ -1617,8 +1618,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         boolean writeFlushWalMarker =  request.hasWriteFlushWalMarker() ?
             request.getWriteFlushWalMarker() : false;
         // Go behind the curtain so we can manage writing of the flush WAL marker
-        HRegion.FlushResultImpl flushResult = (HRegion.FlushResultImpl)
-            ((HRegion)region).flushcache(true, writeFlushWalMarker);
+        HRegion.FlushResultImpl flushResult = region.flushcache(true, writeFlushWalMarker);
         boolean compactionNeeded = flushResult.isCompactionNeeded();
         if (compactionNeeded) {
           regionServer.compactSplitThread.requestSystemCompaction(region,