Posted to commits@hbase.apache.org by zh...@apache.org on 2017/09/14 12:54:20 UTC

[1/3] hbase git commit: HBASE-18453 CompactionRequest should not be exposed to user directly

Repository: hbase
Updated Branches:
  refs/heads/master 38e983ed4 -> 61d10feff


http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index 45b7d3c..4890f0d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -23,10 +23,12 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CompareOperator;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
@@ -41,13 +43,17 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.Service;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
 
 import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Service;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
 
 /**
  * Regions store data for a certain region of a table.  It stores all columns
@@ -105,7 +111,7 @@ public interface Region extends ConfigurationObserver {
    * <p>Use with caution.  Exposed for use of fixup utilities.
    * @return a list of the Stores managed by this region
    */
-  List<Store> getStores();
+  List<? extends Store> getStores();
 
   /**
    * Return the Store for the given family
@@ -115,7 +121,7 @@ public interface Region extends ConfigurationObserver {
   Store getStore(byte[] family);
 
   /** @return list of store file names for the given families */
-  List<String> getStoreFileList(byte [][] columns);
+  List<String> getStoreFileList(byte[][] columns);
 
   /**
    * Check the region's underlying store files, open the files that have not
@@ -753,6 +759,18 @@ public interface Region extends ConfigurationObserver {
    */
   CompactionState getCompactionState();
 
+  /**
+   * Request compaction on this region.
+   */
+  void requestCompaction(String why, int priority, CompactionLifeCycleTracker tracker, User user)
+      throws IOException;
+
+  /**
+   * Request compaction for the given family.
+   */
+  void requestCompaction(byte[] family, String why, int priority,
+      CompactionLifeCycleTracker tracker, User user) throws IOException;
+
   /** Wait for all current flushes and compactions of the region to complete */
   void waitForFlushesAndCompactions();
 

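For illustration, a minimal sketch of how a caller might drive the two Region-level methods added above. The region variable, the null User argument (taken here to mean "run as the server principal"), and the "cf" family name are assumptions made for the example; the signatures, Store.PRIORITY_USER, and CompactionLifeCycleTracker.DUMMY all come from this patch.

import java.io.IOException;

import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.util.Bytes;

public class RequestCompactionExample {

  // Hypothetical helper: compacts a whole region, then one family of it.
  static void compactRegion(Region region) throws IOException {
    // Request compaction on every store in the region.
    region.requestCompaction("example: whole region", Store.PRIORITY_USER,
        CompactionLifeCycleTracker.DUMMY, null);
    // Request compaction for a single column family.
    region.requestCompaction(Bytes.toBytes("cf"), "example: one family",
        Store.PRIORITY_USER, CompactionLifeCycleTracker.DUMMY, null);
  }
}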
http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
index bed2a7a..f282766 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -74,6 +74,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.metrics.MetricRegistry;
 import org.apache.hadoop.hbase.regionserver.Region.Operation;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker;
 import org.apache.hadoop.hbase.security.User;
@@ -499,18 +500,18 @@ public class RegionCoprocessorHost
   /**
    * See
    * {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long,
-   *   InternalScanner, CompactionRequest, long)}
+   *   InternalScanner, CompactionLifeCycleTracker, long)}
    */
-  public InternalScanner preCompactScannerOpen(final Store store,
-      final List<StoreFileScanner> scanners, final ScanType scanType, final long earliestPutTs,
-      final CompactionRequest request, final User user, final long readPoint) throws IOException {
+  public InternalScanner preCompactScannerOpen(Store store, List<StoreFileScanner> scanners,
+      ScanType scanType, long earliestPutTs, CompactionLifeCycleTracker tracker, User user,
+      long readPoint) throws IOException {
     return execOperationWithResult(null,
         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>(user) {
       @Override
       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
           throws IOException {
         setResult(oserver.preCompactScannerOpen(ctx, store, scanners, scanType,
-          earliestPutTs, getResult(), request, readPoint));
+          earliestPutTs, getResult(), tracker, readPoint));
       }
     });
   }
@@ -520,17 +521,17 @@ public class RegionCoprocessorHost
    * available candidates.
    * @param store The store where compaction is being requested
    * @param candidates The currently available store files
-   * @param request custom compaction request
+   * @param tracker used to track the life cycle of a compaction
    * @return If {@code true}, skip the normal selection process and use the current list
    * @throws IOException
    */
-  public boolean preCompactSelection(final Store store, final List<StoreFile> candidates,
-      final CompactionRequest request, final User user) throws IOException {
+  public boolean preCompactSelection(Store store, List<StoreFile> candidates,
+      CompactionLifeCycleTracker tracker, User user) throws IOException {
     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) {
       @Override
       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
           throws IOException {
-        oserver.preCompactSelection(ctx, store, candidates, request);
+        oserver.preCompactSelection(ctx, store, candidates, tracker);
       }
     });
   }
@@ -540,21 +541,17 @@ public class RegionCoprocessorHost
    * candidates.
    * @param store The store where compaction is being requested
    * @param selected The store files selected to compact
-   * @param request custom compaction
+   * @param tracker used to track the life cycle of a compaction
    */
-  public void postCompactSelection(final Store store, final ImmutableList<StoreFile> selected,
-      final CompactionRequest request, final User user) {
-    try {
-      execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) {
-        @Override
-        public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-            throws IOException {
-          oserver.postCompactSelection(ctx, store, selected, request);
-        }
-      });
-    } catch (IOException e) {
-      LOG.warn(e);
-    }
+  public void postCompactSelection(Store store, ImmutableList<StoreFile> selected,
+      CompactionLifeCycleTracker tracker, User user) throws IOException {
+    execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) {
+      @Override
+      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+          throws IOException {
+        oserver.postCompactSelection(ctx, store, selected, tracker);
+      }
+    });
   }
 
   /**
@@ -562,18 +559,17 @@ public class RegionCoprocessorHost
    * @param store the store being compacted
    * @param scanner the scanner used to read store data during compaction
    * @param scanType type of Scan
-   * @param request the compaction that will be executed
+   * @param tracker used to track the life cycle of a compaction
    * @throws IOException
    */
-  public InternalScanner preCompact(final Store store, final InternalScanner scanner,
-      final ScanType scanType, final CompactionRequest request, final User user)
-      throws IOException {
+  public InternalScanner preCompact(Store store, InternalScanner scanner, ScanType scanType,
+      CompactionLifeCycleTracker tracker, User user) throws IOException {
     return execOperationWithResult(false, scanner,
         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>(user) {
       @Override
       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
           throws IOException {
-        setResult(oserver.preCompact(ctx, store, getResult(), scanType, request));
+        setResult(oserver.preCompact(ctx, store, getResult(), scanType, tracker));
       }
     });
   }
@@ -582,16 +578,16 @@ public class RegionCoprocessorHost
    * Called after the store compaction has completed.
    * @param store the store being compacted
    * @param resultFile the new store file written during compaction
-   * @param request the compaction that is being executed
+   * @param tracker used to track the life cycle of a compaction
    * @throws IOException
    */
-  public void postCompact(final Store store, final StoreFile resultFile,
-      final CompactionRequest request, final User user) throws IOException {
+  public void postCompact(Store store, StoreFile resultFile, CompactionLifeCycleTracker tracker,
+      User user) throws IOException {
     execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) {
       @Override
       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
           throws IOException {
-        oserver.postCompact(ctx, store, resultFile, request);
+        oserver.postCompact(ctx, store, resultFile, tracker);
       }
     });
   }

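Taken together, the four hooks above now hand coprocessors an opaque CompactionLifeCycleTracker where they used to receive the mutable CompactionRequest. A minimal sketch of an observer written against the reworked hooks (the class name and counter are illustrative; the method signatures are exactly the ones this patch threads through RegionCoprocessorHost):

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;

public class CountingCompactionObserver implements RegionObserver {

  private final AtomicInteger compactions = new AtomicInteger();

  @Override
  public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
      InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker)
      throws IOException {
    // The tracker is observe-only; the request itself can no longer be rewritten here.
    return scanner;
  }

  @Override
  public void postCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
      StoreFile resultFile, CompactionLifeCycleTracker tracker) throws IOException {
    // resultFile is the new store file written by the finished compaction.
    compactions.incrementAndGet();
  }
}

Note also that postCompactSelection now declares throws IOException instead of catching and logging inside the host, so failures surface to the caller.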
http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
index b56c925..209fce8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
@@ -64,11 +64,6 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi
   List<WAL> getWALs() throws IOException;
 
   /**
-   * @return Implementation of {@link CompactionRequestor} or null.
-   */
-  CompactionRequestor getCompactionRequester();
-
-  /**
    * @return Implementation of {@link FlushRequester} or null.
    */
   FlushRequester getFlushRequester();

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionSplitPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionSplitPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionSplitPolicy.java
index 7281626..5ccd6e3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionSplitPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionSplitPolicy.java
@@ -76,7 +76,7 @@ public abstract class RegionSplitPolicy extends Configured {
     if (explicitSplitPoint != null) {
       return explicitSplitPoint;
     }
-    List<Store> stores = region.getStores();
+    List<HStore> stores = region.getStores();
 
     byte[] splitPointFromLargestStore = null;
     long largestStoreSize = 0;

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 4bb31ae..b7e83bf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.NavigableSet;
+import java.util.Optional;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.CellComparator;
@@ -37,6 +38,7 @@ import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
@@ -51,7 +53,8 @@ import org.apache.hadoop.hbase.security.User;
 @InterfaceStability.Evolving
 public interface Store extends HeapSize, StoreConfigInformation, PropagatingConfigurationObserver {
 
-  /* The default priority for user-specified compaction requests.
+  /**
+   * The default priority for user-specified compaction requests.
    * The user gets top priority unless we have blocking compactions. (Pri <= 0)
    */
   int PRIORITY_USER = 1;
@@ -253,17 +256,12 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
    */
   CompactionProgress getCompactionProgress();
 
-  CompactionContext requestCompaction() throws IOException;
-
-  /**
-   * @deprecated see requestCompaction(int, CompactionRequest, User)
-   */
-  @Deprecated
-  CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
-      throws IOException;
+  default Optional<CompactionContext> requestCompaction() throws IOException {
+    return requestCompaction(NO_PRIORITY, CompactionLifeCycleTracker.DUMMY, null);
+  }
 
-  CompactionContext requestCompaction(int priority, CompactionRequest baseRequest, User user)
-      throws IOException;
+  Optional<CompactionContext> requestCompaction(int priority, CompactionLifeCycleTracker tracker,
+      User user) throws IOException;
 
   void cancelRequestedCompaction(CompactionContext compaction);
 

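The Store change above replaces the null-returning, partly deprecated overloads with one abstract method plus a default method, so existing zero-argument callers keep working while the Optional return forces the "nothing selected" case to be handled explicitly. A hedged sketch of a caller (the store variable and the decision to cancel are assumptions for the example):

import java.io.IOException;
import java.util.Optional;

import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;

public class StoreCompactionExample {

  static void maybeCompact(Store store) throws IOException {
    // Optional.empty() replaces the old null return when no files are selected.
    Optional<CompactionContext> compaction = store.requestCompaction();
    if (!compaction.isPresent()) {
      return; // nothing to do
    }
    CompactionContext context = compaction.get();
    // Normally the context would be handed to the compaction executor;
    // here we simply release the selection again.
    store.cancelRequestedCompaction(context);
  }
}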
http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionLifeCycleTracker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionLifeCycleTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionLifeCycleTracker.java
new file mode 100644
index 0000000..38fec7e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionLifeCycleTracker.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+/**
+ * Used to track compaction execution.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
+@InterfaceStability.Evolving
+public interface CompactionLifeCycleTracker {
+
+  static CompactionLifeCycleTracker DUMMY = new CompactionLifeCycleTracker() {
+  };
+
+  /**
+   * Called before compaction is executed by CompactSplitThread.
+   * <p>
+   * Requesting a compaction on a region can lead to multiple compactions on different stores, so
+   * the {@link Store} is passed in to tell you which store is being operated on.
+   */
+  default void beforeExecute(Store store) {
+  }
+
+  /**
+   * Called after compaction is executed by CompactSplitThread.
+   * <p>
+   * Requesting a compaction on a region can lead to multiple compactions on different stores, so
+   * the {@link Store} is passed in to tell you which store is being operated on.
+   */
+  default void afterExecute(Store store) {
+  }
+}

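Since both tracker methods are defaults, an implementation only overrides what it needs. A minimal sketch, assuming a test that wants to block until the store compactions spawned by one request have run (the latch-based synchronization is illustrative, not part of this patch):

import java.util.concurrent.CountDownLatch;

import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;

public class WaitForCompactionsTracker implements CompactionLifeCycleTracker {

  private final CountDownLatch done;

  public WaitForCompactionsTracker(int expectedStoreCompactions) {
    this.done = new CountDownLatch(expectedStoreCompactions);
  }

  @Override
  public void afterExecute(Store store) {
    // Invoked once per store compaction executed for the originating request.
    done.countDown();
  }

  public void await() throws InterruptedException {
    done.await();
  }
}

An instance would be passed to Region.requestCompaction(...) in place of CompactionLifeCycleTracker.DUMMY.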
http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
index 69e354b..e05c165 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
@@ -18,25 +18,21 @@
  */
 package org.apache.hadoop.hbase.regionserver.compactions;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.stream.Collectors;
 
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.StoreFileReader;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
+import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
 
 /**
  * This class holds all logical details necessary to run a compaction.
  */
-@InterfaceAudience.LimitedPrivate({ "coprocessor" })
-@InterfaceStability.Evolving
+@InterfaceAudience.Private
 public class CompactionRequest {
 
   // was this compaction promoted to an off-peak
@@ -53,56 +49,18 @@ public class CompactionRequest {
   private String regionName = "";
   private String storeName = "";
   private long totalSize = -1L;
+  private CompactionLifeCycleTracker tracker = CompactionLifeCycleTracker.DUMMY;
 
-  /**
-   * This ctor should be used by coprocessors that want to subclass CompactionRequest.
-   */
-  public CompactionRequest() {
+  public CompactionRequest(Collection<StoreFile> files) {
     this.selectionTime = EnvironmentEdgeManager.currentTime();
     this.timeInNanos = System.nanoTime();
-  }
-
-  public CompactionRequest(Collection<StoreFile> files) {
-    this();
-    Preconditions.checkNotNull(files);
-    this.filesToCompact = files;
+    this.filesToCompact = Preconditions.checkNotNull(files, "files for compaction can not be null");
     recalculateSize();
   }
 
   public void updateFiles(Collection<StoreFile> files) {
-    this.filesToCompact = files;
-    recalculateSize();
-  }
-
-  /**
-   * Called before compaction is executed by CompactSplitThread; for use by coproc subclasses.
-   */
-  public void beforeExecute() {}
-
-  /**
-   * Called after compaction is executed by CompactSplitThread; for use by coproc subclasses.
-   */
-  public void afterExecute() {}
-
-  /**
-   * Combines the request with other request. Coprocessors subclassing CR may override
-   * this if they want to do clever things based on CompactionPolicy selection that
-   * is passed to this method via "other". The default implementation just does a copy.
-   * @param other Request to combine with.
-   * @return The result (may be "this" or "other").
-   */
-  public CompactionRequest combineWith(CompactionRequest other) {
-    this.filesToCompact = new ArrayList<>(other.getFiles());
-    this.isOffPeak = other.isOffPeak;
-    this.isMajor = other.isMajor;
-    this.priority = other.priority;
-    this.selectionTime = other.selectionTime;
-    this.timeInNanos = other.timeInNanos;
-    this.regionName = other.regionName;
-    this.storeName = other.storeName;
-    this.totalSize = other.totalSize;
+    this.filesToCompact = Preconditions.checkNotNull(files, "files for compaction can not be null");
     recalculateSize();
-    return this;
   }
 
   public Collection<StoreFile> getFiles() {
@@ -168,6 +126,14 @@ public class CompactionRequest {
         : (isMajor ? DisplayCompactionType.MAJOR : DisplayCompactionType.ALL_FILES);
   }
 
+  public void setTracker(CompactionLifeCycleTracker tracker) {
+    this.tracker = tracker;
+  }
+
+  public CompactionLifeCycleTracker getTracker() {
+    return tracker;
+  }
+
   @Override
   public String toString() {
     String fsList = filesToCompact.stream().filter(f -> f.getReader() != null)
@@ -186,12 +152,7 @@ public class CompactionRequest {
    * @param files files that should be included in the compaction
    */
   private void recalculateSize() {
-    long sz = 0;
-    for (StoreFile sf : this.filesToCompact) {
-      StoreFileReader r = sf.getReader();
-      sz += r == null ? 0 : r.length();
-    }
-    this.totalSize = sz;
+    this.totalSize = filesToCompact.stream().map(StoreFile::getReader)
+        .mapToLong(r -> r != null ? r.length() : 0L).sum();
   }
 }
-

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 0ca925e..e6d1935 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -338,14 +338,14 @@ public abstract class Compactor<T extends CellSink> {
    * @param readPoint the read point to help create scanner by Coprocessor if required.
    * @return Scanner override by coprocessor; null if not overriding.
    */
-  protected InternalScanner preCreateCoprocScanner(final CompactionRequest request,
-      final ScanType scanType, final long earliestPutTs, final List<StoreFileScanner> scanners,
-      User user, final long readPoint) throws IOException {
+  protected InternalScanner preCreateCoprocScanner(CompactionRequest request, ScanType scanType,
+      long earliestPutTs, List<StoreFileScanner> scanners, User user, long readPoint)
+      throws IOException {
     if (store.getCoprocessorHost() == null) {
       return null;
     }
     return store.getCoprocessorHost().preCompactScannerOpen(store, scanners, scanType,
-        earliestPutTs, request, user, readPoint);
+      earliestPutTs, request.getTracker(), user, readPoint);
   }
 
   /**
@@ -355,12 +355,13 @@ public abstract class Compactor<T extends CellSink> {
    * @param scanner The default scanner created for compaction.
    * @return Scanner scanner to use (usually the default); null if compaction should not proceed.
    */
-  protected InternalScanner postCreateCoprocScanner(final CompactionRequest request,
-      final ScanType scanType, final InternalScanner scanner, User user) throws IOException {
+  protected InternalScanner postCreateCoprocScanner(CompactionRequest request, ScanType scanType,
+      InternalScanner scanner, User user) throws IOException {
     if (store.getCoprocessorHost() == null) {
       return scanner;
     }
-    return store.getCoprocessorHost().preCompact(store, scanner, scanType, request, user);
+    return store.getCoprocessorHost().preCompact(store, scanner, scanType, request.getTracker(),
+      user);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
index f7d70d2..c34fc6d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
@@ -92,6 +92,7 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
@@ -1532,7 +1533,8 @@ public class AccessController implements MasterObserver, RegionObserver, RegionS
 
   @Override
   public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
-      InternalScanner scanner, ScanType scanType, CompactionRequest request) throws IOException {
+      InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker)
+      throws IOException {
     requirePermission(getActiveUser(c), "compact", getTableName(c.getEnvironment()), null, null,
       Action.ADMIN, Action.CREATE);
     return scanner;

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/resources/hbase-webapps/regionserver/region.jsp
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/resources/hbase-webapps/regionserver/region.jsp b/hbase-server/src/main/resources/hbase-webapps/regionserver/region.jsp
index c1e5aae..04aafa7 100644
--- a/hbase-server/src/main/resources/hbase-webapps/regionserver/region.jsp
+++ b/hbase-server/src/main/resources/hbase-webapps/regionserver/region.jsp
@@ -92,7 +92,7 @@
     </div>
 
 <% if(region != null) { //
-     List<Store> stores = region.getStores();
+     List<? extends Store> stores = region.getStores();
      for (Store store : stores) {
        String cf = store.getColumnFamilyName();
        Collection<StoreFile> storeFiles = store.getStorefiles(); %>

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
index a99345b..76580f1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase;
 
+import com.google.protobuf.Service;
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Collections;
@@ -36,11 +38,8 @@ import org.apache.hadoop.hbase.client.locking.EntityLock;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
 import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
 import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
-import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
 import org.apache.hadoop.hbase.regionserver.FlushRequester;
 import org.apache.hadoop.hbase.regionserver.HeapMemoryManager;
 import org.apache.hadoop.hbase.regionserver.Leases;
@@ -51,14 +50,14 @@ import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
 import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
 
-import com.google.protobuf.Service;
-
 /**
 * Basic mock region server services. Should only be instantiated by HBaseTestingUtility.
  */
@@ -160,11 +159,6 @@ public class MockRegionServerServices implements RegionServerServices {
   }
 
   @Override
-  public CompactionRequestor getCompactionRequester() {
-    return null;
-  }
-
-  @Override
   public ClusterConnection getConnection() {
     return null;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
index 94d1cf6..85f65e8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
@@ -124,7 +124,7 @@ public class TestIOFencing {
     }
 
     @Override
-    public boolean compact(CompactionContext compaction, Store store,
+    public boolean compact(CompactionContext compaction, HStore store,
         ThroughputController throughputController) throws IOException {
       try {
         return super.compact(compaction, store, throughputController);
@@ -134,7 +134,7 @@ public class TestIOFencing {
     }
 
     @Override
-    public boolean compact(CompactionContext compaction, Store store,
+    public boolean compact(CompactionContext compaction, HStore store,
         ThroughputController throughputController, User user) throws IOException {
       try {
         return super.compact(compaction, store, throughputController, user);

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java
index aff3d99..ac404bb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java
@@ -52,7 +52,7 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -256,7 +256,7 @@ public class TestAvoidCellReferencesIntoShippedBlocks {
     @Override
     public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
         Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
-        InternalScanner s, CompactionRequest request, long readPoint) throws IOException {
+        InternalScanner s, CompactionLifeCycleTracker tracker, long readPoint) throws IOException {
       return createCompactorScanner(store, scanners, scanType, earliestPutTs);
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java
index d5f3358..08ec09b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java
@@ -784,7 +784,7 @@ public class TestBlockEvictionFromClient {
   }
 
   private BlockCache setCacheProperties(Region region) {
-    Iterator<Store> strItr = region.getStores().iterator();
+    Iterator<? extends Store> strItr = region.getStores().iterator();
     BlockCache cache = null;
     while (strItr.hasNext()) {
       Store store = strItr.next();

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
index bef6f6b..023f8fc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFileReader;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -203,20 +204,20 @@ public class SimpleRegionObserver implements RegionObserver {
 
   @Override
   public void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
-      List<StoreFile> candidates, CompactionRequest request) throws IOException {
+      List<StoreFile> candidates, CompactionLifeCycleTracker tracker) throws IOException {
     ctPreCompactSelect.incrementAndGet();
   }
 
   @Override
   public void postCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
-      ImmutableList<StoreFile> selected, CompactionRequest request) {
+      ImmutableList<StoreFile> selected, CompactionLifeCycleTracker tracker) {
     ctPostCompactSelect.incrementAndGet();
   }
 
-
   @Override
   public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
-      InternalScanner scanner, ScanType scanType, CompactionRequest request) throws IOException {
+      InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker)
+      throws IOException {
     ctPreCompact.incrementAndGet();
     return scanner;
   }
@@ -224,14 +225,14 @@ public class SimpleRegionObserver implements RegionObserver {
   @Override
   public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
       Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
-      InternalScanner s, CompactionRequest request, long readPoint) throws IOException {
+      InternalScanner s, CompactionLifeCycleTracker tracker, long readPoint) throws IOException {
     ctPreCompactScanner.incrementAndGet();
     return s;
   }
 
   @Override
   public void postCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
-      StoreFile resultFile, CompactionRequest request) throws IOException {
+      StoreFile resultFile, CompactionLifeCycleTracker tracker) throws IOException {
     ctPostCompact.incrementAndGet();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
index 97c45ab..5cf0bb3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -194,13 +195,13 @@ public class TestCoprocessorInterface {
     }
     @Override
     public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
-        Store store, InternalScanner scanner, ScanType scanType, CompactionRequest request) {
+        Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker) {
       preCompactCalled = true;
       return scanner;
     }
     @Override
     public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e,
-        Store store, StoreFile resultFile, CompactionRequest request) {
+        Store store, StoreFile resultFile, CompactionLifeCycleTracker tracker) {
       postCompactCalled = true;
     }
     @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
index 07fc179..0641b56 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
@@ -71,7 +71,7 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
@@ -417,7 +417,7 @@ public class TestRegionObserverInterface {
 
     @Override
     public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
-        final InternalScanner scanner, final ScanType scanType, CompactionRequest request) {
+        InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker) {
       return new InternalScanner() {
         @Override
         public boolean next(List<Cell> results) throws IOException {
@@ -456,7 +456,7 @@ public class TestRegionObserverInterface {
 
     @Override
     public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
-        StoreFile resultFile, CompactionRequest request) {
+        StoreFile resultFile, CompactionLifeCycleTracker tracker) {
       lastCompaction = EnvironmentEdgeManager.currentTime();
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
index 8ab0175..9c06c3e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.filter.FilterBase;
 import org.apache.hadoop.hbase.regionserver.ChunkCreator;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
@@ -61,6 +62,7 @@ import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.User;
@@ -155,7 +157,7 @@ public class TestRegionObserverScannerOpenHook {
     @Override
     public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
         Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
-        long earliestPutTs, InternalScanner s, CompactionRequest request, long readPoint)
+        long earliestPutTs, InternalScanner s, CompactionLifeCycleTracker tracker, long readPoint)
         throws IOException {
       scanners.forEach(KeyValueScanner::close);
       return NO_DATA;
@@ -252,7 +254,7 @@ public class TestRegionObserverScannerOpenHook {
     }
 
     @Override
-    public boolean compact(CompactionContext compaction, Store store,
+    public boolean compact(CompactionContext compaction, HStore store,
         ThroughputController throughputController) throws IOException {
       boolean ret = super.compact(compaction, store, throughputController);
       if (ret) compactionStateChangeLatch.countDown();
@@ -260,7 +262,7 @@ public class TestRegionObserverScannerOpenHook {
     }
 
     @Override
-    public boolean compact(CompactionContext compaction, Store store,
+    public boolean compact(CompactionContext compaction, HStore store,
         ThroughputController throughputController, User user) throws IOException {
       boolean ret = super.compact(compaction, store, throughputController, user);
       if (ret) compactionStateChangeLatch.countDown();

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index 3ffa61b..ba18299 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -47,6 +47,27 @@ import org.apache.hadoop.hbase.client.locking.EntityLock;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
+import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
+import org.apache.hadoop.hbase.regionserver.FlushRequester;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HeapMemoryManager;
+import org.apache.hadoop.hbase.regionserver.Leases;
+import org.apache.hadoop.hbase.regionserver.MetricsRegionServer;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
+import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
@@ -61,10 +82,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegion
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
@@ -102,27 +123,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRespon
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
-import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
-import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
-import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
-import org.apache.hadoop.hbase.regionserver.FlushRequester;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.HeapMemoryManager;
-import org.apache.hadoop.hbase.regionserver.Leases;
-import org.apache.hadoop.hbase.regionserver.MetricsRegionServer;
-import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
-import org.apache.hadoop.hbase.regionserver.RegionServerServices;
-import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
-import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
-import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.zookeeper.KeeperException;
 
 /**
  * A mock RegionServer implementation.
@@ -315,12 +315,6 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
   }
 
   @Override
-  public CompactionRequestor getCompactionRequester() {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
-  @Override
   public FlushRequester getFlushRequester() {
     // TODO Auto-generated method stub
     return null;

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java
index 398d14d..12aed50 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java
@@ -86,7 +86,7 @@ import org.apache.hadoop.hbase.regionserver.HStoreFile;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.security.EncryptionUtil;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -716,14 +716,14 @@ public class TestMobCompactor {
   }
 
   /**
-   * This copro overwrites the default compaction policy. It always chooses the two latest
-   * hfiles and compacts them into a new one.
+   * This copro overwrites the default compaction policy. It always chooses the two latest hfiles
+   * and compacts them into a new one.
    */
   public static class CompactTwoLatestHfilesCopro implements RegionObserver {
+
     @Override
-    public void preCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c,
-      final Store store, final List<StoreFile> candidates, final CompactionRequest request)
-      throws IOException {
+    public void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+        List<StoreFile> candidates, CompactionLifeCycleTracker tracker) throws IOException {
 
       int count = candidates.size();
       if (count >= 2) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/test/java/org/apache/hadoop/hbase/namespace/TestNamespaceAuditor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/namespace/TestNamespaceAuditor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/namespace/TestNamespaceAuditor.java
index 5179b84..8015115 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/namespace/TestNamespaceAuditor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namespace/TestNamespaceAuditor.java
@@ -74,6 +74,7 @@ import org.apache.hadoop.hbase.quotas.QuotaUtil;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -461,7 +462,7 @@ public class TestNamespaceAuditor {
 
     @Override
     public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
-        StoreFile resultFile, CompactionRequest request) throws IOException {
+        StoreFile resultFile, CompactionLifeCycleTracker tracker) throws IOException {
       postCompact.countDown();
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestFileSystemUtilizationChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestFileSystemUtilizationChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestFileSystemUtilizationChore.java
index 823b1f7..0d07e1a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestFileSystemUtilizationChore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestFileSystemUtilizationChore.java
@@ -36,9 +36,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -324,7 +324,7 @@ public class TestFileSystemUtilizationChore {
     final HRegionInfo info = mock(HRegionInfo.class);
     when(r.getRegionInfo()).thenReturn(info);
     List<Store> stores = new ArrayList<>();
-    when(r.getStores()).thenReturn(stores);
+    when(r.getStores()).thenReturn((List) stores);
     for (Long storeSize : storeSizes) {
       final Store s = mock(Store.class);
       stores.add(s);
@@ -338,7 +338,7 @@ public class TestFileSystemUtilizationChore {
     final HRegionInfo info = mock(HRegionInfo.class);
     when(r.getRegionInfo()).thenReturn(info);
     List<Store> stores = new ArrayList<>();
-    when(r.getStores()).thenReturn(stores);
+    when(r.getStores()).thenReturn((List) stores);
     assertEquals(
         "Logic error, storeSizes and linkSizes must be equal in size", storeSizes.size(),
         hfileSizes.size());

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java
index fcc3d4a..36c2e19 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.client.TestFromClientSideWithCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 
 /**
@@ -61,7 +62,7 @@ public class NoOpScanPolicyObserver implements RegionObserver {
   public InternalScanner preCompactScannerOpen(
       final ObserverContext<RegionCoprocessorEnvironment> c, Store store,
       List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
-      InternalScanner s, CompactionRequest request, long readPoint) throws IOException {
+      InternalScanner s, CompactionLifeCycleTracker tracker, long readPoint) throws IOException {
     // this demonstrates how to override the scanners default behavior
     ScanInfo oldSI = store.getScanInfo();
     ScanInfo scanInfo = new ScanInfo(oldSI.getConfiguration(), store.getColumnFamilyDescriptor(),

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/StatefulStoreMockMaker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/StatefulStoreMockMaker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/StatefulStoreMockMaker.java
index 0526462..a1fe87b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/StatefulStoreMockMaker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/StatefulStoreMockMaker.java
@@ -18,11 +18,16 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import static org.mockito.Matchers.*;
-import static org.mockito.Mockito.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Optional;
 
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.security.User;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -33,15 +38,23 @@ import org.mockito.stubbing.Answer;
  */
 public class StatefulStoreMockMaker {
   // Add and expand the methods and answers as needed.
-  public CompactionContext selectCompaction() { return null; }
-  public void cancelCompaction(Object originalContext) {}
-  public int getPriority() { return 0; }
+  public Optional<CompactionContext> selectCompaction() {
+    return Optional.empty();
+  }
 
-  private class SelectAnswer implements Answer<CompactionContext> {
-    public CompactionContext answer(InvocationOnMock invocation) throws Throwable {
+  public void cancelCompaction(Object originalContext) {
+  }
+
+  public int getPriority() {
+    return 0;
+  }
+
+  private class SelectAnswer implements Answer<Optional<CompactionContext>> {
+    public Optional<CompactionContext> answer(InvocationOnMock invocation) throws Throwable {
       return selectCompaction();
     }
   }
+
   private class PriorityAnswer implements Answer<Integer> {
     public Integer answer(InvocationOnMock invocation) throws Throwable {
       return getPriority();
@@ -53,15 +66,13 @@ public class StatefulStoreMockMaker {
     }
   }
 
-  public Store createStoreMock(String name) throws Exception {
-    Store store = mock(Store.class, name);
-    when(store.requestCompaction(
-        anyInt(), isNull(CompactionRequest.class))).then(new SelectAnswer());
-    when(store.requestCompaction(
-      anyInt(), isNull(CompactionRequest.class), any(User.class))).then(new SelectAnswer());
+  public HStore createStoreMock(String name) throws Exception {
+    HStore store = mock(HStore.class, name);
+    when(store.requestCompaction(anyInt(), any(CompactionLifeCycleTracker.class), any(User.class)))
+        .then(new SelectAnswer());
     when(store.getCompactPriority()).then(new PriorityAnswer());
-    doAnswer(new CancelAnswer()).when(
-        store).cancelRequestedCompaction(any(CompactionContext.class));
+    doAnswer(new CancelAnswer()).when(store)
+        .cancelRequestedCompaction(any(CompactionContext.class));
     return store;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
index be078f2..3649823 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
@@ -33,8 +33,8 @@ import static org.mockito.Mockito.when;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
 import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
@@ -65,7 +66,6 @@ import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.junit.After;
@@ -298,15 +298,16 @@ public class TestCompaction {
     Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
 
     // setup a region/store with some files
-    Store store = r.getStore(COLUMN_FAMILY);
+    HStore store = r.getStore(COLUMN_FAMILY);
     createStoreFile(r);
     for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
       createStoreFile(r);
     }
 
     CountDownLatch latch = new CountDownLatch(1);
-    TrackableCompactionRequest request = new TrackableCompactionRequest(latch);
-    thread.requestCompaction(r, store, "test custom comapction", Store.PRIORITY_USER, request,null);
+    Tracker tracker = new Tracker(latch);
+    thread.requestCompaction(r, store, "test custom compaction", Store.PRIORITY_USER, tracker,
+      null);
     // wait for the latch to complete.
     latch.await();
 
@@ -322,7 +323,7 @@ public class TestCompaction {
     Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
 
     // setup a region/store with some files
-    Store store = r.getStore(COLUMN_FAMILY);
+    HStore store = r.getStore(COLUMN_FAMILY);
     createStoreFile(r);
     for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) {
       createStoreFile(r);
@@ -337,9 +338,9 @@ public class TestCompaction {
     long preFailedCount = metricsWrapper.getNumCompactionsFailed();
 
     CountDownLatch latch = new CountDownLatch(1);
-    TrackableCompactionRequest request = new TrackableCompactionRequest(latch);
+    Tracker tracker = new Tracker(latch);
     thread.requestCompaction(mockRegion, store, "test custom compaction", Store.PRIORITY_USER,
-        request, null);
+      tracker, null);
     // wait for the latch to complete.
     latch.await(120, TimeUnit.SECONDS);
 
@@ -370,20 +371,17 @@ public class TestCompaction {
 
     // setup a region/store with some files
     int numStores = r.getStores().size();
-    List<Pair<CompactionRequest, Store>> requests = new ArrayList<>(numStores);
     CountDownLatch latch = new CountDownLatch(numStores);
+    Tracker tracker = new Tracker(latch);
     // create some store files and setup requests for each store on which we want to do a
     // compaction
-    for (Store store : r.getStores()) {
+    for (HStore store : r.getStores()) {
       createStoreFile(r, store.getColumnFamilyName());
       createStoreFile(r, store.getColumnFamilyName());
       createStoreFile(r, store.getColumnFamilyName());
-      requests.add(new Pair<>(new TrackableCompactionRequest(latch), store));
+      thread.requestCompaction(r, store, "test multiple custom compactions", Store.PRIORITY_USER,
+        tracker, null);
     }
-
-    thread.requestCompaction(r, "test mulitple custom comapctions", Store.PRIORITY_USER,
-      Collections.unmodifiableList(requests), null);
-
     // wait for the latch to complete.
     latch.await();
 
@@ -428,7 +426,7 @@ public class TestCompaction {
     }
 
     @Override
-    public synchronized CompactionContext selectCompaction() {
+    public synchronized Optional<CompactionContext> selectCompaction() {
       CompactionContext ctx = new TestCompactionContext(new ArrayList<>(notCompacting));
       compacting.addAll(notCompacting);
       notCompacting.clear();
@@ -437,7 +435,7 @@ public class TestCompaction {
       } catch (IOException ex) {
         fail("Shouldn't happen");
       }
-      return ctx;
+      return Optional.of(ctx);
     }
 
     @Override
@@ -499,14 +497,14 @@ public class TestCompaction {
     }
 
     @Override
-    public CompactionContext selectCompaction() {
+    public Optional<CompactionContext> selectCompaction() {
       this.blocked = new BlockingCompactionContext();
       try {
         this.blocked.select(null, false, false, false);
       } catch (IOException ex) {
         fail("Shouldn't happen");
       }
-      return this.blocked;
+      return Optional.of(blocked);
     }
 
     @Override
@@ -527,13 +525,13 @@ public class TestCompaction {
     }
 
     @Override
-    public Store createStoreMock(String name) throws Exception {
+    public HStore createStoreMock(String name) throws Exception {
       return createStoreMock(Integer.MIN_VALUE, name);
     }
 
-    public Store createStoreMock(int priority, String name) throws Exception {
+    public HStore createStoreMock(int priority, String name) throws Exception {
       // Override the mock to always return the specified priority.
-      Store s = super.createStoreMock(name);
+      HStore s = super.createStoreMock(name);
       when(s.getCompactPriority()).thenReturn(priority);
       return s;
     }
@@ -555,7 +553,7 @@ public class TestCompaction {
     // Set up the region mock that redirects compactions.
     HRegion r = mock(HRegion.class);
     when(
-      r.compact(any(CompactionContext.class), any(Store.class),
+      r.compact(any(CompactionContext.class), any(HStore.class),
         any(ThroughputController.class), any(User.class))).then(new Answer<Boolean>() {
       @Override
       public Boolean answer(InvocationOnMock invocation) throws Throwable {
@@ -568,7 +566,7 @@ public class TestCompaction {
     // Set up store mocks for 2 "real" stores and the one we use for blocking CST.
     ArrayList<Integer> results = new ArrayList<>();
     StoreMockMaker sm = new StoreMockMaker(results), sm2 = new StoreMockMaker(results);
-    Store store = sm.createStoreMock("store1"), store2 = sm2.createStoreMock("store2");
+    HStore store = sm.createStoreMock("store1"), store2 = sm2.createStoreMock("store2");
     BlockingStoreMockMaker blocker = new BlockingStoreMockMaker();
 
     // First, block the compaction thread so that we could muck with queue.
@@ -691,24 +689,20 @@ public class TestCompaction {
   }
 
   /**
-   * Simple {@link CompactionRequest} on which you can wait until the requested compaction finishes.
+   * Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction
+   * finishes.
    */
-  public static class TrackableCompactionRequest extends CompactionRequest {
-    private CountDownLatch done;
-
-    /**
-     * Constructor for a custom compaction. Uses the setXXX methods to update the state of the
-     * compaction before being used.
-     */
-    public TrackableCompactionRequest(CountDownLatch finished) {
-      super();
-      this.done = finished;
+  public static class Tracker implements CompactionLifeCycleTracker {
+
+    private final CountDownLatch done;
+
+    public Tracker(CountDownLatch done) {
+      this.done = done;
     }
 
     @Override
-    public void afterExecute() {
-      super.afterExecute();
-      this.done.countDown();
+    public void afterExecute(Store store) {
+      done.countDown();
     }
   }
 }

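The Tracker above overrides afterExecute(Store), and several hunks in this patch pass CompactionLifeCycleTracker.DUMMY as a no-op default, so the new interface looks roughly like the sketch below. This is inferred from the usages visible in this patch, not copied from the new CompactionLifeCycleTracker.java file (whose diff is not shown here); the real interface may declare additional callbacks:

  public interface CompactionLifeCycleTracker {

    // No-op instance for callers that do not care about compaction life cycle events.
    CompactionLifeCycleTracker DUMMY = new CompactionLifeCycleTracker() {};

    // Called after the compaction for the given store finishes.
    default void afterExecute(Store store) {}
  }
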
http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
index 2cea121..603203a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
@@ -28,6 +28,7 @@ import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NavigableSet;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentSkipListSet;
 
 import javax.crypto.spec.SecretKeySpec;
@@ -60,6 +61,7 @@ import org.apache.hadoop.hbase.mob.MobConstants;
 import org.apache.hadoop.hbase.mob.MobUtils;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
 import org.apache.hadoop.hbase.security.EncryptionUtil;
 import org.apache.hadoop.hbase.security.User;
@@ -536,8 +538,9 @@ public class TestHMobStore {
 
     // Trigger major compaction
     this.store.triggerMajorCompaction();
-    CompactionContext requestCompaction = this.store.requestCompaction(1, null);
-    this.store.compact(requestCompaction, NoLimitThroughputController.INSTANCE, null);
+    Optional<CompactionContext> requestCompaction =
+        this.store.requestCompaction(Store.PRIORITY_USER, CompactionLifeCycleTracker.DUMMY, null);
+    this.store.compact(requestCompaction.get(), NoLimitThroughputController.INSTANCE, null);
     Assert.assertEquals(1, this.store.getStorefiles().size());
 
     //Check encryption after compaction

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
index ea14962..97f8ce3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
@@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.wal.TestWALActionsListener;
 import org.apache.hadoop.hbase.wal.WALEdit;
@@ -254,7 +255,8 @@ public class TestHRegionServerBulkLoad {
     static int sleepDuration;
     @Override
     public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
-        InternalScanner scanner, ScanType scanType, CompactionRequest request) throws IOException {
+        InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker)
+        throws IOException {
       try {
         Thread.sleep(sleepDuration);
       } catch (InterruptedException ie) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
index f45c76c..707540a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy;
@@ -417,7 +418,7 @@ public class TestMajorCompaction {
     }
     store.triggerMajorCompaction();
 
-    CompactionRequest request = store.requestCompaction(Store.NO_PRIORITY, null).getRequest();
+    CompactionRequest request = store.requestCompaction().get().getRequest();
     assertNotNull("Expected to receive a compaction request", request);
     assertEquals(
       "System-requested major compaction should not occur if there are too many store files",
@@ -436,7 +437,9 @@ public class TestMajorCompaction {
       createStoreFile(r);
     }
     store.triggerMajorCompaction();
-    CompactionRequest request = store.requestCompaction(Store.PRIORITY_USER, null).getRequest();
+    CompactionRequest request =
+        store.requestCompaction(Store.PRIORITY_USER, CompactionLifeCycleTracker.DUMMY, null).get()
+            .getRequest();
     assertNotNull("Expected to receive a compaction request", request);
     assertEquals(
       "User-requested major compaction should always occur, even if there are too many store files",

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
index 69bc9a7..3ba2299 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
@@ -31,6 +31,7 @@ import java.io.InterruptedIOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -70,7 +71,11 @@ import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.MasterObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
-import org.apache.hadoop.hbase.master.*;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.LoadBalancer;
+import org.apache.hadoop.hbase.master.MasterRpcServices;
+import org.apache.hadoop.hbase.master.NoSuchProcedureException;
+import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.master.RegionState.State;
 import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
 import org.apache.hadoop.hbase.master.assignment.RegionStates;
@@ -234,7 +239,7 @@ public class TestSplitTransactionOnCluster {
     assertEquals(1, cluster.getRegions(tableName).size());
 
     HRegion region = cluster.getRegions(tableName).get(0);
-    Store store = region.getStore(cf);
+    HStore store = region.getStore(cf);
     int regionServerIndex = cluster.getServerWith(region.getRegionInfo().getRegionName());
     HRegionServer regionServer = cluster.getRegionServer(regionServerIndex);
 
@@ -246,8 +251,8 @@ public class TestSplitTransactionOnCluster {
     int fileNum = store.getStorefiles().size();
     // 0, Compaction Request
     store.triggerMajorCompaction();
-    CompactionContext cc = store.requestCompaction();
-    assertNotNull(cc);
+    Optional<CompactionContext> cc = store.requestCompaction();
+    assertTrue(cc.isPresent());
     // 1, A timeout split
     // 1.1 close region
     assertEquals(2, region.close(false).get(cf).size());
@@ -255,7 +260,7 @@ public class TestSplitTransactionOnCluster {
     region.initialize();
 
     // 2, Run Compaction cc
-    assertFalse(region.compact(cc, store, NoLimitThroughputController.INSTANCE));
+    assertFalse(region.compact(cc.get(), store, NoLimitThroughputController.INSTANCE));
     assertTrue(fileNum > store.getStorefiles().size());
 
     // 3, Split

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java
index c959a22..78c5330 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java
@@ -117,7 +117,7 @@ public class TestSplitWalDataLoss {
       }
     }).when(spiedRegion).internalFlushCacheAndCommit(Matchers.<WAL> any(),
       Matchers.<MonitoredTask> any(), Matchers.<PrepareFlushResult> any(),
-      Matchers.<Collection<Store>> any());
+      Matchers.<Collection<HStore>> any());
     // Find region key; don't pick up key for hbase:meta by mistake.
     String key = null;
     for (Map.Entry<String, Region> entry: rs.onlineRegions.entrySet()) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index 3674303..2095dcd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.regionserver;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.spy;
@@ -47,7 +46,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Consumer;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -74,6 +72,8 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -86,6 +86,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
 import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
 import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -105,12 +106,6 @@ import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
 import org.mockito.Mockito;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.FilterBase;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
 /**
  * Test class for the Store
  */
@@ -371,7 +366,7 @@ public class TestStore {
      // There will be no compaction due to threshold above. Last file will not be replaced.
     for (int i = 1; i <= storeFileNum - 1; i++) {
       // verify the expired store file.
-      assertNull(this.store.requestCompaction());
+      assertFalse(this.store.requestCompaction().isPresent());
       Collection<StoreFile> sfs = this.store.getStorefiles();
       // Ensure i files are gone.
       if (minVersions == 0) {
@@ -386,7 +381,7 @@ public class TestStore {
       // Let the next store file expired.
       edge.incrementTime(sleepTime);
     }
-    assertNull(this.store.requestCompaction());
+    assertFalse(this.store.requestCompaction().isPresent());
 
     Collection<StoreFile> sfs = this.store.getStorefiles();
     // Assert the last expired file is not removed.
@@ -422,7 +417,7 @@ public class TestStore {
     Assert.assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS);
 
     // after compact; check the lowest time stamp
-    store.compact(store.requestCompaction(), NoLimitThroughputController.INSTANCE, null);
+    store.compact(store.requestCompaction().get(), NoLimitThroughputController.INSTANCE, null);
     lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
     lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
     Assert.assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
index 54073bc..d25829d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
@@ -823,12 +823,12 @@ public abstract class AbstractTestWALReplay {
           final HRegion region =
               new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) {
             @Override
-            protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
-                final Collection<Store> storesToFlush, MonitoredTask status,
+            protected FlushResultImpl internalFlushcache(final WAL wal, final long myseqid,
+                final Collection<HStore> storesToFlush, MonitoredTask status,
                 boolean writeFlushWalMarker)
                     throws IOException {
               LOG.info("InternalFlushCache Invoked");
-              FlushResult fs = super.internalFlushcache(wal, myseqid, storesToFlush,
+              FlushResultImpl fs = super.internalFlushcache(wal, myseqid, storesToFlush,
                   Mockito.mock(MonitoredTask.class), writeFlushWalMarker);
               flushcount.incrementAndGet();
               return fs;

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java
index 656a0c7..d4d22b1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.regionserver.ScanInfo;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -263,7 +264,7 @@ public class TestCoprocessorScanPolicy {
     public InternalScanner preCompactScannerOpen(
         final ObserverContext<RegionCoprocessorEnvironment> c, Store store,
         List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
-        InternalScanner s, CompactionRequest request, long readPoint) throws IOException {
+        InternalScanner s, CompactionLifeCycleTracker tracker, long readPoint) throws IOException {
       Long newTtl = ttls.get(store.getTableName());
       Integer newVersions = versions.get(store.getTableName());
       ScanInfo oldSI = store.getScanInfo();


[3/3] hbase git commit: HBASE-18453 CompactionRequest should not be exposed to user directly

Posted by zh...@apache.org.
HBASE-18453 CompactionRequest should not be exposed to user directly


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/61d10fef
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/61d10fef
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/61d10fef

Branch: refs/heads/master
Commit: 61d10feffaa7b96ee46e2a6f1e542d80c1d76f42
Parents: 38e983e
Author: zhangduo <zh...@apache.org>
Authored: Mon Sep 11 08:50:37 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Thu Sep 14 20:37:33 2017 +0800

----------------------------------------------------------------------
 .../example/ZooKeeperScanPolicyObserver.java    |   3 +-
 .../example/TestRefreshHFilesEndpoint.java      |  11 +-
 .../hbase/regionserver/CompactionTool.java      |  11 +-
 .../hbase/coprocessor/RegionObserver.java       |  36 +-
 .../hadoop/hbase/regionserver/CompactSplit.java | 233 ++++----
 .../hbase/regionserver/CompactionRequestor.java | 100 ----
 .../regionserver/FlushAllLargeStoresPolicy.java |  18 +-
 .../regionserver/FlushAllStoresPolicy.java      |   2 +-
 .../regionserver/FlushLargeStoresPolicy.java    |   2 +-
 .../FlushNonSloppyStoresFirstPolicy.java        |  29 +-
 .../hadoop/hbase/regionserver/FlushPolicy.java  |   2 +-
 .../hadoop/hbase/regionserver/HRegion.java      | 590 +++++++++----------
 .../hbase/regionserver/HRegionServer.java       |  53 +-
 .../hadoop/hbase/regionserver/HStore.java       |  60 +-
 .../hbase/regionserver/MemStoreFlusher.java     |  12 +-
 .../MetricsRegionServerWrapperImpl.java         |   2 +-
 .../regionserver/MetricsRegionWrapperImpl.java  |   2 +-
 .../hbase/regionserver/RSRpcServices.java       |  20 +-
 .../hadoop/hbase/regionserver/Region.java       |  34 +-
 .../regionserver/RegionCoprocessorHost.java     |  60 +-
 .../regionserver/RegionServerServices.java      |   5 -
 .../hbase/regionserver/RegionSplitPolicy.java   |   2 +-
 .../apache/hadoop/hbase/regionserver/Store.java |  20 +-
 .../compactions/CompactionLifeCycleTracker.java |  52 ++
 .../compactions/CompactionRequest.java          |  71 +--
 .../regionserver/compactions/Compactor.java     |  15 +-
 .../hbase/security/access/AccessController.java |   4 +-
 .../hbase-webapps/regionserver/region.jsp       |   2 +-
 .../hadoop/hbase/MockRegionServerServices.java  |  14 +-
 .../org/apache/hadoop/hbase/TestIOFencing.java  |   4 +-
 ...estAvoidCellReferencesIntoShippedBlocks.java |   4 +-
 .../client/TestBlockEvictionFromClient.java     |   2 +-
 .../hbase/coprocessor/SimpleRegionObserver.java |  13 +-
 .../coprocessor/TestCoprocessorInterface.java   |   5 +-
 .../TestRegionObserverInterface.java            |   6 +-
 .../TestRegionObserverScannerOpenHook.java      |   8 +-
 .../hadoop/hbase/master/MockRegionServer.java   |  52 +-
 .../hbase/mob/compactions/TestMobCompactor.java |  12 +-
 .../hbase/namespace/TestNamespaceAuditor.java   |   3 +-
 .../quotas/TestFileSystemUtilizationChore.java  |   6 +-
 .../regionserver/NoOpScanPolicyObserver.java    |   3 +-
 .../regionserver/StatefulStoreMockMaker.java    |  43 +-
 .../hbase/regionserver/TestCompaction.java      |  70 +--
 .../hbase/regionserver/TestHMobStore.java       |   7 +-
 .../regionserver/TestHRegionServerBulkLoad.java |   4 +-
 .../hbase/regionserver/TestMajorCompaction.java |   7 +-
 .../TestSplitTransactionOnCluster.java          |  15 +-
 .../regionserver/TestSplitWalDataLoss.java      |   2 +-
 .../hadoop/hbase/regionserver/TestStore.java    |  17 +-
 .../regionserver/wal/AbstractTestWALReplay.java |   6 +-
 .../hbase/util/TestCoprocessorScanPolicy.java   |   3 +-
 51 files changed, 800 insertions(+), 957 deletions(-)
----------------------------------------------------------------------
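
For downstream coprocessor code the patch is mostly a mechanical signature migration: every RegionObserver compaction hook that previously received a CompactionRequest now receives a CompactionLifeCycleTracker. A minimal before/after sketch of the change an existing observer needs (the hook body here is illustrative, not from this commit):

  // Before this change:
  // public void postCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
  //     StoreFile resultFile, CompactionRequest request) throws IOException { ... }

  // After this change, as the RegionObserver hunks below show:
  @Override
  public void postCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
      StoreFile resultFile, CompactionLifeCycleTracker tracker) throws IOException {
    // React to the finished compaction here; the tracker replaces the request object.
  }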


http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java
----------------------------------------------------------------------
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java
index 344d188..6b31664 100644
--- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.regionserver.ScanInfo;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -203,7 +204,7 @@ public class ZooKeeperScanPolicyObserver implements RegionObserver {
   @Override
   public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
       Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
-      InternalScanner s, CompactionRequest request, long readPoint) throws IOException {
+      InternalScanner s, CompactionLifeCycleTracker tracker, long readPoint) throws IOException {
     ScanInfo scanInfo = getScanInfo(store, c.getEnvironment());
     if (scanInfo == null) {
       // take default action

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestRefreshHFilesEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestRefreshHFilesEndpoint.java b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestRefreshHFilesEndpoint.java
index a037f85..257b075 100644
--- a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestRefreshHFilesEndpoint.java
+++ b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestRefreshHFilesEndpoint.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
@@ -144,15 +145,17 @@ public class TestRefreshHFilesEndpoint {
     }
 
     @Override
-    public List<Store> getStores() {
-      List<Store> list = new ArrayList<Store>(stores.size());
+    public List<HStore> getStores() {
+      List<HStore> list = new ArrayList<>(stores.size());
       /**
        * This is used to trigger the custom definition (faulty)
        * of refresh HFiles API.
        */
       try {
-        if (this.store == null)
-          store = new HStoreWithFaultyRefreshHFilesAPI(this, new HColumnDescriptor(FAMILY), this.conf);
+        if (this.store == null) {
+          store = new HStoreWithFaultyRefreshHFilesAPI(this,
+              ColumnFamilyDescriptorBuilder.of(FAMILY), this.conf);
+        }
         list.add(store);
       } catch (IOException ioe) {
         LOG.info("Couldn't instantiate custom store implementation", ioe);

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
index de59c20..bb01459 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
@@ -42,6 +43,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.mapreduce.JobUtil;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -158,10 +160,13 @@ public class CompactionTool extends Configured implements Tool {
         store.triggerMajorCompaction();
       }
       do {
-        CompactionContext compaction = store.requestCompaction(Store.PRIORITY_USER, null);
-        if (compaction == null) break;
+        Optional<CompactionContext> compaction =
+            store.requestCompaction(Store.PRIORITY_USER, CompactionLifeCycleTracker.DUMMY, null);
+        if (!compaction.isPresent()) {
+          break;
+        }
         List<StoreFile> storeFiles =
-            store.compact(compaction, NoLimitThroughputController.INSTANCE);
+            store.compact(compaction.get(), NoLimitThroughputController.INSTANCE);
         if (storeFiles != null && !storeFiles.isEmpty()) {
           if (keepCompactedFiles && deleteCompacted) {
             for (StoreFile storeFile: storeFiles) {

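HStore.requestCompaction now returns Optional<CompactionContext> instead of a nullable reference, so the former null check becomes an isPresent() check, as the hunk above shows. The request-then-check pattern can also be written as a single while loop; a minimal illustrative variant that omits the tool's file-cleanup logic and assumes an enclosing method that throws IOException:

  Optional<CompactionContext> compaction;
  while ((compaction = store.requestCompaction(Store.PRIORITY_USER,
      CompactionLifeCycleTracker.DUMMY, null)).isPresent()) {
    // compaction.get() is safe here because isPresent() just returned true.
    store.compact(compaction.get(), NoLimitThroughputController.INSTANCE);
  }
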
http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
index b036608..ae57747 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
@@ -57,7 +57,7 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFileReader;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker;
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hbase.util.Pair;
@@ -186,10 +186,10 @@ public interface RegionObserver extends Coprocessor {
    * @param c the environment provided by the region server
    * @param store the store where compaction is being requested
    * @param candidates the store files currently available for compaction
-   * @param request custom compaction request
+   * @param tracker tracker used to track the life cycle of a compaction
    */
   default void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
-      List<StoreFile> candidates, CompactionRequest request) throws IOException {}
+      List<StoreFile> candidates, CompactionLifeCycleTracker tracker) throws IOException {}
 
   /**
    * Called after the {@link StoreFile}s to compact have been selected from the available
@@ -197,10 +197,10 @@ public interface RegionObserver extends Coprocessor {
    * @param c the environment provided by the region server
    * @param store the store being compacted
    * @param selected the store files selected to compact
-   * @param request custom compaction request
+   * @param tracker tracker used to track the life cycle of a compaction
    */
   default void postCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
-      ImmutableList<StoreFile> selected, CompactionRequest request) {}
+      ImmutableList<StoreFile> selected, CompactionLifeCycleTracker tracker) {}
 
   /**
    * Called prior to writing the {@link StoreFile}s selected for compaction into a new
@@ -220,13 +220,13 @@ public interface RegionObserver extends Coprocessor {
    * @param store the store being compacted
    * @param scanner the scanner over existing data used in the store file rewriting
    * @param scanType type of Scan
-   * @param request the requested compaction
+   * @param tracker tracker used to track the life cycle of a compaction
    * @return the scanner to use during compaction. Should not be {@code null} unless the
    *         implementation is writing new store files on its own.
    */
-  default InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c,
-      Store store, InternalScanner scanner, ScanType scanType,
-      CompactionRequest request) throws IOException {
+  default InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+      InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker)
+      throws IOException {
     return scanner;
   }
 
@@ -245,14 +245,14 @@ public interface RegionObserver extends Coprocessor {
    * @param earliestPutTs timestamp of the earliest put that was found in any of the involved store
    *          files
    * @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain
-   * @param request compaction request
+   * @param tracker used to track the life cycle of a compaction
    * @param readPoint the readpoint to create scanner
    * @return the scanner to use during compaction. {@code null} if the default implementation is to
    *          be used.
    */
   default InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
       Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
-      InternalScanner s, CompactionRequest request, long readPoint) throws IOException {
+      InternalScanner s, CompactionLifeCycleTracker tracker, long readPoint) throws IOException {
     return s;
   }
 
@@ -261,10 +261,10 @@ public interface RegionObserver extends Coprocessor {
    * @param c the environment provided by the region server
    * @param store the store being compacted
    * @param resultFile the new store file written out during compaction
-   * @param request the requested compaction
+   * @param tracker used to track the life cycle of a compaction
    */
   default void postCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
-      StoreFile resultFile, CompactionRequest request) throws IOException {}
+      StoreFile resultFile, CompactionLifeCycleTracker tracker) throws IOException {}
 
   /**
    * Called before the region is reported as closed to the master.
@@ -798,12 +798,12 @@ public interface RegionObserver extends Coprocessor {
    * Called before a store opens a new scanner.
    * This hook is called when a "user" scanner is opened.
    * <p>
-   * See {@link #preFlushScannerOpen(ObserverContext, Store, List, InternalScanner, long)} and {@link #preCompactScannerOpen(ObserverContext,
-   *  Store, List, ScanType, long, InternalScanner, CompactionRequest, long)}
-   * to override scanners created for flushes or compactions, resp.
+   * See {@link #preFlushScannerOpen(ObserverContext, Store, List, InternalScanner, long)}
+   * and {@link #preCompactScannerOpen(ObserverContext, Store, List, ScanType, long,
+   * InternalScanner, CompactionLifeCycleTracker, long)} to override scanners created for flushes
+   * or compactions, resp.
    * <p>
-   * Call CoprocessorEnvironment#complete to skip any subsequent chained
-   * coprocessors.
+   * Call CoprocessorEnvironment#complete to skip any subsequent chained coprocessors.
    * Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no
    * effect in this hook.
    * <p>

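Beyond the observer hooks, the tracker gives callers a way to watch a compaction they request, as TestCompaction's Tracker earlier in this patch demonstrates with a CountDownLatch. A minimal sketch of that pattern against the new CompactSplit API shown in the next hunk (the variable names and call site are illustrative; done.await() can throw InterruptedException):

  CountDownLatch done = new CountDownLatch(1);
  compactSplit.requestCompaction(region, store, "manual request", Store.PRIORITY_USER,
    new CompactionLifeCycleTracker() {
      @Override
      public void afterExecute(Store s) {
        done.countDown();
      }
    }, null);
  done.await(); // blocks until the compaction for this store has run
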
http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
index 621bead..cdeeff7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
@@ -21,10 +21,9 @@ package org.apache.hadoop.hbase.regionserver;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
-import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.Iterator;
-import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
@@ -41,24 +40,23 @@ import org.apache.hadoop.hbase.conf.ConfigurationManager;
 import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
 import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.StealJobQueue;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.util.StringUtils;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
-
 /**
  * Compact region on request and then run split if appropriate
  */
 @InterfaceAudience.Private
-public class CompactSplit implements CompactionRequestor, PropagatingConfigurationObserver {
+public class CompactSplit implements PropagatingConfigurationObserver {
   private static final Log LOG = LogFactory.getLog(CompactSplit.class);
 
   // Configuration key for the large compaction threads.
@@ -233,126 +231,89 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati
     }
   }
 
-  @Override
-  public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why)
-      throws IOException {
-    return requestCompaction(r, why, null);
+  public synchronized void requestCompaction(HRegion region, String why, int priority,
+      CompactionLifeCycleTracker tracker, User user) throws IOException {
+    requestCompactionInternal(region, why, priority, true, tracker, user);
   }
 
-  @Override
-  public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why,
-      List<Pair<CompactionRequest, Store>> requests) throws IOException {
-    return requestCompaction(r, why, Store.NO_PRIORITY, requests, null);
-  }
-
-  @Override
-  public synchronized CompactionRequest requestCompaction(final Region r, final Store s,
-      final String why, CompactionRequest request) throws IOException {
-    return requestCompaction(r, s, why, Store.NO_PRIORITY, request, null);
+  public synchronized void requestCompaction(HRegion region, HStore store, String why, int priority,
+      CompactionLifeCycleTracker tracker, User user) throws IOException {
+    requestCompactionInternal(region, store, why, priority, true, tracker, user);
   }
 
-  @Override
-  public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why,
-      int p, List<Pair<CompactionRequest, Store>> requests, User user) throws IOException {
-    return requestCompactionInternal(r, why, p, requests, true, user);
-  }
-
-  private List<CompactionRequest> requestCompactionInternal(final Region r, final String why,
-      int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow, User user)
-          throws IOException {
-    // not a special compaction request, so make our own list
-    List<CompactionRequest> ret = null;
-    if (requests == null) {
-      ret = selectNow ? new ArrayList<CompactionRequest>(r.getStores().size()) : null;
-      for (Store s : r.getStores()) {
-        CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow, user);
-        if (selectNow) ret.add(cr);
-      }
-    } else {
-      Preconditions.checkArgument(selectNow); // only system requests have selectNow == false
-      ret = new ArrayList<CompactionRequest>(requests.size());
-      for (Pair<CompactionRequest, Store> pair : requests) {
-        ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst(), user));
-      }
+  private void requestCompactionInternal(HRegion region, String why, int priority,
+      boolean selectNow, CompactionLifeCycleTracker tracker, User user) throws IOException {
+    // request compaction on all stores
+    for (HStore store : region.stores.values()) {
+      requestCompactionInternal(region, store, why, priority, selectNow, tracker, user);
     }
-    return ret;
-  }
-
-  public CompactionRequest requestCompaction(final Region r, final Store s,
-      final String why, int priority, CompactionRequest request, User user) throws IOException {
-    return requestCompactionInternal(r, s, why, priority, request, true, user);
-  }
-
-  public synchronized void requestSystemCompaction(
-      final Region r, final String why) throws IOException {
-    requestCompactionInternal(r, why, Store.NO_PRIORITY, null, false, null);
-  }
-
-  public void requestSystemCompaction(
-      final Region r, final Store s, final String why) throws IOException {
-    requestCompactionInternal(r, s, why, Store.NO_PRIORITY, null, false, null);
   }
 
-  /**
-   * @param r region store belongs to
-   * @param s Store to request compaction on
-   * @param why Why compaction requested -- used in debug messages
-   * @param priority override the default priority (NO_PRIORITY == decide)
-   * @param request custom compaction request. Can be <tt>null</tt> in which case a simple
-   *          compaction will be used.
-   */
-  private synchronized CompactionRequest requestCompactionInternal(final Region r, final Store s,
-      final String why, int priority, CompactionRequest request, boolean selectNow, User user)
-          throws IOException {
-    if (this.server.isStopped()
-        || (r.getTableDescriptor() != null && !r.getTableDescriptor().isCompactionEnabled())) {
-      return null;
+  private void requestCompactionInternal(HRegion region, HStore store, String why, int priority,
+      boolean selectNow, CompactionLifeCycleTracker tracker, User user) throws IOException {
+    if (this.server.isStopped() || (region.getTableDescriptor() != null &&
+        !region.getTableDescriptor().isCompactionEnabled())) {
+      return;
     }
-
-    CompactionContext compaction = null;
+    Optional<CompactionContext> compaction;
     if (selectNow) {
-      compaction = selectCompaction(r, s, priority, request, user);
-      if (compaction == null) return null; // message logged inside
+      compaction = selectCompaction(region, store, priority, tracker, user);
+      if (!compaction.isPresent()) {
+        // message logged inside
+        return;
+      }
+    } else {
+      compaction = Optional.empty();
     }
 
-    final RegionServerSpaceQuotaManager spaceQuotaManager =
-      this.server.getRegionServerSpaceQuotaManager();
-    if (spaceQuotaManager != null && spaceQuotaManager.areCompactionsDisabled(
-        r.getTableDescriptor().getTableName())) {
+    RegionServerSpaceQuotaManager spaceQuotaManager =
+        this.server.getRegionServerSpaceQuotaManager();
+    if (spaceQuotaManager != null &&
+        spaceQuotaManager.areCompactionsDisabled(region.getTableDescriptor().getTableName())) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Ignoring compaction request for " + r + " as an active space quota violation "
-            + " policy disallows compactions.");
+        LOG.debug("Ignoring compaction request for " + region +
+            " as an active space quota violation " + " policy disallows compactions.");
       }
-      return null;
+      return;
     }
 
-    // We assume that most compactions are small. So, put system compactions into small
-    // pool; we will do selection there, and move to large pool if necessary.
-    ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize()))
-      ? longCompactions : shortCompactions;
-    pool.execute(new CompactionRunner(s, r, compaction, pool, user));
-    ((HRegion)r).incrementCompactionsQueuedCount();
+    ThreadPoolExecutor pool;
+    if (selectNow) {
+      // compaction.get() is safe here: when selectNow is true we have already
+      // returned above if no compaction was selected
+      pool = store.throttleCompaction(compaction.get().getRequest().getSize()) ? longCompactions
+          : shortCompactions;
+    } else {
+      // We assume that most compactions are small. So, put system compactions into small
+      // pool; we will do selection there, and move to large pool if necessary.
+      pool = shortCompactions;
+    }
+    pool.execute(new CompactionRunner(store, region, compaction, pool, user));
+    region.incrementCompactionsQueuedCount();
     if (LOG.isDebugEnabled()) {
       String type = (pool == shortCompactions) ? "Small " : "Large ";
       LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
           + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
     }
-    return selectNow ? compaction.getRequest() : null;
   }
 
-  private CompactionContext selectCompaction(final Region r, final Store s,
-      int priority, CompactionRequest request, User user) throws IOException {
-    CompactionContext compaction = s.requestCompaction(priority, request, user);
-    if (compaction == null) {
-      if(LOG.isDebugEnabled() && r.getRegionInfo() != null) {
-        LOG.debug("Not compacting " + r.getRegionInfo().getRegionNameAsString() +
-            " because compaction request was cancelled");
-      }
-      return null;
-    }
-    assert compaction.hasSelection();
-    if (priority != Store.NO_PRIORITY) {
-      compaction.getRequest().setPriority(priority);
+  public synchronized void requestSystemCompaction(HRegion region, String why) throws IOException {
+    requestCompactionInternal(region, why, Store.NO_PRIORITY, false,
+      CompactionLifeCycleTracker.DUMMY, null);
+  }
+
+  public synchronized void requestSystemCompaction(HRegion region, HStore store, String why)
+      throws IOException {
+    requestCompactionInternal(region, store, why, Store.NO_PRIORITY, false,
+      CompactionLifeCycleTracker.DUMMY, null);
+  }
+
+  private Optional<CompactionContext> selectCompaction(HRegion region, HStore store, int priority,
+      CompactionLifeCycleTracker tracker, User user) throws IOException {
+    Optional<CompactionContext> compaction = store.requestCompaction(priority, tracker, user);
+    if (!compaction.isPresent() && LOG.isDebugEnabled() && region.getRegionInfo() != null) {
+      LOG.debug("Not compacting " + region.getRegionInfo().getRegionNameAsString() +
+          " because compaction request was cancelled");
     }
     return compaction;
   }
@@ -468,33 +429,33 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati
       if (cmp != 0) {
         return cmp;
       }
-      CompactionContext c1 = o1.compaction;
-      CompactionContext c2 = o2.compaction;
-      if (c1 == null) {
-        return c2 == null ? 0 : 1;
+      Optional<CompactionContext> c1 = o1.compaction;
+      Optional<CompactionContext> c2 = o2.compaction;
+      if (c1.isPresent()) {
+        return c2.isPresent() ? compare(c1.get().getRequest(), c2.get().getRequest()) : -1;
       } else {
-        return c2 == null ? -1 : compare(c1.getRequest(), c2.getRequest());
+        return c2.isPresent() ? 1 : 0;
       }
     }
   };
 
   private final class CompactionRunner implements Runnable {
-    private final Store store;
+    private final HStore store;
     private final HRegion region;
-    private CompactionContext compaction;
+    private final Optional<CompactionContext> compaction;
     private int queuedPriority;
     private ThreadPoolExecutor parent;
     private User user;
     private long time;
 
-    public CompactionRunner(Store store, Region region, CompactionContext compaction,
+    public CompactionRunner(HStore store, HRegion region, Optional<CompactionContext> compaction,
         ThreadPoolExecutor parent, User user) {
       super();
       this.store = store;
-      this.region = (HRegion) region;
+      this.region = region;
       this.compaction = compaction;
-      this.queuedPriority =
-          compaction == null ? store.getCompactPriority() : compaction.getRequest().getPriority();
+      this.queuedPriority = compaction.isPresent() ? compaction.get().getRequest().getPriority()
+          : store.getCompactPriority();
       this.parent = parent;
       this.user = user;
       this.time = System.currentTimeMillis();
@@ -502,14 +463,15 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati
 
     @Override
     public String toString() {
-      return (this.compaction != null) ? ("Request = " + compaction.getRequest())
-          : ("regionName = " + region.toString() + ", storeName = " + store.toString() +
-             ", priority = " + queuedPriority + ", time = " + time);
+      return compaction.map(c -> "Request = " + c.getRequest())
+          .orElse("regionName = " + region.toString() + ", storeName = " + store.toString() +
+              ", priority = " + queuedPriority + ", time = " + time);
     }
 
     private void doCompaction(User user) {
+      CompactionContext c;
       // Common case - system compaction without a file selection. Select now.
-      if (this.compaction == null) {
+      if (!compaction.isPresent()) {
         int oldPriority = this.queuedPriority;
         this.queuedPriority = this.store.getCompactPriority();
         if (this.queuedPriority > oldPriority) {
@@ -518,44 +480,49 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati
           this.parent.execute(this);
           return;
         }
+        Optional<CompactionContext> selected;
         try {
-          this.compaction = selectCompaction(this.region, this.store, queuedPriority, null, user);
+          selected = selectCompaction(this.region, this.store, queuedPriority,
+            CompactionLifeCycleTracker.DUMMY, user);
         } catch (IOException ex) {
           LOG.error("Compaction selection failed " + this, ex);
           server.checkFileSystem();
           region.decrementCompactionsQueuedCount();
           return;
         }
-        if (this.compaction == null) {
+        if (!selected.isPresent()) {
           region.decrementCompactionsQueuedCount();
           return; // nothing to do
         }
+        c = selected.get();
+        assert c.hasSelection();
         // Now see if we are in correct pool for the size; if not, go to the correct one.
         // We might end up waiting for a while, so cancel the selection.
-        assert this.compaction.hasSelection();
-        ThreadPoolExecutor pool = store.throttleCompaction(
-            compaction.getRequest().getSize()) ? longCompactions : shortCompactions;
+        ThreadPoolExecutor pool =
+            store.throttleCompaction(c.getRequest().getSize()) ? longCompactions : shortCompactions;
 
         // Long compaction pool can process small job
         // Short compaction pool should not process large job
         if (this.parent == shortCompactions && pool == longCompactions) {
-          this.store.cancelRequestedCompaction(this.compaction);
-          this.compaction = null;
+          this.store.cancelRequestedCompaction(c);
           this.parent = pool;
           this.parent.execute(this);
           return;
         }
+      } else {
+        c = compaction.get();
       }
       // Finally we can compact something.
-      assert this.compaction != null;
+      assert c != null;
 
-      this.compaction.getRequest().beforeExecute();
+      c.getRequest().getTracker().beforeExecute(store);
       try {
         // Note: please don't put single-compaction logic here;
         //       put it into region/store/etc. This is CST logic.
         long start = EnvironmentEdgeManager.currentTime();
         boolean completed =
-            region.compact(compaction, store, compactionThroughputController, user);
+            region.compact(c, store, compactionThroughputController, user);
         long now = EnvironmentEdgeManager.currentTime();
         LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " +
               this + "; duration=" + StringUtils.formatTimeDiff(now, start));
@@ -582,10 +549,10 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati
         region.reportCompactionRequestFailure();
         server.checkFileSystem();
       } finally {
+        c.getRequest().getTracker().afterExecute(store);
         region.decrementCompactionsQueuedCount();
         LOG.debug("CompactSplitThread Status: " + CompactSplit.this);
       }
-      this.compaction.getRequest().afterExecute();
     }
 
     @Override
@@ -615,9 +582,9 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati
     @Override
     public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
       if (runnable instanceof CompactionRunner) {
-        CompactionRunner runner = (CompactionRunner)runnable;
+        CompactionRunner runner = (CompactionRunner) runnable;
         LOG.debug("Compaction Rejected: " + runner);
-        runner.store.cancelRequestedCompaction(runner.compaction);
+        runner.compaction.ifPresent(c -> runner.store.cancelRequestedCompaction(c));
       }
     }
   }
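
A note for reviewers on the new tracker hook: CompactionRunner now drives the
tracker via getTracker().beforeExecute(store) / afterExecute(store), with
CompactionLifeCycleTracker.DUMMY as the no-op default for system compactions.
Below is a minimal sketch of a custom tracker, assuming the interface declares
only the two callbacks exercised above (LoggingCompactionTracker is an
illustrative name, not part of this patch):

  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.apache.hadoop.hbase.regionserver.Store;
  import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;

  public class LoggingCompactionTracker implements CompactionLifeCycleTracker {
    private static final Log LOG = LogFactory.getLog(LoggingCompactionTracker.class);

    @Override
    public void beforeExecute(Store store) {
      // invoked just before the store compaction is executed
      LOG.info("Compaction starting on " + store);
    }

    @Override
    public void afterExecute(Store store) {
      // invoked in the finally block after the compaction attempt
      LOG.info("Compaction finished on " + store);
    }
  }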

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java
deleted file mode 100644
index d1f02fe..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.util.Pair;
-
-@InterfaceAudience.Private
-public interface CompactionRequestor {
-  /**
-   * @param r Region to compact
-   * @param why Why compaction was requested -- used in debug messages
-   * @return The created {@link CompactionRequest CompactionRequests} or an empty list if no
-   *         compactions were started
-   * @throws IOException
-   */
-  List<CompactionRequest> requestCompaction(final Region r, final String why)
-      throws IOException;
-
-  /**
-   * @param r Region to compact
-   * @param why Why compaction was requested -- used in debug messages
-   * @param requests custom compaction requests. Each compaction must specify the store on which it
-   *          is acting. Can be <tt>null</tt> in which case a compaction will be attempted on all
-   *          stores for the region.
-   * @return The created {@link CompactionRequest CompactionRequests} or an empty list if no
-   *         compactions were started
-   * @throws IOException
-   */
-  List<CompactionRequest> requestCompaction(
-    final Region r, final String why, List<Pair<CompactionRequest, Store>> requests
-  )
-      throws IOException;
-
-  /**
-   * @param r Region to compact
-   * @param s Store within region to compact
-   * @param why Why compaction was requested -- used in debug messages
-   * @param request custom compaction request for the {@link Region} and {@link Store}. Custom
-   *          request must be <tt>null</tt> or be constructed with matching region and store.
-   * @return The created {@link CompactionRequest} or <tt>null</tt> if no compaction was started.
-   * @throws IOException
-   */
-  CompactionRequest requestCompaction(
-    final Region r, final Store s, final String why, CompactionRequest request
-  ) throws IOException;
-
-  /**
-   * @param r Region to compact
-   * @param why Why compaction was requested -- used in debug messages
-   * @param pri Priority of this compaction. minHeap. &lt;=0 is critical
-   * @param requests custom compaction requests. Each compaction must specify the store on which it
-   *          is acting. Can be <tt>null</tt> in which case a compaction will be attempted on all
-   *          stores for the region.
-   * @param user  the effective user
-   * @return The created {@link CompactionRequest CompactionRequests} or an empty list if no
-   *         compactions were started.
-   * @throws IOException
-   */
-  List<CompactionRequest> requestCompaction(
-    final Region r, final String why, int pri, List<Pair<CompactionRequest, Store>> requests,
-    User user
-  ) throws IOException;
-
-  /**
-   * @param r Region to compact
-   * @param s Store within region to compact
-   * @param why Why compaction was requested -- used in debug messages
-   * @param pri Priority of this compaction. minHeap. &lt;=0 is critical
-   * @param request custom compaction request to run. {@link Store} and {@link Region} for the
-   *          request must match the region and store specified here.
-   * @param user
-   * @return The created {@link CompactionRequest} or <tt>null</tt> if no compaction was started
-   * @throws IOException
-   */
-  CompactionRequest requestCompaction(
-    final Region r, final Store s, final String why, int pri, CompactionRequest request, User user
-  ) throws IOException;
-}
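
With CompactionRequestor removed, there is no longer a public hook that hands
CompactionRequest objects to callers. A sketch of what an in-server caller
looks like after this patch, using only the CompactSplit entry points shown
above (the compactSplit, region and store references are assumed to come from
the region server):

  // system-triggered compaction across all stores of a region
  compactSplit.requestSystemCompaction(region, "periodic compaction check");

  // or for a single store
  compactSplit.requestSystemCompaction(region, store, "store file count high");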

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllLargeStoresPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllLargeStoresPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllLargeStoresPolicy.java
index b0eae71..e4476d0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllLargeStoresPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllLargeStoresPolicy.java
@@ -31,7 +31,7 @@ import org.apache.yetus.audience.InterfaceAudience;
  * enough, then all stores will be flushed.
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-public class FlushAllLargeStoresPolicy extends FlushLargeStoresPolicy{
+public class FlushAllLargeStoresPolicy extends FlushLargeStoresPolicy {
 
   private static final Log LOG = LogFactory.getLog(FlushAllLargeStoresPolicy.class);
 
@@ -48,20 +48,22 @@ public class FlushAllLargeStoresPolicy extends FlushLargeStoresPolicy{
   }
 
   @Override
-  public Collection<Store> selectStoresToFlush() {
+  public Collection<HStore> selectStoresToFlush() {
     // no need to select stores if only one family
     if (region.getTableDescriptor().getColumnFamilyCount() == 1) {
       return region.stores.values();
     }
     // start selection
-    Collection<Store> stores = region.stores.values();
-    Set<Store> specificStoresToFlush = new HashSet<>();
-    for (Store store : stores) {
+    Collection<HStore> stores = region.stores.values();
+    Set<HStore> specificStoresToFlush = new HashSet<>();
+    for (HStore store : stores) {
       if (shouldFlush(store)) {
         specificStoresToFlush.add(store);
       }
     }
-    if (!specificStoresToFlush.isEmpty()) return specificStoresToFlush;
+    if (!specificStoresToFlush.isEmpty()) {
+      return specificStoresToFlush;
+    }
 
     // Didn't find any CFs which were above the threshold for selection.
     if (LOG.isDebugEnabled()) {
@@ -71,8 +73,8 @@ public class FlushAllLargeStoresPolicy extends FlushLargeStoresPolicy{
   }
 
   @Override
-  protected boolean shouldFlush(Store store) {
-    return (super.shouldFlush(store) || region.shouldFlushStore(store));
+  protected boolean shouldFlush(HStore store) {
+    return super.shouldFlush(store) || region.shouldFlushStore(store);
   }
 
 }
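
The selection rule above is: flush only the column families whose flushable
memstore size exceeds the lower bound, and fall back to flushing every family
when none qualifies. Restated as a standalone helper (a sketch only;
FlushSelection and pickStores are illustrative names, and getSizeOfMemStore()
is taken from the shouldFlush() code in FlushLargeStoresPolicy below):

  import java.util.Collection;
  import java.util.HashSet;
  import java.util.Set;
  import org.apache.hadoop.hbase.regionserver.HStore;

  final class FlushSelection {
    static Collection<HStore> pickStores(Collection<HStore> stores, long lowerBound) {
      Set<HStore> picked = new HashSet<>();
      for (HStore store : stores) {
        // same test as shouldFlush(): memstore data size above the per-family bound
        if (store.getSizeOfMemStore().getDataSize() > lowerBound) {
          picked.add(store);
        }
      }
      // nothing over the threshold: flush all families so the WAL can be truncated
      return picked.isEmpty() ? stores : picked;
    }
  }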

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllStoresPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllStoresPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllStoresPolicy.java
index 5c7b3af..97a04f0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllStoresPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllStoresPolicy.java
@@ -28,7 +28,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 public class FlushAllStoresPolicy extends FlushPolicy {
 
   @Override
-  public Collection<Store> selectStoresToFlush() {
+  public Collection<HStore> selectStoresToFlush() {
     return region.stores.values();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
index e37a1a2..e0c6510 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
@@ -77,7 +77,7 @@ public abstract class FlushLargeStoresPolicy extends FlushPolicy {
     return flushSizeLowerBound;
   }
 
-  protected boolean shouldFlush(Store store) {
+  protected boolean shouldFlush(HStore store) {
     if (store.getSizeOfMemStore().getDataSize() > this.flushSizeLowerBound) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Flush Column Family " + store.getColumnFamilyName() + " of " +

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushNonSloppyStoresFirstPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushNonSloppyStoresFirstPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushNonSloppyStoresFirstPolicy.java
index 1196bd5..c779ce3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushNonSloppyStoresFirstPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushNonSloppyStoresFirstPolicy.java
@@ -32,26 +32,31 @@ import org.apache.yetus.audience.InterfaceAudience;
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
 public class FlushNonSloppyStoresFirstPolicy extends FlushLargeStoresPolicy {
 
-  private Collection<Store> regularStores = new HashSet<>();
-  private Collection<Store> sloppyStores = new HashSet<>();
+  private Collection<HStore> regularStores = new HashSet<>();
+  private Collection<HStore> sloppyStores = new HashSet<>();
 
   /**
    * @return the stores need to be flushed.
    */
-  @Override public Collection<Store> selectStoresToFlush() {
-    Collection<Store> specificStoresToFlush = new HashSet<>();
-    for(Store store : regularStores) {
-      if(shouldFlush(store) || region.shouldFlushStore(store)) {
+  @Override
+  public Collection<HStore> selectStoresToFlush() {
+    Collection<HStore> specificStoresToFlush = new HashSet<>();
+    for (HStore store : regularStores) {
+      if (shouldFlush(store) || region.shouldFlushStore(store)) {
         specificStoresToFlush.add(store);
       }
     }
-    if(!specificStoresToFlush.isEmpty()) return specificStoresToFlush;
-    for(Store store : sloppyStores) {
-      if(shouldFlush(store)) {
+    if (!specificStoresToFlush.isEmpty()) {
+      return specificStoresToFlush;
+    }
+    for (HStore store : sloppyStores) {
+      if (shouldFlush(store)) {
         specificStoresToFlush.add(store);
       }
     }
-    if(!specificStoresToFlush.isEmpty()) return specificStoresToFlush;
+    if (!specificStoresToFlush.isEmpty()) {
+      return specificStoresToFlush;
+    }
     return region.stores.values();
   }
 
@@ -59,8 +64,8 @@ public class FlushNonSloppyStoresFirstPolicy extends FlushLargeStoresPolicy {
   protected void configureForRegion(HRegion region) {
     super.configureForRegion(region);
     this.flushSizeLowerBound = getFlushSizeLowerBound(region);
-    for(Store store : region.stores.values()) {
-      if(store.isSloppyMemstore()) {
+    for (HStore store : region.stores.values()) {
+      if (store.isSloppyMemstore()) {
         sloppyStores.add(store);
       } else {
         regularStores.add(store);

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java
index bc49c92..fecbd2f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java
@@ -44,6 +44,6 @@ public abstract class FlushPolicy extends Configured {
   /**
    * @return the stores need to be flushed.
    */
-  public abstract Collection<Store> selectStoresToFlush();
+  public abstract Collection<HStore> selectStoresToFlush();
 
 }
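
Since selectStoresToFlush() is the only abstract method, a custom policy under
the new HStore-typed contract stays small. A sketch, assuming the protected
region field visible in the subclasses above and same-package placement
(FlushNonEmptyStoresPolicy is a hypothetical example, not part of this patch):

  package org.apache.hadoop.hbase.regionserver;

  import java.util.Collection;
  import java.util.HashSet;
  import java.util.Set;

  public class FlushNonEmptyStoresPolicy extends FlushPolicy {

    @Override
    public Collection<HStore> selectStoresToFlush() {
      Set<HStore> nonEmpty = new HashSet<>();
      for (HStore store : region.stores.values()) {
        if (store.getSizeOfMemStore().getDataSize() > 0) {
          nonEmpty.add(store);
        }
      }
      // fall back to all stores so a flush request always makes progress
      return nonEmpty.isEmpty() ? region.stores.values() : nonEmpty;
    }
  }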


[2/3] hbase git commit: HBASE-18453 CompactionRequest should not be exposed to user directly

Posted by zh...@apache.org.
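
One theme in the HRegion diff below is the switch from the shaded Guava
Optional to java.util.Optional, so callers of Store.requestCompaction() test
presence instead of null-checking. A usage sketch mirroring the
compact(boolean) loop in the patch (controller and user stand in for whatever
the caller has at hand):

  Optional<CompactionContext> compaction = store.requestCompaction();
  if (compaction.isPresent()) {
    region.compact(compaction.get(), store, controller, user);
  }
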
http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 6edf006..86a24ad 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -17,6 +17,59 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
+import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
+
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.lang.reflect.Constructor;
+import java.nio.ByteBuffer;
+import java.text.ParseException;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Optional;
+import java.util.RandomAccess;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -90,6 +143,7 @@ import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.Write
 import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
 import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
 import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
@@ -97,7 +151,6 @@ import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Optional;
 import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.Maps;
@@ -143,58 +196,6 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
 
-import java.io.EOFException;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.lang.reflect.Constructor;
-import java.nio.ByteBuffer;
-import java.text.ParseException;
-import java.util.AbstractList;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.NavigableMap;
-import java.util.NavigableSet;
-import java.util.RandomAccess;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.LongAdder;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Function;
-
-import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
-import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
-
 @SuppressWarnings("deprecation")
 @InterfaceAudience.Private
 public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region {
@@ -254,9 +255,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   // - the thread that owns the lock (allow reentrancy)
   // - reference count of (reentrant) locks held by the thread
   // - the row itself
-  private final ConcurrentHashMap<HashedBytes, RowLockContext> lockedRows = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<HashedBytes, RowLockContext> lockedRows =
+      new ConcurrentHashMap<>();
 
-  protected final Map<byte[], Store> stores = new ConcurrentSkipListMap<>(Bytes.BYTES_RAWCOMPARATOR);
+  protected final Map<byte[], HStore> stores =
+      new ConcurrentSkipListMap<>(Bytes.BYTES_RAWCOMPARATOR);
 
   // TODO: account for each registered handler in HeapSize computation
   private Map<String, com.google.protobuf.Service> coprocessorServiceHandlers = Maps.newHashMap();
@@ -513,7 +516,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   /** A result object from prepare flush cache stage */
   @VisibleForTesting
   static class PrepareFlushResult {
-    final FlushResult result; // indicating a failure result from prepare
+    final FlushResultImpl result; // indicating a failure result from prepare
     final TreeMap<byte[], StoreFlushContext> storeFlushCtxs;
     final TreeMap<byte[], List<Path>> committedFiles;
     final TreeMap<byte[], MemstoreSize> storeFlushableSize;
@@ -523,7 +526,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     final MemstoreSize totalFlushableSize;
 
     /** Constructs an early exit case */
-    PrepareFlushResult(FlushResult result, long flushSeqId) {
+    PrepareFlushResult(FlushResultImpl result, long flushSeqId) {
       this(result, null, null, null, Math.max(0, flushSeqId), 0, 0, new MemstoreSize());
     }
 
@@ -538,7 +541,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
 
     private PrepareFlushResult(
-      FlushResult result,
+        FlushResultImpl result,
       TreeMap<byte[], StoreFlushContext> storeFlushCtxs,
       TreeMap<byte[], List<Path>> committedFiles,
       TreeMap<byte[], MemstoreSize> storeFlushableSize, long startTime, long flushSeqId,
@@ -616,7 +619,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   final long rowProcessorTimeout;
 
   // Last flush time for each Store. Useful when we are flushing for each column
-  private final ConcurrentMap<Store, Long> lastStoreFlushTimeMap = new ConcurrentHashMap<>();
+  private final ConcurrentMap<HStore, Long> lastStoreFlushTimeMap = new ConcurrentHashMap<>();
 
   final RegionServerServices rsServices;
   private RegionServerAccounting rsAccounting;
@@ -802,7 +805,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     this.disallowWritesInRecovering =
         conf.getBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING,
           HConstants.DEFAULT_DISALLOW_WRITES_IN_RECOVERING_CONFIG);
-    configurationManager = Optional.absent();
+    configurationManager = Optional.empty();
 
     // disable stats tracking system tables, but check the config for everything else
     this.regionStatsEnabled = htd.getTableName().getNamespaceAsString().equals(
@@ -902,22 +905,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     long maxSeqId = initializeStores(reporter, status);
     this.mvcc.advanceTo(maxSeqId);
     if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) {
-      List<Store> stores = this.getStores();  // update the stores that we are replaying
+      Collection<HStore> stores = this.stores.values();
       try {
-        for (Store store : stores) {
-          ((HStore) store).startReplayingFromWAL();
-        }
+        // update the stores that we are replaying
+        stores.forEach(HStore::startReplayingFromWAL);
         // Recover any edits if available.
         maxSeqId = Math.max(maxSeqId,
-            replayRecoveredEditsIfAny(this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
+          replayRecoveredEditsIfAny(this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
         // Make sure mvcc is up to max.
         this.mvcc.advanceTo(maxSeqId);
       } finally {
-        for (Store store : stores) {            // update the stores that we are done replaying
-          ((HStore)store).stopReplayingFromWAL();
-        }
+        // update the stores that we are done replaying
+        stores.forEach(HStore::stopReplayingFromWAL);
       }
-
     }
     this.lastReplayedOpenRegionSeqId = maxSeqId;
 
@@ -947,7 +947,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     this.flushPolicy = FlushPolicyFactory.create(this, conf);
 
     long lastFlushTime = EnvironmentEdgeManager.currentTime();
-    for (Store store: stores.values()) {
+    for (HStore store: stores.values()) {
       this.lastStoreFlushTimeMap.put(store, lastFlushTime);
     }
 
@@ -988,10 +988,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @return Highest sequenceId found out in a Store.
    * @throws IOException
    */
-  private long initializeStores(final CancelableProgressable reporter, MonitoredTask status)
-  throws IOException {
+  private long initializeStores(CancelableProgressable reporter, MonitoredTask status)
+      throws IOException {
     // Load in all the HStores.
-
     long maxSeqId = -1;
     // initialized to -1 so that we pick up MemstoreTS from column families
     long maxMemstoreTS = -1;
@@ -1050,11 +1049,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         if (!allStoresOpened) {
           // something went wrong, close all opened stores
           LOG.error("Could not initialize all stores for the region=" + this);
-          for (Store store : this.stores.values()) {
+          for (HStore store : this.stores.values()) {
             try {
               store.close();
             } catch (IOException e) {
-              LOG.warn(e.getMessage());
+              LOG.warn("close store failed", e);
             }
           }
         }
@@ -1079,11 +1078,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    */
   private NavigableMap<byte[], List<Path>> getStoreFiles() {
     NavigableMap<byte[], List<Path>> allStoreFiles = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-    for (Store store: getStores()) {
+    for (HStore store : stores.values()) {
       Collection<StoreFile> storeFiles = store.getStorefiles();
-      if (storeFiles == null) continue;
+      if (storeFiles == null) {
+        continue;
+      }
       List<Path> storeFileNames = new ArrayList<>();
-      for (StoreFile storeFile: storeFiles) {
+      for (StoreFile storeFile : storeFiles) {
         storeFileNames.add(storeFile.getPath());
       }
       allStoreFiles.put(store.getColumnFamilyDescriptor().getName(), storeFileNames);
@@ -1121,10 +1122,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @return True if this region has references.
    */
   public boolean hasReferences() {
-    for (Store store : this.stores.values()) {
-      if (store.hasReferences()) return true;
-    }
-    return false;
+    return stores.values().stream().anyMatch(HStore::hasReferences);
   }
 
   public void blockUpdates() {
@@ -1137,19 +1135,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   @Override
   public HDFSBlocksDistribution getHDFSBlocksDistribution() {
-    HDFSBlocksDistribution hdfsBlocksDistribution =
-      new HDFSBlocksDistribution();
-    synchronized (this.stores) {
-      for (Store store : this.stores.values()) {
-        Collection<StoreFile> storeFiles = store.getStorefiles();
-        if (storeFiles == null) continue;
-        for (StoreFile sf : storeFiles) {
-          HDFSBlocksDistribution storeFileBlocksDistribution =
-            sf.getHDFSBlockDistribution();
-          hdfsBlocksDistribution.add(storeFileBlocksDistribution);
-        }
-      }
-    }
+    HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
+    stores.values().stream().filter(s -> s.getStorefiles() != null)
+        .flatMap(s -> s.getStorefiles().stream()).map(StoreFile::getHDFSBlockDistribution)
+        .forEachOrdered(hdfsBlocksDistribution::add);
     return hdfsBlocksDistribution;
   }
 
@@ -1161,8 +1150,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @return The HDFS blocks distribution for the given region.
    * @throws IOException
    */
-  public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
-      final TableDescriptor tableDescriptor, final HRegionInfo regionInfo) throws IOException {
+  public static HDFSBlocksDistribution computeHDFSBlocksDistribution(Configuration conf,
+      TableDescriptor tableDescriptor, HRegionInfo regionInfo) throws IOException {
     Path tablePath = FSUtils.getTableDir(FSUtils.getRootDir(conf), tableDescriptor.getTableName());
     return computeHDFSBlocksDistribution(conf, tableDescriptor, regionInfo, tablePath);
   }
@@ -1176,9 +1165,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @return The HDFS blocks distribution for the given region.
    * @throws IOException
    */
-  public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
-      final TableDescriptor tableDescriptor, final HRegionInfo regionInfo,  Path tablePath)
-      throws IOException {
+  public static HDFSBlocksDistribution computeHDFSBlocksDistribution(Configuration conf,
+      TableDescriptor tableDescriptor, HRegionInfo regionInfo, Path tablePath) throws IOException {
     HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
     FileSystem fs = tablePath.getFileSystem(conf);
 
@@ -1407,9 +1395,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       new Throwable("LOGGING: REMOVE"));
     // REMOVE BELOW!!!!
     LOG.info("DEBUG LIST ALL FILES");
-    for (Store store: this.stores.values()) {
+    for (HStore store : this.stores.values()) {
       LOG.info("store " + store.getColumnFamilyName());
-      for (StoreFile sf: store.getStorefiles()) {
+      for (StoreFile sf : store.getStorefiles()) {
         LOG.info(sf.toStringDetailed());
       }
     }
@@ -1667,7 +1655,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           new ExecutorCompletionService<>(storeCloserThreadPool);
 
         // close each store in parallel
-        for (final Store store : stores.values()) {
+        for (HStore store : stores.values()) {
           MemstoreSize flushableSize = store.getSizeToFlush();
           if (!(abort || flushableSize.getDataSize() == 0 || writestate.readOnly)) {
             if (getRegionServerServices() != null) {
@@ -1740,11 +1728,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   }
 
   private long getMemstoreHeapSize() {
-    long size = 0;
-    for (Store s : this.stores.values()) {
-      size += s.getSizeOfMemStore().getHeapSize();
-    }
-    return size;
+    return stores.values().stream().mapToLong(s -> s.getSizeOfMemStore().getHeapSize()).sum();
   }
 
   @Override
@@ -1902,17 +1886,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   @Override
   public long getOldestHfileTs(boolean majorCompactionOnly) throws IOException {
     long result = Long.MAX_VALUE;
-    for (Store store : getStores()) {
+    for (HStore store : stores.values()) {
       Collection<StoreFile> storeFiles = store.getStorefiles();
-      if (storeFiles == null) continue;
+      if (storeFiles == null) {
+        continue;
+      }
       for (StoreFile file : storeFiles) {
         StoreFileReader sfReader = file.getReader();
-        if (sfReader == null) continue;
+        if (sfReader == null) {
+          continue;
+        }
         HFile.Reader reader = sfReader.getHFileReader();
-        if (reader == null) continue;
+        if (reader == null) {
+          continue;
+        }
         if (majorCompactionOnly) {
           byte[] val = reader.loadFileInfo().get(StoreFile.MAJOR_COMPACTION_KEY);
-          if (val == null || !Bytes.toBoolean(val)) continue;
+          if (val == null || !Bytes.toBoolean(val)) {
+            continue;
+          }
         }
         result = Math.min(result, reader.getFileContext().getFileCreateTime());
       }
@@ -1942,20 +1934,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   // These methods are meant to be called periodically by the HRegionServer for
   // upkeep.
   //////////////////////////////////////////////////////////////////////////////
-
-  /** @return returns size of largest HStore. */
+  /**
+   * @return the size of the largest HStore.
+   */
   public long getLargestHStoreSize() {
-    long size = 0;
-    for (Store h : stores.values()) {
-      long storeSize = h.getSize();
-      if (storeSize > size) {
-        size = storeSize;
-      }
-    }
-    return size;
+    return stores.values().stream().mapToLong(HStore::getSize).max().orElse(0L);
   }
 
-  /*
+  /**
    * Do preparation for pending compaction.
    * @throws IOException
    */
@@ -1964,19 +1950,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   @Override
   public void triggerMajorCompaction() throws IOException {
-    for (Store s : getStores()) {
-      s.triggerMajorCompaction();
-    }
+    stores.values().forEach(HStore::triggerMajorCompaction);
   }
 
   @Override
-  public void compact(final boolean majorCompaction) throws IOException {
+  public void compact(boolean majorCompaction) throws IOException {
     if (majorCompaction) {
       triggerMajorCompaction();
     }
-    for (Store s : getStores()) {
-      CompactionContext compaction = s.requestCompaction();
-      if (compaction != null) {
+    for (HStore s : stores.values()) {
+      Optional<CompactionContext> compaction = s.requestCompaction();
+      if (compaction.isPresent()) {
         ThroughputController controller = null;
         if (rsServices != null) {
           controller = CompactionThroughputControllerFactory.create(rsServices, conf);
@@ -1984,43 +1968,41 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         if (controller == null) {
           controller = NoLimitThroughputController.INSTANCE;
         }
-        compact(compaction, s, controller, null);
+        compact(compaction.get(), s, controller, null);
       }
     }
   }
 
   /**
-   * This is a helper function that compact all the stores synchronously
+   * This is a helper function that compacts all the stores synchronously.
+   * <p>
    * It is used by utilities and testing
-   *
-   * @throws IOException e
    */
+  @VisibleForTesting
   public void compactStores() throws IOException {
-    for (Store s : getStores()) {
-      CompactionContext compaction = s.requestCompaction();
-      if (compaction != null) {
-        compact(compaction, s, NoLimitThroughputController.INSTANCE, null);
+    for (HStore s : stores.values()) {
+      Optional<CompactionContext> compaction = s.requestCompaction();
+      if (compaction.isPresent()) {
+        compact(compaction.get(), s, NoLimitThroughputController.INSTANCE, null);
       }
     }
   }
 
   /**
-   * This is a helper function that compact the given store
+   * This is a helper function that compacts the given store.
+   * <p>
    * It is used by utilities and testing
-   *
-   * @throws IOException e
    */
   @VisibleForTesting
-  void compactStore(byte[] family, ThroughputController throughputController)
-      throws IOException {
-    Store s = getStore(family);
-    CompactionContext compaction = s.requestCompaction();
-    if (compaction != null) {
-      compact(compaction, s, throughputController, null);
+  void compactStore(byte[] family, ThroughputController throughputController) throws IOException {
+    HStore s = getStore(family);
+    Optional<CompactionContext> compaction = s.requestCompaction();
+    if (compaction.isPresent()) {
+      compact(compaction.get(), s, throughputController, null);
     }
   }
 
-  /*
+  /**
    * Called by compaction thread and after region is opened to compact the
    * HStores if necessary.
    *
@@ -2035,12 +2017,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @param throughputController
    * @return whether the compaction completed
    */
-  public boolean compact(CompactionContext compaction, Store store,
+  public boolean compact(CompactionContext compaction, HStore store,
       ThroughputController throughputController) throws IOException {
     return compact(compaction, store, throughputController, null);
   }
 
-  public boolean compact(CompactionContext compaction, Store store,
+  public boolean compact(CompactionContext compaction, HStore store,
       ThroughputController throughputController, User user) throws IOException {
     assert compaction != null && compaction.hasSelection();
     assert !compaction.getRequest().getFiles().isEmpty();
@@ -2214,7 +2196,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * because a Snapshot was not properly persisted. The region is put in closing mode, and the
    * caller MUST abort after this.
    */
-  public FlushResult flushcache(boolean forceFlushAllStores, boolean writeFlushRequestWalMarker)
+  public FlushResultImpl flushcache(boolean forceFlushAllStores, boolean writeFlushRequestWalMarker)
       throws IOException {
     // fail-fast instead of waiting on the lock
     if (this.closing.get()) {
@@ -2261,10 +2243,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
 
       try {
-        Collection<Store> specificStoresToFlush =
+        Collection<HStore> specificStoresToFlush =
             forceFlushAllStores ? stores.values() : flushPolicy.selectStoresToFlush();
-        FlushResult fs = internalFlushcache(specificStoresToFlush,
-          status, writeFlushRequestWalMarker);
+        FlushResultImpl fs =
+            internalFlushcache(specificStoresToFlush, status, writeFlushRequestWalMarker);
 
         if (coprocessorHost != null) {
           status.setStatus("Running post-flush coprocessor hooks");
@@ -2297,7 +2279,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * that you always flush all stores). Otherwise the method will always
    * returns true which will make a lot of flush requests.
    */
-  boolean shouldFlushStore(Store store) {
+  boolean shouldFlushStore(HStore store) {
     long earliest = this.wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(),
       store.getColumnFamilyDescriptor().getName()) - 1;
     if (earliest > 0 && earliest + flushPerChanges < mvcc.getReadPoint()) {
@@ -2349,7 +2331,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
     //since we didn't flush in the recent past, flush now if certain conditions
     //are met. Return true on first such memstore hit.
-    for (Store s : getStores()) {
+    for (Store s : stores.values()) {
       if (s.timeOfOldestEdit() < now - modifiedFlushCheckInterval) {
         // we have an old enough edit in the memstore, flush
         whyFlush.append(s.toString() + " has an old edit so flush to free WALs");
@@ -2361,39 +2343,33 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   /**
    * Flushing all stores.
-   *
    * @see #internalFlushcache(Collection, MonitoredTask, boolean)
    */
-  private FlushResult internalFlushcache(MonitoredTask status)
-      throws IOException {
+  private FlushResult internalFlushcache(MonitoredTask status) throws IOException {
     return internalFlushcache(stores.values(), status, false);
   }
 
   /**
    * Flushing given stores.
-   *
    * @see #internalFlushcache(WAL, long, Collection, MonitoredTask, boolean)
    */
-  private FlushResult internalFlushcache(final Collection<Store> storesToFlush,
-      MonitoredTask status, boolean writeFlushWalMarker) throws IOException {
-    return internalFlushcache(this.wal, HConstants.NO_SEQNUM, storesToFlush,
-        status, writeFlushWalMarker);
+  private FlushResultImpl internalFlushcache(Collection<HStore> storesToFlush, MonitoredTask status,
+      boolean writeFlushWalMarker) throws IOException {
+    return internalFlushcache(this.wal, HConstants.NO_SEQNUM, storesToFlush, status,
+      writeFlushWalMarker);
   }
 
   /**
-   * Flush the memstore. Flushing the memstore is a little tricky. We have a lot
-   * of updates in the memstore, all of which have also been written to the wal.
-   * We need to write those updates in the memstore out to disk, while being
-   * able to process reads/writes as much as possible during the flush
-   * operation.
+   * Flush the memstore. Flushing the memstore is a little tricky. We have a lot of updates in the
+   * memstore, all of which have also been written to the wal. We need to write those updates in the
+   * memstore out to disk, while being able to process reads/writes as much as possible during the
+   * flush operation.
    * <p>
-   * This method may block for some time. Every time you call it, we up the
-   * regions sequence id even if we don't flush; i.e. the returned region id
-   * will be at least one larger than the last edit applied to this region. The
-   * returned id does not refer to an actual edit. The returned id can be used
-   * for say installing a bulk loaded file just ahead of the last hfile that was
-   * the result of this flush, etc.
-   *
+   * This method may block for some time. Every time you call it, we up the regions sequence id even
+   * if we don't flush; i.e. the returned region id will be at least one larger than the last edit
+   * applied to this region. The returned id does not refer to an actual edit. The returned id can
+   * be used for say installing a bulk loaded file just ahead of the last hfile that was the result
+   * of this flush, etc.
    * @param wal Null if we're NOT to go via wal.
    * @param myseqid The seqid to use if <code>wal</code> is null writing out flush file.
    * @param storesToFlush The list of stores to flush.
@@ -2401,9 +2377,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @throws IOException general io exceptions
    * @throws DroppedSnapshotException Thrown when replay of WAL is required.
    */
-  protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
-      final Collection<Store> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker)
-          throws IOException {
+  protected FlushResultImpl internalFlushcache(WAL wal, long myseqid,
+      Collection<HStore> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker)
+      throws IOException {
     PrepareFlushResult result
       = internalPrepareFlushCache(wal, myseqid, storesToFlush, status, writeFlushWalMarker);
     if (result.result == null) {
@@ -2415,9 +2390,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DLS_DEAD_LOCAL_STORE",
       justification="FindBugs seems confused about trxId")
-  protected PrepareFlushResult internalPrepareFlushCache(final WAL wal, final long myseqid,
-      final Collection<Store> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker)
-  throws IOException {
+  protected PrepareFlushResult internalPrepareFlushCache(WAL wal, long myseqid,
+      Collection<HStore> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker)
+      throws IOException {
     if (this.rsServices != null && this.rsServices.isAborted()) {
       // Don't flush when server aborting, it's unsafe
       throw new IOException("Aborting flush because server is aborted...");
@@ -2439,11 +2414,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           // edits in the WAL sub-system. Up the sequence number so the resulting flush id is for
           // sure just beyond the last appended region edit and not associated with any edit
           // (useful as marker when bulk loading, etc.).
-          FlushResult flushResult = null;
           if (wal != null) {
             writeEntry = mvcc.begin();
             long flushOpSeqId = writeEntry.getWriteNumber();
-            flushResult = new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
+            FlushResultImpl flushResult = new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
               flushOpSeqId, "Nothing to flush",
             writeFlushRequestMarkerToWAL(wal, writeFlushWalMarker));
             mvcc.completeAndWait(writeEntry);
@@ -2479,9 +2453,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     MemstoreSize totalSizeOfFlushableStores = new MemstoreSize();
 
     Map<byte[], Long> flushedFamilyNamesToSeq = new HashMap<>();
-    for (Store store: storesToFlush) {
+    for (HStore store : storesToFlush) {
       flushedFamilyNamesToSeq.put(store.getColumnFamilyDescriptor().getName(),
-          ((HStore) store).preFlushSeqIDEstimation());
+        store.preFlushSeqIDEstimation());
     }
 
     TreeMap<byte[], StoreFlushContext> storeFlushCtxs = new TreeMap<>(Bytes.BYTES_COMPARATOR);
@@ -2517,7 +2491,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         flushedSeqId = flushOpSeqId = myseqid;
       }
 
-      for (Store s : storesToFlush) {
+      for (HStore s : storesToFlush) {
         MemstoreSize flushableSize = s.getSizeToFlush();
         totalSizeOfFlushableStores.incMemstoreSize(flushableSize);
         storeFlushCtxs.put(s.getColumnFamilyDescriptor().getName(), s.createFlushContext(flushOpSeqId));
@@ -2555,7 +2529,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   /**
    * Utility method broken out of internalPrepareFlushCache so that method is smaller.
    */
-  private void logFatLineOnFlush(final Collection<Store> storesToFlush, final long sequenceId) {
+  private void logFatLineOnFlush(Collection<HStore> storesToFlush, long sequenceId) {
     if (!LOG.isInfoEnabled()) {
       return;
     }
@@ -2563,7 +2537,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     StringBuilder perCfExtras = null;
     if (!isAllFamilies(storesToFlush)) {
       perCfExtras = new StringBuilder();
-      for (Store store: storesToFlush) {
+      for (HStore store: storesToFlush) {
         perCfExtras.append("; ").append(store.getColumnFamilyName());
         perCfExtras.append("=")
             .append(StringUtils.byteDesc(store.getSizeToFlush().getDataSize()));
@@ -2611,7 +2585,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   /**
    * @return True if passed Set is all families in the region.
    */
-  private boolean isAllFamilies(final Collection<Store> families) {
+  private boolean isAllFamilies(Collection<HStore> families) {
     return families == null || this.stores.size() == families.size();
   }
 
@@ -2639,11 +2613,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
       justification="Intentional; notify is about completed flush")
-  protected FlushResult internalFlushCacheAndCommit(
-        final WAL wal, MonitoredTask status, final PrepareFlushResult prepareResult,
-        final Collection<Store> storesToFlush)
-    throws IOException {
-
+  protected FlushResultImpl internalFlushCacheAndCommit(WAL wal, MonitoredTask status,
+      PrepareFlushResult prepareResult, Collection<HStore> storesToFlush) throws IOException {
     // prepare flush context is carried via PrepareFlushResult
     TreeMap<byte[], StoreFlushContext> storeFlushCtxs = prepareResult.storeFlushCtxs;
     TreeMap<byte[], List<Path>> committedFiles = prepareResult.committedFiles;
@@ -2673,7 +2644,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
       // Switch snapshot (in memstore) -> new hfile (thus causing
       // all the store scanners to reset/reseek).
-      Iterator<Store> it = storesToFlush.iterator();
+      Iterator<HStore> it = storesToFlush.iterator();
      // stores.values() and storeFlushCtxs have the same order
       for (StoreFlushContext flush : storeFlushCtxs.values()) {
         boolean needsCompaction = flush.commit(status);
@@ -2746,7 +2717,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
 
     // Record latest flush time
-    for (Store store: storesToFlush) {
+    for (HStore store: storesToFlush) {
       this.lastStoreFlushTimeMap.put(store, startTime);
     }
 
@@ -4002,34 +3973,34 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }
 
-  /*
+  /**
    * @param delta If we are doing delta changes -- e.g. increment/append -- then this flag will be
-   *  set; when set we will run operations that make sense in the increment/append scenario but
-   *  that do not make sense otherwise.
-   * @see #applyToMemstore(Store, Cell, long)
+   *          set; when set we will run operations that make sense in the increment/append scenario
+   *          but that do not make sense otherwise.
+   * @see #applyToMemstore(HStore, Cell, MemstoreSize)
    */
-  private void applyToMemstore(final Store store, final List<Cell> cells, final boolean delta,
+  private void applyToMemstore(HStore store, List<Cell> cells, boolean delta,
       MemstoreSize memstoreSize) throws IOException {
     // Any change in how we update Store/MemStore needs to also be done in other applyToMemstore!!!!
     boolean upsert = delta && store.getColumnFamilyDescriptor().getMaxVersions() == 1;
     if (upsert) {
-      ((HStore) store).upsert(cells, getSmallestReadPoint(), memstoreSize);
+      store.upsert(cells, getSmallestReadPoint(), memstoreSize);
     } else {
-      ((HStore) store).add(cells, memstoreSize);
+      store.add(cells, memstoreSize);
     }
   }
 
-  /*
-   * @see #applyToMemstore(Store, List, boolean, boolean, long)
+  /**
+   * @see #applyToMemstore(HStore, List, boolean, MemstoreSize)
    */
-  private void applyToMemstore(final Store store, final Cell cell, MemstoreSize memstoreSize)
-  throws IOException {
+  private void applyToMemstore(HStore store, Cell cell, MemstoreSize memstoreSize)
+      throws IOException {
     // Any change in how we update Store/MemStore needs to also be done in other applyToMemstore!!!!
     if (store == null) {
       checkFamily(CellUtil.cloneFamily(cell));
      // Unreachable because checkFamily will throw an exception
     }
-    ((HStore) store).add(cell, memstoreSize);
+    store.add(cell, memstoreSize);
   }
 
   @Override
@@ -4368,7 +4339,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             // Figure which store the edit is meant for.
             if (store == null || !CellUtil.matchingFamily(cell,
                 store.getColumnFamilyDescriptor().getName())) {
-              store = getHStore(cell);
+              store = getStore(cell);
             }
             if (store == null) {
               // This should never happen.  Perhaps schema was changed between
@@ -4497,7 +4468,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
       startRegionOperation(Operation.REPLAY_EVENT);
       try {
-        HStore store = this.getHStore(compaction.getFamilyName().toByteArray());
+        HStore store = this.getStore(compaction.getFamilyName().toByteArray());
         if (store == null) {
           LOG.warn(getRegionInfo().getEncodedName() + " : "
               + "Found Compaction WAL edit for deleted family:"
@@ -4567,10 +4538,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   PrepareFlushResult replayWALFlushStartMarker(FlushDescriptor flush) throws IOException {
     long flushSeqId = flush.getFlushSequenceNumber();
 
-    HashSet<Store> storesToFlush = new HashSet<>();
+    HashSet<HStore> storesToFlush = new HashSet<>();
     for (StoreFlushDescriptor storeFlush : flush.getStoreFlushesList()) {
       byte[] family = storeFlush.getFamilyName().toByteArray();
-      Store store = getStore(family);
+      HStore store = getStore(family);
       if (store == null) {
         LOG.warn(getRegionInfo().getEncodedName() + " : "
           + "Received a flush start marker from primary, but the family is not found. Ignoring"
@@ -4807,7 +4778,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       throws IOException {
     for (StoreFlushDescriptor storeFlush : flush.getStoreFlushesList()) {
       byte[] family = storeFlush.getFamilyName().toByteArray();
-      Store store = getStore(family);
+      HStore store = getStore(family);
       if (store == null) {
         LOG.warn(getRegionInfo().getEncodedName() + " : "
             + "Received a flush commit marker from primary, but the family is not found."
@@ -4843,7 +4814,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * if the memstore edits have seqNums smaller than the given seq id
    * @throws IOException
    */
-  private MemstoreSize dropMemstoreContentsForSeqId(long seqId, Store store) throws IOException {
+  private MemstoreSize dropMemstoreContentsForSeqId(long seqId, HStore store) throws IOException {
     MemstoreSize totalFreedSize = new MemstoreSize();
     this.updatesLock.writeLock().lock();
     try {
@@ -4857,7 +4828,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
         // Prepare flush (take a snapshot) and then abort (drop the snapshot)
         if (store == null) {
-          for (Store s : stores.values()) {
+          for (HStore s : stores.values()) {
             totalFreedSize.incMemstoreSize(doDropStoreMemstoreContentsForSeqId(s, currentSeqId));
           }
         } else {
@@ -4874,7 +4845,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     return totalFreedSize;
   }
 
-  private MemstoreSize doDropStoreMemstoreContentsForSeqId(Store s, long currentSeqId)
+  private MemstoreSize doDropStoreMemstoreContentsForSeqId(HStore s, long currentSeqId)
       throws IOException {
     MemstoreSize flushableSize = s.getSizeToFlush();
     this.decrMemstoreSize(flushableSize);
@@ -4965,7 +4936,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         for (StoreDescriptor storeDescriptor : regionEvent.getStoresList()) {
           // stores of primary may be different now
           byte[] family = storeDescriptor.getFamilyName().toByteArray();
-          Store store = getStore(family);
+          HStore store = getStore(family);
           if (store == null) {
             LOG.warn(getRegionInfo().getEncodedName() + " : "
                 + "Received a region open marker from primary, but the family is not found. "
@@ -5081,7 +5052,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         for (StoreDescriptor storeDescriptor : bulkLoadEvent.getStoresList()) {
           // stores of primary may be different now
           family = storeDescriptor.getFamilyName().toByteArray();
-          HStore store = getHStore(family);
+          HStore store = getStore(family);
           if (store == null) {
             LOG.warn(getRegionInfo().getEncodedName() + " : "
                     + "Received a bulk load marker from primary, but the family is not found. "
@@ -5119,9 +5090,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     if (writestate.flushing) {
       boolean canDrop = true;
       if (prepareFlushResult.storeFlushCtxs != null) {
-        for (Entry<byte[], StoreFlushContext> entry
-            : prepareFlushResult.storeFlushCtxs.entrySet()) {
-          Store store = getStore(entry.getKey());
+        for (Entry<byte[], StoreFlushContext> entry : prepareFlushResult.storeFlushCtxs
+            .entrySet()) {
+          HStore store = getStore(entry.getKey());
           if (store == null) {
             continue;
           }
@@ -5164,9 +5135,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
     startRegionOperation(); // obtain region close lock
     try {
-      Map<Store, Long> map = new HashMap<>();
+      Map<HStore, Long> map = new HashMap<>();
       synchronized (writestate) {
-        for (Store store : getStores()) {
+        for (HStore store : stores.values()) {
           // TODO: some stores might see new data from flush, while others do not which
           // MIGHT break atomic edits across column families.
           long maxSeqIdBefore = store.getMaxSequenceId();
@@ -5207,10 +5178,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         dropPrepareFlushIfPossible();
 
         // advance the mvcc read point so that the new flushed files are visible.
-          // either greater than flush seq number or they were already picked up via flush.
-          for (Store s : getStores()) {
-            mvcc.advanceTo(s.getMaxMemstoreTS());
-          }
+        // either greater than flush seq number or they were already picked up via flush.
+        for (HStore s : stores.values()) {
+          mvcc.advanceTo(s.getMaxMemstoreTS());
+        }
 
 
         // smallestSeqIdInStores is the seqId that we have a corresponding hfile for. We can safely
@@ -5222,7 +5193,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         }
       }
       if (!map.isEmpty()) {
-        for (Map.Entry<Store, Long> entry : map.entrySet()) {
+        for (Map.Entry<HStore, Long> entry : map.entrySet()) {
           // Drop the memstore contents if they are now smaller than the latest seen flushed file
           totalFreedDataSize += dropMemstoreContentsForSeqId(entry.getValue(), entry.getKey())
               .getDataSize();
@@ -5242,13 +5213,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   private void logRegionFiles() {
     if (LOG.isTraceEnabled()) {
       LOG.trace(getRegionInfo().getEncodedName() + " : Store files for region: ");
-      for (Store s : stores.values()) {
-        Collection<StoreFile> storeFiles = s.getStorefiles();
-        if (storeFiles == null) continue;
-        for (StoreFile sf : storeFiles) {
-          LOG.trace(getRegionInfo().getEncodedName() + " : " + sf);
-        }
-      }
+      stores.values().stream().filter(s -> s.getStorefiles() != null)
+          .flatMap(s -> s.getStorefiles().stream())
+          .forEachOrdered(sf -> LOG.trace(getRegionInfo().getEncodedName() + " : " + sf));
     }
   }
 
@@ -5272,17 +5239,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       + " does not match this region: " + this.getRegionInfo());
   }
 
-  /*
+  /**
    * Used by tests
   * @param s Store to add edit to.
    * @param cell Cell to add.
    * @param memstoreSize
    */
-  protected void restoreEdit(final HStore s, final Cell cell, MemstoreSize memstoreSize) {
+  @VisibleForTesting
+  protected void restoreEdit(HStore s, Cell cell, MemstoreSize memstoreSize) {
     s.add(cell, memstoreSize);
   }
 
-  /*
+  /**
    * @param fs
    * @param p File to check.
    * @return True if file was zero-length (and if so, we'll delete it in here).
@@ -5291,7 +5259,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   private static boolean isZeroLengthThenDelete(final FileSystem fs, final Path p)
       throws IOException {
     FileStatus stat = fs.getFileStatus(p);
-    if (stat.getLen() > 0) return false;
+    if (stat.getLen() > 0) {
+      return false;
+    }
     LOG.warn("File " + p + " is zero-length, deleting.");
     fs.delete(p, false);
     return true;
@@ -5311,49 +5281,39 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   }
 
   @Override
-  public Store getStore(final byte[] column) {
-    return getHStore(column);
-  }
-
-  public HStore getHStore(final byte[] column) {
-    return (HStore) this.stores.get(column);
+  public HStore getStore(byte[] column) {
+    return this.stores.get(column);
   }
 
   /**
-   * Return HStore instance. Does not do any copy: as the number of store is limited, we
-   *  iterate on the list.
+   * Return the HStore instance. Does not do any copy: as the number of stores is limited, we
+   * iterate over the list.
    */
-  private HStore getHStore(Cell cell) {
-    for (Map.Entry<byte[], Store> famStore : stores.entrySet()) {
-      if (CellUtil.matchingFamily(cell, famStore.getKey(), 0, famStore.getKey().length)) {
-        return (HStore) famStore.getValue();
-      }
-    }
-
-    return null;
+  private HStore getStore(Cell cell) {
+    return stores.entrySet().stream().filter(e -> CellUtil.matchingFamily(cell, e.getKey()))
+        .map(e -> e.getValue()).findFirst().orElse(null);
   }
 
   @Override
-  public List<Store> getStores() {
-    List<Store> list = new ArrayList<>(stores.size());
-    list.addAll(stores.values());
-    return list;
+  public List<HStore> getStores() {
+    return new ArrayList<>(stores.values());
   }
 
   @Override
-  public List<String> getStoreFileList(final byte [][] columns)
-    throws IllegalArgumentException {
+  public List<String> getStoreFileList(byte[][] columns) throws IllegalArgumentException {
     List<String> storeFileNames = new ArrayList<>();
-    synchronized(closeLock) {
-      for(byte[] column : columns) {
-        Store store = this.stores.get(column);
+    synchronized (closeLock) {
+      for (byte[] column : columns) {
+        HStore store = this.stores.get(column);
         if (store == null) {
-          throw new IllegalArgumentException("No column family : " +
-              new String(column) + " available");
+          throw new IllegalArgumentException(
+              "No column family : " + new String(column) + " available");
         }
         Collection<StoreFile> storeFiles = store.getStorefiles();
-        if (storeFiles == null) continue;
-        for (StoreFile storeFile: storeFiles) {
+        if (storeFiles == null) {
+          continue;
+        }
+        for (StoreFile storeFile : storeFiles) {
           storeFileNames.add(storeFile.getPath().toString());
         }
 
@@ -5368,7 +5328,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   //////////////////////////////////////////////////////////////////////////////
 
   /** Make sure this is a valid row for the HRegion */
-  void checkRow(final byte [] row, String op) throws IOException {
+  void checkRow(byte[] row, String op) throws IOException {
     if (!rowIsInRange(getRegionInfo(), row)) {
       throw new WrongRegionException("Requested row out of range for " +
           op + " on HRegion " + this + ", startKey='" +
@@ -5637,7 +5597,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         byte[] familyName = p.getFirst();
         String path = p.getSecond();
 
-        HStore store = getHStore(familyName);
+        HStore store = getStore(familyName);
         if (store == null) {
           IOException ioe = new org.apache.hadoop.hbase.DoNotRetryIOException(
               "No such column family " + Bytes.toStringBinary(familyName));
@@ -5697,7 +5657,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       for (Pair<byte[], String> p : familyPaths) {
         byte[] familyName = p.getFirst();
         String path = p.getSecond();
-        HStore store = getHStore(familyName);
+        HStore store = getStore(familyName);
         if (!familyWithFinalPath.containsKey(familyName)) {
           familyWithFinalPath.put(familyName, new ArrayList<>());
         }
@@ -5737,7 +5697,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         for (Pair<Path, Path> p : entry.getValue()) {
           String path = p.getFirst().toString();
           Path commitedStoreFile = p.getSecond();
-          HStore store = getHStore(familyName);
+          HStore store = getStore(familyName);
           try {
             store.bulkLoadHFile(familyName, path, commitedStoreFile);
             // Note the size of the store file
@@ -5912,7 +5872,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
       try {
         for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
-          Store store = stores.get(entry.getKey());
+          HStore store = stores.get(entry.getKey());
           KeyValueScanner scanner;
           try {
             scanner = store.getScanner(scan, entry.getValue(), this.readPt);
@@ -7145,7 +7105,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
                 // If no WAL, need to stamp it here.
                 CellUtil.setSequenceId(cell, sequenceId);
               }
-              applyToMemstore(getHStore(cell), cell, memstoreSize);
+              applyToMemstore(getStore(cell), cell, memstoreSize);
             }
           }
 
@@ -7296,7 +7256,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           return returnResults? cpResult: null;
         }
         Durability effectiveDurability = getEffectiveDurability(mutation.getDurability());
-        Map<Store, List<Cell>> forMemStore = new HashMap<>(mutation.getFamilyCellMap().size());
+        Map<HStore, List<Cell>> forMemStore = new HashMap<>(mutation.getFamilyCellMap().size());
         // Reckon Cells to apply to WAL --  in returned walEdit -- and what to add to memstore and
         // what to return back to the client (in 'forMemStore' and 'results' respectively).
         WALEdit walEdit = reckonDeltas(op, mutation, effectiveDurability, forMemStore, results);
@@ -7311,7 +7271,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           updateSequenceId(forMemStore.values(), writeEntry.getWriteNumber());
         }
         // Now write to MemStore. Do it a column family at a time.
-        for (Map.Entry<Store, List<Cell>> e : forMemStore.entrySet()) {
+        for (Map.Entry<HStore, List<Cell>> e : forMemStore.entrySet()) {
           applyToMemstore(e.getKey(), e.getValue(), true, memstoreSize);
         }
         mvcc.completeAndWait(writeEntry);
@@ -7419,18 +7379,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @param forMemStore Fill in here what to apply to the MemStore (by Store).
    * @return A WALEdit to apply to WAL or null if we are to skip the WAL.
    */
-  private WALEdit reckonDeltas(final Operation op, final Mutation mutation,
-      final Durability effectiveDurability, final Map<Store, List<Cell>> forMemStore,
-      final List<Cell> results)
-  throws IOException {
+  private WALEdit reckonDeltas(Operation op, Mutation mutation, Durability effectiveDurability,
+      Map<HStore, List<Cell>> forMemStore, List<Cell> results) throws IOException {
     WALEdit walEdit = null;
     long now = EnvironmentEdgeManager.currentTime();
     final boolean writeToWAL = effectiveDurability != Durability.SKIP_WAL;
     // Process one Store/family at a time.
     for (Map.Entry<byte [], List<Cell>> entry: mutation.getFamilyCellMap().entrySet()) {
-      final byte [] columnFamilyName = entry.getKey();
+      final byte[] columnFamilyName = entry.getKey();
       List<Cell> deltas = entry.getValue();
-      Store store = this.stores.get(columnFamilyName);
+      HStore store = this.stores.get(columnFamilyName);
       // Reckon for the Store what to apply to WAL and MemStore.
       List<Cell> toApply =
         reckonDeltasByStore(store, op, mutation, effectiveDurability, now, deltas, results);
@@ -7462,11 +7420,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @return Resulting Cells after <code>deltas</code> have been applied to current
    *  values. Side effect is our filling out of the <code>results</code> List.
    */
-  private List<Cell> reckonDeltasByStore(final Store store, final Operation op,
-      final Mutation mutation, final Durability effectiveDurability, final long now,
-      final List<Cell> deltas, final List<Cell> results)
-  throws IOException {
-    byte [] columnFamily = store.getColumnFamilyDescriptor().getName();
+  private List<Cell> reckonDeltasByStore(HStore store, Operation op, Mutation mutation,
+      Durability effectiveDurability, long now, List<Cell> deltas, List<Cell> results)
+      throws IOException {
+    byte[] columnFamily = store.getColumnFamilyDescriptor().getName();
     List<Cell> toApply = new ArrayList<>(deltas.size());
     // Get previous values for all columns in this family.
     List<Cell> currentValues = get(mutation, store, deltas,
@@ -7576,9 +7533,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @param coordinates Cells from <code>mutation</code> used as coordinates applied to Get.
    * @return Return list of Cells found.
    */
-  private List<Cell> get(final Mutation mutation, final Store store,
-          final List<Cell> coordinates, final IsolationLevel isolation, final TimeRange tr)
-  throws IOException {
+  private List<Cell> get(Mutation mutation, HStore store, List<Cell> coordinates,
+      IsolationLevel isolation, TimeRange tr) throws IOException {
     // Sort the cells so that they match the order that they appear in the Get results. Otherwise,
     // we won't be able to find the existing values if the cells are not specified in order by the
     // client since cells are in an array list.
@@ -7653,12 +7609,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   @Override
   public long heapSize() {
-    long heapSize = DEEP_OVERHEAD;
-    for (Store store : this.stores.values()) {
-      heapSize += store.heapSize();
-    }
     // this does not take into account row locks, recent flushes, mvcc entries, and more
-    return heapSize;
+    return DEEP_OVERHEAD + stores.values().stream().mapToLong(HStore::heapSize).sum();
   }
 
   @Override
@@ -7813,14 +7765,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @return The priority that this region should have in the compaction queue
    */
   public int getCompactPriority() {
-    int count = Integer.MAX_VALUE;
-    for (Store store : stores.values()) {
-      count = Math.min(count, store.getCompactPriority());
-    }
-    return count;
+    return stores.values().stream().mapToInt(HStore::getCompactPriority).min()
+        .orElse(Store.NO_PRIORITY);
   }
 
-
   /** @return the coprocessor host */
   @Override
   public RegionCoprocessorHost getCoprocessorHost() {
@@ -7881,11 +7829,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     // The unit for snapshot is a region. So, all stores for this region must be
     // prepared for snapshot operation before proceeding.
     if (op == Operation.SNAPSHOT) {
-      for (Store store : stores.values()) {
-        if (store instanceof HStore) {
-          ((HStore)store).preSnapshotOperation();
-        }
-      }
+      stores.values().forEach(HStore::preSnapshotOperation);
     }
     try {
       if (coprocessorHost != null) {
@@ -7905,11 +7849,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   @Override
   public void closeRegionOperation(Operation operation) throws IOException {
     if (operation == Operation.SNAPSHOT) {
-      for (Store store: stores.values()) {
-        if (store instanceof HStore) {
-          ((HStore)store).postSnapshotOperation();
-        }
-      }
+      stores.values().forEach(HStore::postSnapshotOperation);
     }
     lock.readLock().unlock();
     if (coprocessorHost != null) {
@@ -8142,9 +8082,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   @Override
   public void registerChildren(ConfigurationManager manager) {
     configurationManager = Optional.of(manager);
-    for (Store s : this.stores.values()) {
-      configurationManager.get().registerObserver(s);
-    }
+    stores.values().forEach(manager::registerObserver);
   }
 
   /**
@@ -8152,9 +8090,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    */
   @Override
   public void deregisterChildren(ConfigurationManager manager) {
-    for (Store s : this.stores.values()) {
-      configurationManager.get().deregisterObserver(s);
-    }
+    stores.values().forEach(configurationManager.get()::deregisterObserver);
   }
 
   @Override
@@ -8175,7 +8111,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     buf.append(getRegionInfo().isMetaRegion() ? " meta region " : " ");
     buf.append(getRegionInfo().isMetaTable() ? " meta table " : " ");
     buf.append("stores: ");
-    for (Store s : getStores()) {
+    for (HStore s : stores.values()) {
       buf.append(s.getColumnFamilyDescriptor().getNameAsString());
       buf.append(" size: ");
       buf.append(s.getSizeOfMemStore().getDataSize());
@@ -8188,4 +8124,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       throw new RuntimeException(buf.toString());
     }
   }
+
+  @Override
+  public void requestCompaction(String why, int priority, CompactionLifeCycleTracker tracker,
+      User user) throws IOException {
+    ((HRegionServer) rsServices).compactSplitThread.requestCompaction(this, why, priority, tracker,
+      user);
+  }
+
+  @Override
+  public void requestCompaction(byte[] family, String why, int priority,
+      CompactionLifeCycleTracker tracker, User user) throws IOException {
+    ((HRegionServer) rsServices).compactSplitThread.requestCompaction(this,
+      Preconditions.checkNotNull(stores.get(family)), why, priority, tracker, user);
+  }
 }
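
The two overrides above are the new public entry points for requesting compactions at the Region level, replacing direct use of CompactionRequest. A minimal sketch of how a caller might drive them, assuming only names visible in this diff (CompactionLifeCycleTracker.DUMMY, Store.PRIORITY_USER, User); the triggerCompactions helper itself is hypothetical:

  // Hypothetical caller, not part of this commit; a sketch only.
  void triggerCompactions(Region region, byte[] family, User user) throws IOException {
    // Family-scoped request: the region resolves the family to its HStore internally,
    // so the caller never constructs a CompactionRequest.
    region.requestCompaction(family, "example: family compaction", Store.PRIORITY_USER,
      CompactionLifeCycleTracker.DUMMY, user);
    // Region-wide request across all stores.
    region.requestCompaction("example: whole region", Store.PRIORITY_USER,
      CompactionLifeCycleTracker.DUMMY, user);
  }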

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 6bbff36..62987c0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -56,8 +56,8 @@ import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 import javax.servlet.http.HttpServlet;
 
-import org.apache.commons.lang3.SystemUtils;
 import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.SystemUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -127,6 +127,7 @@ import org.apache.hadoop.hbase.quotas.QuotaUtil;
 import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
 import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
 import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
@@ -140,6 +141,9 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
 import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Maps;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingRpcChannel;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
@@ -210,10 +214,6 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.data.Stat;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Maps;
-
 import sun.misc.Signal;
 import sun.misc.SignalHandler;
 
@@ -1686,7 +1686,7 @@ public class HRegionServer extends HasThread implements
     int totalStaticBloomSizeKB = 0;
     long totalCompactingKVs = 0;
     long currentCompactedKVs = 0;
-    List<Store> storeList = r.getStores();
+    List<? extends Store> storeList = r.getStores();
     stores += storeList.size();
     for (Store store : storeList) {
       storefiles += store.getStorefilesCount();
@@ -1772,27 +1772,32 @@ public class HRegionServer extends HasThread implements
     @Override
     protected void chore() {
       for (Region r : this.instance.onlineRegions.values()) {
-        if (r == null)
+        if (r == null) {
           continue;
-        for (Store s : r.getStores()) {
+        }
+        HRegion hr = (HRegion) r;
+        for (HStore s : hr.stores.values()) {
           try {
             long multiplier = s.getCompactionCheckMultiplier();
             assert multiplier > 0;
-            if (iteration % multiplier != 0) continue;
+            if (iteration % multiplier != 0) {
+              continue;
+            }
             if (s.needsCompaction()) {
               // Queue a compaction. Will recognize if major is needed.
-              this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()
-                  + " requests compaction");
+              this.instance.compactSplitThread.requestSystemCompaction(hr, s,
+                getName() + " requests compaction");
             } else if (s.isMajorCompaction()) {
               s.triggerMajorCompaction();
-              if (majorCompactPriority == DEFAULT_PRIORITY
-                  || majorCompactPriority > ((HRegion)r).getCompactPriority()) {
-                this.instance.compactSplitThread.requestCompaction(r, s, getName()
-                    + " requests major compaction; use default priority", null);
+              if (majorCompactPriority == DEFAULT_PRIORITY ||
+                  majorCompactPriority > hr.getCompactPriority()) {
+                this.instance.compactSplitThread.requestCompaction(hr, s,
+                  getName() + " requests major compaction; use default priority", Store.NO_PRIORITY,
+                  CompactionLifeCycleTracker.DUMMY, null);
               } else {
-                this.instance.compactSplitThread.requestCompaction(r, s, getName()
-                    + " requests major compaction; use configured priority",
-                  this.majorCompactPriority, null, null);
+                this.instance.compactSplitThread.requestCompaction(hr, s,
+                  getName() + " requests major compaction; use configured priority",
+                  this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null);
               }
             }
           } catch (IOException e) {
@@ -2146,15 +2151,14 @@ public class HRegionServer extends HasThread implements
   @Override
   public void postOpenDeployTasks(final PostOpenDeployContext context)
       throws KeeperException, IOException {
-    Region r = context.getRegion();
+    HRegion r = (HRegion) context.getRegion();
     long masterSystemTime = context.getMasterSystemTime();
-    Preconditions.checkArgument(r instanceof HRegion, "r must be an HRegion");
     rpcServices.checkOpen();
     LOG.info("Post open deploy tasks for " + r.getRegionInfo().getRegionNameAsString());
     // Do checks to see if we need to compact (references or too many files)
-    for (Store s : r.getStores()) {
+    for (HStore s : r.stores.values()) {
       if (s.hasReferences() || s.needsCompaction()) {
-       this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
+        this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
       }
     }
     long openSeqNum = r.getOpenSeqNum();
@@ -2863,11 +2867,6 @@ public class HRegionServer extends HasThread implements
     return serverName;
   }
 
-  @Override
-  public CompactionRequestor getCompactionRequester() {
-    return this.compactSplitThread;
-  }
-
   public RegionServerCoprocessorHost getRegionServerCoprocessorHost(){
     return this.rsHost;
   }
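
The CompactionChecker hunk above reduces to the following per-store decision; this is a readability paraphrase, not code from the commit, and it elides the iteration-multiplier check and the major-compaction priority selection:

  // Paraphrase of the chore body above (sketch; priority handling elided).
  for (HStore s : hr.stores.values()) {
    if (s.needsCompaction()) {
      // Too many store files: queue a system compaction; major is detected automatically.
      instance.compactSplitThread.requestSystemCompaction(hr, s, "chore requests compaction");
    } else if (s.isMajorCompaction()) {
      // A periodic major compaction is due: mark the store and queue an explicit request.
      s.triggerMajorCompaction();
      instance.compactSplitThread.requestCompaction(hr, s, "major compaction due",
        Store.NO_PRIORITY, CompactionLifeCycleTracker.DUMMY, null);
    }
  }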

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index f011c18..daad241 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -30,6 +30,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
@@ -52,13 +53,12 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CompoundConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.MemoryCompactionPolicy;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.FailedArchiveException;
-import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.conf.ConfigurationManager;
 import org.apache.hadoop.hbase.io.compress.Compression;
@@ -73,6 +73,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
@@ -82,8 +83,6 @@ import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.security.EncryptionUtil;
 import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -92,14 +91,16 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
+import org.apache.yetus.audience.InterfaceAudience;
 
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableCollection;
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
 
 /**
  * A Store holds a column family in a Region.  It's a memstore and a set of zero
@@ -477,7 +478,7 @@ public class HStore implements Store {
   /**
    * @param tabledir {@link Path} to where the table is being stored
    * @param hri {@link HRegionInfo} for the region.
-   * @param family {@link HColumnDescriptor} describing the column family
+   * @param family {@link ColumnFamilyDescriptor} describing the column family
    * @return Path to family/Store home directory.
    */
   @Deprecated
@@ -489,7 +490,7 @@ public class HStore implements Store {
   /**
    * @param tabledir {@link Path} to where the table is being stored
    * @param encodedName Encoded region name.
-   * @param family {@link HColumnDescriptor} describing the column family
+   * @param family {@link ColumnFamilyDescriptor} describing the column family
    * @return Path to family/Store home directory.
    */
   @Deprecated
@@ -1386,15 +1387,14 @@ public class HStore implements Store {
     }
   }
 
-  private List<StoreFile> moveCompatedFilesIntoPlace(
-      final CompactionRequest cr, List<Path> newFiles, User user) throws IOException {
+  private List<StoreFile> moveCompatedFilesIntoPlace(CompactionRequest cr, List<Path> newFiles,
+      User user) throws IOException {
     List<StoreFile> sfs = new ArrayList<>(newFiles.size());
     for (Path newFile : newFiles) {
       assert newFile != null;
-      final StoreFile sf = moveFileIntoPlace(newFile);
+      StoreFile sf = moveFileIntoPlace(newFile);
       if (this.getCoprocessorHost() != null) {
-        final Store thisStore = this;
-        getCoprocessorHost().postCompact(thisStore, sf, cr, user);
+        getCoprocessorHost().postCompact(this, sf, cr.getTracker(), user);
       }
       assert sf != null;
       sfs.add(sf);
@@ -1636,23 +1636,12 @@ public class HStore implements Store {
   }
 
   @Override
-  public CompactionContext requestCompaction() throws IOException {
-    return requestCompaction(Store.NO_PRIORITY, null);
-  }
-
-  @Override
-  public CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
-      throws IOException {
-    return requestCompaction(priority, baseRequest, null);
-  }
-  @Override
-  public CompactionContext requestCompaction(int priority, final CompactionRequest baseRequest,
-      User user) throws IOException {
+  public Optional<CompactionContext> requestCompaction(int priority,
+      CompactionLifeCycleTracker tracker, User user) throws IOException {
     // don't even select for compaction if writes are disabled
     if (!this.areWritesEnabled()) {
-      return null;
+      return Optional.empty();
     }
-
     // Before we do compaction, try to get rid of unneeded files to simplify things.
     removeUnneededFiles();
 
@@ -1666,7 +1655,7 @@ public class HStore implements Store {
           final List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
           boolean override = false;
           override = getCoprocessorHost().preCompactSelection(this, candidatesForCoproc,
-              baseRequest, user);
+            tracker, user);
           if (override) {
             // Coprocessor is overriding normal file selection.
             compaction.forceSelect(new CompactionRequest(candidatesForCoproc));
@@ -1695,21 +1684,13 @@ public class HStore implements Store {
         }
         if (this.getCoprocessorHost() != null) {
           this.getCoprocessorHost().postCompactSelection(
-              this, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest, user);
-        }
-
-        // Selected files; see if we have a compaction with some custom base request.
-        if (baseRequest != null) {
-          // Update the request with what the system thinks the request should be;
-          // its up to the request if it wants to listen.
-          compaction.forceSelect(
-              baseRequest.combineWith(compaction.getRequest()));
+              this, ImmutableList.copyOf(compaction.getRequest().getFiles()), tracker, user);
         }
         // Finally, we have the resulting files list. Check if we have any files at all.
         request = compaction.getRequest();
-        final Collection<StoreFile> selectedFiles = request.getFiles();
+        Collection<StoreFile> selectedFiles = request.getFiles();
         if (selectedFiles.isEmpty()) {
-          return null;
+          return Optional.empty();
         }
 
         addToCompactingFiles(selectedFiles);
@@ -1721,6 +1702,7 @@ public class HStore implements Store {
         // Set priority, either override value supplied by caller or from store.
         request.setPriority((priority != Store.NO_PRIORITY) ? priority : getCompactPriority());
         request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
+        request.setTracker(tracker);
       }
     } finally {
       this.lock.readLock().unlock();
@@ -1730,7 +1712,7 @@ public class HStore implements Store {
         + ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction"
         + (request.isAllFiles() ? " (all files)" : ""));
     this.region.reportCompactionRequestStart(request.isMajor());
-    return compaction;
+    return Optional.of(compaction);
   }
 
   /** Adds the files to compacting files. filesCompacting must be locked. */
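
The notable change in this file is that HStore.requestCompaction now returns Optional<CompactionContext> instead of a nullable value, and carries the CompactionLifeCycleTracker on the selected request. A minimal sketch of the calling pattern this implies (hypothetical caller; store and user are assumed to be in scope):

  // Sketch of a caller adapted to the Optional-returning API; not part of this commit.
  Optional<CompactionContext> compaction =
      store.requestCompaction(Store.NO_PRIORITY, CompactionLifeCycleTracker.DUMMY, user);
  if (!compaction.isPresent()) {
    return; // writes disabled, or selection chose no files
  }
  // The selected request now carries the tracker (see request.setTracker(tracker) above).
  CompactionContext context = compaction.get();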

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index 020142d..8fa686c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -20,11 +20,8 @@ package org.apache.hadoop.hbase.regionserver;
 
 import static org.apache.hadoop.util.StringUtils.humanReadableInt;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
-
 import java.io.IOException;
 import java.lang.Thread.UncaughtExceptionHandler;
-import java.lang.management.MemoryType;
 import java.util.ArrayList;
 import java.util.ConcurrentModificationException;
 import java.util.HashMap;
@@ -50,6 +47,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.HasThread;
@@ -448,8 +446,8 @@ class MemStoreFlusher implements FlushRequester {
             "store files; delaying flush up to " + this.blockingWaitTime + "ms");
           if (!this.server.compactSplitThread.requestSplit(region)) {
             try {
-              this.server.compactSplitThread.requestSystemCompaction(
-                  region, Thread.currentThread().getName());
+              this.server.compactSplitThread.requestSystemCompaction((HRegion) region,
+                Thread.currentThread().getName());
             } catch (IOException e) {
               e = e instanceof RemoteException ?
                       ((RemoteException)e).unwrapRemoteException() : e;
@@ -503,8 +501,8 @@ class MemStoreFlusher implements FlushRequester {
       if (shouldSplit) {
         this.server.compactSplitThread.requestSplit(region);
       } else if (shouldCompact) {
-        server.compactSplitThread.requestSystemCompaction(
-            region, Thread.currentThread().getName());
+        server.compactSplitThread.requestSystemCompaction((HRegion) region,
+          Thread.currentThread().getName());
       }
     } catch (DroppedSnapshotException ex) {
       // Cache flush can fail in a few places. If it fails in a critical

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
index 2611f69..e30ed8e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
@@ -761,7 +761,7 @@ class MetricsRegionServerWrapperImpl
           tempCheckAndMutateChecksFailed += r.getCheckAndMutateChecksFailed();
           tempCheckAndMutateChecksPassed += r.getCheckAndMutateChecksPassed();
           tempBlockedRequestsCount += r.getBlockedRequestsCount();
-          List<Store> storeList = r.getStores();
+          List<? extends Store> storeList = r.getStores();
           tempNumStores += storeList.size();
           for (Store store : storeList) {
             tempNumStoreFiles += store.getStorefilesCount();

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
index 667b46c..dc7d3cb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
@@ -95,7 +95,7 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable
 
   @Override
   public long getNumStores() {
-    Map<byte[],Store> stores = this.region.stores;
+    Map<byte[], HStore> stores = this.region.stores;
     if (stores == null) {
       return 0;
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 02662c4..61c725b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -84,6 +84,7 @@ import org.apache.hadoop.hbase.regionserver.Leases.Lease;
 import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
 import org.apache.hadoop.hbase.regionserver.Region.Operation;
 import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
 import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
 import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
@@ -1538,7 +1539,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     try {
       checkOpen();
       requestCount.increment();
-      Region region = getRegion(request.getRegion());
+      HRegion region = (HRegion) getRegion(request.getRegion());
       // Quota support is enabled, the requesting user is not system/super user
       // and a quota policy is enforced that disables compactions.
       if (QuotaUtil.isQuotaEnabled(getConfiguration()) &&
@@ -1552,7 +1553,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString());
       boolean major = false;
       byte [] family = null;
-      Store store = null;
+      HStore store = null;
       if (request.hasFamily()) {
         family = request.getFamily().toByteArray();
         store = region.getStore(family);
@@ -1579,12 +1580,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           + region.getRegionInfo().getRegionNameAsString() + familyLogMsg);
       }
       String log = "User-triggered " + (major ? "major " : "") + "compaction" + familyLogMsg;
-      if(family != null) {
-        regionServer.compactSplitThread.requestCompaction(region, store, log,
-          Store.PRIORITY_USER, null, RpcServer.getRequestUser());
+      if (family != null) {
+        regionServer.compactSplitThread.requestCompaction(region, store, log, Store.PRIORITY_USER,
+          CompactionLifeCycleTracker.DUMMY, RpcServer.getRequestUser());
       } else {
-        regionServer.compactSplitThread.requestCompaction(region, log,
-          Store.PRIORITY_USER, null, RpcServer.getRequestUser());
+        regionServer.compactSplitThread.requestCompaction(region, log, Store.PRIORITY_USER,
+          CompactionLifeCycleTracker.DUMMY, RpcServer.getRequestUser());
       }
       return CompactRegionResponse.newBuilder().build();
     } catch (IOException ie) {
@@ -1606,7 +1607,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     try {
       checkOpen();
       requestCount.increment();
-      Region region = getRegion(request.getRegion());
+      HRegion region = (HRegion) getRegion(request.getRegion());
       LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString());
       boolean shouldFlush = true;
       if (request.hasIfOlderThanTs()) {
@@ -1617,8 +1618,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         boolean writeFlushWalMarker =  request.hasWriteFlushWalMarker() ?
             request.getWriteFlushWalMarker() : false;
         // Go behind the curtain so we can manage writing of the flush WAL marker
-        HRegion.FlushResultImpl flushResult = (HRegion.FlushResultImpl)
-            ((HRegion)region).flushcache(true, writeFlushWalMarker);
+        HRegion.FlushResultImpl flushResult = region.flushcache(true, writeFlushWalMarker);
         boolean compactionNeeded = flushResult.isCompactionNeeded();
         if (compactionNeeded) {
           regionServer.compactSplitThread.requestSystemCompaction(region,