You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2016/01/29 18:14:02 UTC

[09/13] hbase git commit: HBASE-14969 Add throughput controller for flush

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareCompactionThroughputController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareCompactionThroughputController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareCompactionThroughputController.java
new file mode 100644
index 0000000..c0d3b74
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareCompactionThroughputController.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.throttle;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
+
+/**
+ * A throughput controller which uses the follow schema to limit throughput
+ * <ul>
+ * <li>If compaction pressure is greater than 1.0, no limitation.</li>
+ * <li>In off peak hours, use a fixed throughput limitation
+ * {@value #HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK}</li>
+ * <li>In normal hours, the max throughput is tuned between
+ * {@value #HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND} and
+ * {@value #HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND}, using the formula &quot;lower +
+ * (higer - lower) * compactionPressure&quot;, where compactionPressure is in range [0.0, 1.0]</li>
+ * </ul>
+ * @see org.apache.hadoop.hbase.regionserver.Store#getCompactionPressure()
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class PressureAwareCompactionThroughputController extends PressureAwareThroughputController {
+
+  private final static Log LOG = LogFactory
+      .getLog(PressureAwareCompactionThroughputController.class);
+
+  public static final String HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND =
+      "hbase.hstore.compaction.throughput.higher.bound";
+
+  private static final long DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND =
+      20L * 1024 * 1024;
+
+  public static final String HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND =
+      "hbase.hstore.compaction.throughput.lower.bound";
+
+  private static final long DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND =
+      10L * 1024 * 1024;
+
+  public static final String HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK =
+      "hbase.hstore.compaction.throughput.offpeak";
+
+  private static final long DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK = Long.MAX_VALUE;
+
+  public static final String HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD =
+      "hbase.hstore.compaction.throughput.tune.period";
+
+  private static final int DEFAULT_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD = 60 * 1000;
+
+  // check compaction throughput every this size
+  private static final String HBASE_HSTORE_COMPACTION_THROUGHPUT_CONTROL_CHECK_INTERVAL =
+    "hbase.hstore.compaction.throughput.control.check.interval";
+
+  private long maxThroughputOffpeak;
+
+  @Override
+  public void setup(final RegionServerServices server) {
+    server.getChoreService().scheduleChore(
+      new ScheduledChore("CompactionThroughputTuner", this, tuningPeriod) {
+
+        @Override
+        protected void chore() {
+          tune(server.getCompactionPressure());
+        }
+      });
+  }
+
+  private void tune(double compactionPressure) {
+    double maxThroughputToSet;
+    if (compactionPressure > 1.0) {
+      // set to unlimited if some stores already reach the blocking store file count
+      maxThroughputToSet = Double.MAX_VALUE;
+    } else if (offPeakHours.isOffPeakHour()) {
+      maxThroughputToSet = maxThroughputOffpeak;
+    } else {
+      // compactionPressure is between 0.0 and 1.0, we use a simple linear formula to
+      // calculate the throughput limitation.
+      maxThroughputToSet =
+          maxThroughputLowerBound + (maxThroughputUpperBound - maxThroughputLowerBound)
+              * compactionPressure;
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("compactionPressure is " + compactionPressure + ", tune compaction throughput to "
+          + throughputDesc(maxThroughputToSet));
+    }
+    this.setMaxThroughput(maxThroughputToSet);
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    super.setConf(conf);
+    if (conf == null) {
+      return;
+    }
+    this.maxThroughputUpperBound =
+        conf.getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND,
+          DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND);
+    this.maxThroughputLowerBound =
+        conf.getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND,
+          DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND);
+    this.maxThroughputOffpeak =
+        conf.getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK,
+          DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK);
+    this.offPeakHours = OffPeakHours.getInstance(conf);
+    this.controlPerSize =
+        conf.getLong(HBASE_HSTORE_COMPACTION_THROUGHPUT_CONTROL_CHECK_INTERVAL,
+          this.maxThroughputLowerBound);
+    this.setMaxThroughput(this.maxThroughputLowerBound);
+    this.tuningPeriod =
+        getConf().getInt(HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD,
+          DEFAULT_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD);
+    LOG.info("Compaction throughput configurations, higher bound: "
+        + throughputDesc(maxThroughputUpperBound) + ", lower bound "
+        + throughputDesc(maxThroughputLowerBound) + ", off peak: "
+        + throughputDesc(maxThroughputOffpeak) + ", tuning period: " + tuningPeriod + " ms");
+  }
+
+  @Override
+  public String toString() {
+    return "DefaultCompactionThroughputController [maxThroughput="
+        + throughputDesc(getMaxThroughput()) + ", activeCompactions=" + activeOperations.size()
+        + "]";
+  }
+
+  @Override
+  protected boolean skipControl(long deltaSize, long controlSize) {
+    if (deltaSize < controlSize) {
+      return true;
+    } else {
+      return false;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareFlushThroughputController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareFlushThroughputController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareFlushThroughputController.java
new file mode 100644
index 0000000..f301a27
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareFlushThroughputController.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.throttle;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
+
+/**
+ * A throughput controller which uses the follow schema to limit throughput
+ * <ul>
+ * <li>If flush pressure is greater than or equal to 1.0, no limitation.</li>
+ * <li>In normal case, the max throughput is tuned between
+ * {@value #HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND} and
+ * {@value #HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND}, using the formula &quot;lower +
+ * (upper - lower) * flushPressure&quot;, where flushPressure is in range [0.0, 1.0)</li>
+ * </ul>
+ * @see org.apache.hadoop.hbase.regionserver.HRegionServer#getFlushPressure()
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class PressureAwareFlushThroughputController extends PressureAwareThroughputController {
+
+  private static final Log LOG = LogFactory.getLog(PressureAwareFlushThroughputController.class);
+
+  public static final String HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND =
+      "hbase.hstore.flush.throughput.upper.bound";
+
+  private static final long DEFAULT_HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND =
+      200L * 1024 * 1024;
+
+  public static final String HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND =
+      "hbase.hstore.flush.throughput.lower.bound";
+
+  private static final long DEFAULT_HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND =
+      100L * 1024 * 1024;
+
+  public static final String HBASE_HSTORE_FLUSH_THROUGHPUT_TUNE_PERIOD =
+      "hbase.hstore.flush.throughput.tune.period";
+
+  private static final int DEFAULT_HSTORE_FLUSH_THROUGHPUT_TUNE_PERIOD = 20 * 1000;
+
+  // check flush throughput every this size
+  public static final String HBASE_HSTORE_FLUSH_THROUGHPUT_CONTROL_CHECK_INTERVAL =
+      "hbase.hstore.flush.throughput.control.check.interval";
+
+  private static final long DEFAULT_HBASE_HSTORE_FLUSH_THROUGHPUT_CONTROL_CHECK_INTERVAL =
+      10L * 1024 * 1024;// 10MB
+
+  @Override
+  public void setup(final RegionServerServices server) {
+    server.getChoreService().scheduleChore(
+      new ScheduledChore("FlushThroughputTuner", this, tuningPeriod, this.tuningPeriod) {
+
+        @Override
+        protected void chore() {
+          tune(server.getFlushPressure());
+        }
+      });
+  }
+
+  private void tune(double flushPressure) {
+    double maxThroughputToSet;
+    if (flushPressure >= 1.0) {
+      // set to unlimited if global memstore size already exceeds lower limit
+      maxThroughputToSet = Double.MAX_VALUE;
+    } else {
+      // flushPressure is between 0.0 and 1.0, we use a simple linear formula to
+      // calculate the throughput limitation.
+      maxThroughputToSet =
+          maxThroughputLowerBound + (maxThroughputUpperBound - maxThroughputLowerBound)
+              * flushPressure;
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("flushPressure is " + flushPressure + ", tune flush throughput to "
+          + throughputDesc(maxThroughputToSet));
+    }
+    this.setMaxThroughput(maxThroughputToSet);
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    super.setConf(conf);
+    if (conf == null) {
+      return;
+    }
+    this.maxThroughputUpperBound =
+        conf.getLong(HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND,
+          DEFAULT_HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND);
+    this.maxThroughputLowerBound =
+        conf.getLong(HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND,
+          DEFAULT_HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND);
+    this.offPeakHours = OffPeakHours.getInstance(conf);
+    this.controlPerSize =
+        conf.getLong(HBASE_HSTORE_FLUSH_THROUGHPUT_CONTROL_CHECK_INTERVAL,
+          DEFAULT_HBASE_HSTORE_FLUSH_THROUGHPUT_CONTROL_CHECK_INTERVAL);
+    this.setMaxThroughput(this.maxThroughputLowerBound);
+    this.tuningPeriod =
+        getConf().getInt(HBASE_HSTORE_FLUSH_THROUGHPUT_TUNE_PERIOD,
+          DEFAULT_HSTORE_FLUSH_THROUGHPUT_TUNE_PERIOD);
+    LOG.info("Flush throughput configurations, upper bound: "
+        + throughputDesc(maxThroughputUpperBound) + ", lower bound "
+        + throughputDesc(maxThroughputLowerBound) + ", tuning period: " + tuningPeriod + " ms");
+  }
+
+  @Override
+  public String toString() {
+    return "DefaultFlushController [maxThroughput=" + throughputDesc(getMaxThroughput())
+        + ", activeFlushNumber=" + activeOperations.size() + "]";
+  }
+
+  @Override
+  protected boolean skipControl(long deltaSize, long controlSize) {
+    // for flush, we control the flow no matter whether the flush size is small
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareThroughputController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareThroughputController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareThroughputController.java
new file mode 100644
index 0000000..c360985
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareThroughputController.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.throttle;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public abstract class PressureAwareThroughputController extends Configured implements
+    ThroughputController, Stoppable {
+  private static final Log LOG = LogFactory.getLog(PressureAwareThroughputController.class);
+
+  /**
+   * Stores the information of one controlled compaction.
+   */
+  private static final class ActiveOperation {
+
+    private final long startTime;
+
+    private long lastControlTime;
+
+    private long lastControlSize;
+
+    private long totalSize;
+
+    private long numberOfSleeps;
+
+    private long totalSleepTime;
+
+    // prevent too many debug log
+    private long lastLogTime;
+
+    ActiveOperation() {
+      long currentTime = EnvironmentEdgeManager.currentTime();
+      this.startTime = currentTime;
+      this.lastControlTime = currentTime;
+      this.lastLogTime = currentTime;
+    }
+  }
+
+  protected long maxThroughputUpperBound;
+
+  protected long maxThroughputLowerBound;
+
+  protected OffPeakHours offPeakHours;
+
+  protected long controlPerSize;
+
+  protected int tuningPeriod;
+
+  private volatile double maxThroughput;
+
+  protected final ConcurrentMap<String, ActiveOperation> activeOperations =
+      new ConcurrentHashMap<String, ActiveOperation>();
+
+  @Override
+  public abstract void setup(final RegionServerServices server);
+
+  protected String throughputDesc(long deltaSize, long elapsedTime) {
+    return throughputDesc((double) deltaSize / elapsedTime * 1000);
+  }
+
+  protected String throughputDesc(double speed) {
+    if (speed >= 1E15) { // large enough to say it is unlimited
+      return "unlimited";
+    } else {
+      return String.format("%.2f MB/sec", speed / 1024 / 1024);
+    }
+  }
+
+  @Override
+  public void start(String opName) {
+    activeOperations.put(opName, new ActiveOperation());
+  }
+
+  @Override
+  public long control(String opName, long size) throws InterruptedException {
+    ActiveOperation operation = activeOperations.get(opName);
+    operation.totalSize += size;
+    long deltaSize = operation.totalSize - operation.lastControlSize;
+    if (deltaSize < controlPerSize) {
+      return 0;
+    }
+    long now = EnvironmentEdgeManager.currentTime();
+    double maxThroughputPerCompaction = this.getMaxThroughput() / activeOperations.size();
+    long minTimeAllowed = (long) (deltaSize / maxThroughputPerCompaction * 1000); // ms
+    long elapsedTime = now - operation.lastControlTime;
+    operation.lastControlSize = operation.totalSize;
+    if (elapsedTime >= minTimeAllowed) {
+      operation.lastControlTime = EnvironmentEdgeManager.currentTime();
+      return 0;
+    }
+    // too fast
+    long sleepTime = minTimeAllowed - elapsedTime;
+    if (LOG.isDebugEnabled()) {
+      // do not log too much
+      if (now - operation.lastLogTime > 5L * 1000) {
+        LOG.debug("deltaSize: " + deltaSize + " bytes; elapseTime: " + elapsedTime + " ns");
+        LOG.debug(opName + " sleep " + sleepTime + " ms because current throughput is "
+            + throughputDesc(deltaSize, elapsedTime) + ", max allowed is "
+            + throughputDesc(maxThroughputPerCompaction) + ", already slept "
+            + operation.numberOfSleeps + " time(s) and total slept time is "
+            + operation.totalSleepTime + " ms till now.");
+        operation.lastLogTime = now;
+      }
+    }
+    Thread.sleep(sleepTime);
+    operation.numberOfSleeps++;
+    operation.totalSleepTime += sleepTime;
+    operation.lastControlTime = EnvironmentEdgeManager.currentTime();
+    return sleepTime;
+  }
+
+  /**
+   * Check whether to skip control given delta size and control size
+   * @param deltaSize Delta size since last control
+   * @param controlSize Size limit to perform control
+   * @return a boolean indicates whether to skip this control
+   */
+  protected abstract boolean skipControl(long deltaSize, long controlSize);
+
+  @Override
+  public void finish(String opName) {
+    ActiveOperation operation = activeOperations.remove(opName);
+    long elapsedTime = EnvironmentEdgeManager.currentTime() - operation.startTime;
+    LOG.info(opName + " average throughput is "
+        + throughputDesc(operation.totalSize, elapsedTime) + ", slept "
+        + operation.numberOfSleeps + " time(s) and total slept time is "
+        + operation.totalSleepTime + " ms. " + activeOperations.size()
+        + " active operations remaining, total limit is " + throughputDesc(getMaxThroughput()));
+  }
+
+  private volatile boolean stopped = false;
+
+  @Override
+  public void stop(String why) {
+    stopped = true;
+  }
+
+  @Override
+  public boolean isStopped() {
+    return stopped;
+  }
+
+  public double getMaxThroughput() {
+    return maxThroughput;
+  }
+
+  public void setMaxThroughput(double maxThroughput) {
+    this.maxThroughput = maxThroughput;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputControlUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputControlUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputControlUtil.java
new file mode 100644
index 0000000..b3c4147
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputControlUtil.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.throttle;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.Store;
+
+/**
+ * Helper methods for throttling
+ */
+@InterfaceAudience.Private
+public final class ThroughputControlUtil {
+  private ThroughputControlUtil() {
+  }
+
+  private static final AtomicInteger NAME_COUNTER = new AtomicInteger(0);
+  private static final String NAME_DELIMITER = "#";
+
+  /**
+   * Generate a name for throttling, to prevent name conflict when multiple IO operation running
+   * parallel on the same store.
+   * @param store the Store instance on which IO operation is happening
+   * @param opName Name of the IO operation, e.g. "flush", "compaction", etc.
+   * @return The name for throttling
+   */
+  public static String getNameForThrottling(final Store store, final String opName) {
+    int counter;
+    for (;;) {
+      counter = NAME_COUNTER.get();
+      int next = counter == Integer.MAX_VALUE ? 0 : counter + 1;
+      if (NAME_COUNTER.compareAndSet(counter, next)) {
+        break;
+      }
+    }
+    return store.getRegionInfo().getRegionNameAsString() + NAME_DELIMITER
+        + store.getFamily().getNameAsString() + NAME_DELIMITER + opName + NAME_DELIMITER + counter;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputController.java
new file mode 100644
index 0000000..f299f98
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputController.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.throttle;
+
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+
+/**
+ * A utility that constrains the total throughput of one or more simultaneous flows by
+ * sleeping when necessary.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public interface ThroughputController extends Stoppable {
+
+  /**
+   * Setup controller for the given region server.
+   */
+  void setup(RegionServerServices server);
+
+  /**
+   * Start the throughput controller.
+   */
+  void start(String name);
+
+  /**
+   * Control the throughput. Will sleep if too fast.
+   * @return the actual sleep time.
+   */
+  long control(String name, long size) throws InterruptedException;
+
+  /**
+   * Finish the controller. Should call this method in a finally block.
+   */
+  void finish(String name);
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
index 0986ad7..e634327 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
@@ -317,4 +318,13 @@ public class MockRegionServerServices implements RegionServerServices {
     // TODO Auto-generated method stub
     return null;
   }
+
+  public ThroughputController getFlushThroughputController() {
+    return null;
+  }
+
+  @Override
+  public double getFlushPressure() {
+    return 0;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
index 94a63d8..35a7403 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
@@ -44,7 +43,7 @@ import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -122,7 +121,7 @@ public class TestIOFencing {
 
     @Override
     public boolean compact(CompactionContext compaction, Store store,
-        CompactionThroughputController throughputController) throws IOException {
+        ThroughputController throughputController) throws IOException {
       try {
         return super.compact(compaction, store, throughputController);
       } finally {
@@ -132,7 +131,7 @@ public class TestIOFencing {
 
     @Override
     public boolean compact(CompactionContext compaction, Store store,
-        CompactionThroughputController throughputController, User user) throws IOException {
+        ThroughputController throughputController, User user) throws IOException {
       try {
         return super.compact(compaction, store, throughputController, user);
       } finally {

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
index e20c4ad..5241dbe 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
@@ -60,12 +60,12 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -235,7 +235,7 @@ public class TestRegionObserverScannerOpenHook {
 
     @Override
     public boolean compact(CompactionContext compaction, Store store,
-        CompactionThroughputController throughputController) throws IOException {
+        ThroughputController throughputController) throws IOException {
       boolean ret = super.compact(compaction, store, throughputController);
       if (ret) compactionStateChangeLatch.countDown();
       return ret;
@@ -243,7 +243,7 @@ public class TestRegionObserverScannerOpenHook {
 
     @Override
     public boolean compact(CompactionContext compaction, Store store,
-        CompactionThroughputController throughputController, User user) throws IOException {
+        ThroughputController throughputController, User user) throws IOException {
       boolean ret = super.compact(compaction, store, throughputController, user);
       if (ret) compactionStateChangeLatch.countDown();
       return ret;

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index 32f644b..4de4a5f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -103,6 +103,7 @@ import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
@@ -662,4 +663,13 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
     // TODO Auto-generated method stub
     return null;
   }
+
+  public ThroughputController getFlushThroughputController() {
+    return null;
+  }
+
+  @Override
+  public double getFlushPressure() {
+    return 0;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java
index 022279a..f57ade9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java
@@ -22,8 +22,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
-import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
+import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
+import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Assert;

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
index b374bdc..06b4c46 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
@@ -59,9 +59,9 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
-import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
+import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
+import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -104,7 +104,7 @@ public class TestCompaction {
     conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
     conf.setInt(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 100);
     conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
-      NoLimitCompactionThroughputController.class.getName());
+      NoLimitThroughputController.class.getName());
     compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
 
     secondRowBytes = START_KEY_BYTES.clone();
@@ -363,13 +363,13 @@ public class TestCompaction {
       }
 
       @Override
-      public List<Path> compact(CompactionThroughputController throughputController)
+      public List<Path> compact(ThroughputController throughputController)
           throws IOException {
         return compact(throughputController, null);
       }
 
       @Override
-      public List<Path> compact(CompactionThroughputController throughputController, User user)
+      public List<Path> compact(ThroughputController throughputController, User user)
           throws IOException {
         finishCompaction(this.selectedFiles);
         return new ArrayList<Path>();
@@ -421,13 +421,13 @@ public class TestCompaction {
       }
 
       @Override
-      public List<Path> compact(CompactionThroughputController throughputController)
+      public List<Path> compact(ThroughputController throughputController)
           throws IOException {
         return compact(throughputController, null);
       }
 
       @Override
-      public List<Path> compact(CompactionThroughputController throughputController, User user)
+      public List<Path> compact(ThroughputController throughputController, User user)
           throws IOException {
         try {
           isInCompact = true;
@@ -510,10 +510,10 @@ public class TestCompaction {
     HRegion r = mock(HRegion.class);
     when(
       r.compact(any(CompactionContext.class), any(Store.class),
-        any(CompactionThroughputController.class), any(User.class))).then(new Answer<Boolean>() {
+        any(ThroughputController.class), any(User.class))).then(new Answer<Boolean>() {
       public Boolean answer(InvocationOnMock invocation) throws Throwable {
         invocation.getArgumentAt(0, CompactionContext.class).compact(
-          invocation.getArgumentAt(2, CompactionThroughputController.class));
+          invocation.getArgumentAt(2, ThroughputController.class));
         return true;
       }
     });

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
index ef02431..385048c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
@@ -59,7 +59,7 @@ import org.apache.hadoop.hbase.mob.MobConstants;
 import org.apache.hadoop.hbase.mob.MobUtils;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
-import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
+import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
 import org.apache.hadoop.hbase.security.EncryptionUtil;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -537,7 +537,7 @@ public class TestHMobStore {
     // Trigger major compaction
     this.store.triggerMajorCompaction();
     CompactionContext requestCompaction = this.store.requestCompaction(1, null);
-    this.store.compact(requestCompaction, NoLimitCompactionThroughputController.INSTANCE);
+    this.store.compact(requestCompaction, NoLimitThroughputController.INSTANCE);
     Assert.assertEquals(1, this.store.getStorefiles().size());
 
     //Check encryption after compaction

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
index 382193b..85b2a9d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
@@ -70,7 +70,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescripto
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
 import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl;
 import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
-import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
+import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -314,7 +314,7 @@ public class TestHRegionReplayEvents {
     // compaction from primary
     LOG.info("-- Compacting primary, only 1 store");
     primaryRegion.compactStore(Bytes.toBytes("cf1"),
-      NoLimitCompactionThroughputController.INSTANCE);
+      NoLimitThroughputController.INSTANCE);
 
     // now replay the edits and the flush marker
     reader = createWALReaderForPrimary();

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
index 41fbae6..fe620e7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
@@ -85,7 +85,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Regio
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
-import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
+import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -259,7 +259,7 @@ public class TestSplitTransactionOnCluster {
     region.initialize();
 
     // 2, Run Compaction cc
-    assertFalse(region.compact(cc, store, NoLimitCompactionThroughputController.INSTANCE));
+    assertFalse(region.compact(cc, store, NoLimitThroughputController.INSTANCE));
     assertTrue(fileNum > store.getStorefiles().size());
 
     // 3, Split

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index 9e846c6..354ea2d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -72,7 +72,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
-import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
+import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.security.User;
@@ -382,7 +382,7 @@ public class TestStore {
     Assert.assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS);
 
     // after compact; check the lowest time stamp
-    store.compact(store.requestCompaction(), NoLimitCompactionThroughputController.INSTANCE);
+    store.compact(store.requestCompaction(), NoLimitThroughputController.INSTANCE);
     lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
     lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
     Assert.assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
index eb8513b..cb586f3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
@@ -52,8 +52,8 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
-import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
+import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -133,7 +133,7 @@ public class TestStripeCompactor {
     StripeCompactor sc = createCompactor(writers, input);
     List<Path> paths =
         sc.compact(createDummyRequest(), Arrays.asList(boundaries), majorFrom, majorTo,
-          NoLimitCompactionThroughputController.INSTANCE);
+          NoLimitThroughputController.INSTANCE);
     writers.verifyKvs(output, allFiles, true);
     if (allFiles) {
       assertEquals(output.length, paths.size());
@@ -170,7 +170,7 @@ public class TestStripeCompactor {
     StripeCompactor sc = createCompactor(writers, input);
     List<Path> paths =
         sc.compact(createDummyRequest(), targetCount, targetSize, left, right, null, null,
-          NoLimitCompactionThroughputController.INSTANCE);
+          NoLimitThroughputController.INSTANCE);
     assertEquals(output.length, paths.size());
     writers.verifyKvs(output, true, true);
     List<byte[]> boundaries = new ArrayList<byte[]>();

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java
index 1454aa8..6b641c1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java
@@ -37,10 +37,10 @@ import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
-import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
+import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -77,7 +77,7 @@ public class TestStripeStoreEngine {
     when(
       mockCompactor.compact(any(CompactionRequest.class), anyInt(), anyLong(), any(byte[].class),
         any(byte[].class), any(byte[].class), any(byte[].class),
-        any(CompactionThroughputController.class), any(User.class)))
+        any(ThroughputController.class), any(User.class)))
         .thenReturn(new ArrayList<Path>());
 
     // Produce 3 L0 files.
@@ -96,10 +96,10 @@ public class TestStripeStoreEngine {
     assertEquals(2, compaction.getRequest().getFiles().size());
     assertFalse(compaction.getRequest().getFiles().contains(sf));
     // Make sure the correct method it called on compactor.
-    compaction.compact(NoLimitCompactionThroughputController.INSTANCE);
+    compaction.compact(NoLimitThroughputController.INSTANCE);
     verify(mockCompactor, times(1)).compact(compaction.getRequest(), targetCount, 0L,
       StripeStoreFileManager.OPEN_KEY, StripeStoreFileManager.OPEN_KEY, null, null,
-      NoLimitCompactionThroughputController.INSTANCE, null);
+      NoLimitThroughputController.INSTANCE, null);
   }
 
   private static StoreFile createFile() throws Exception {

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactionWithThroughputController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactionWithThroughputController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactionWithThroughputController.java
deleted file mode 100644
index 4456ef2..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactionWithThroughputController.java
+++ /dev/null
@@ -1,302 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver.compactions;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Random;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MiniHBaseCluster;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.HStore;
-import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.regionserver.StoreEngine;
-import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
-import org.apache.hadoop.hbase.regionserver.StripeStoreEngine;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.RegionServerTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.JVMClusterUtil;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category({ RegionServerTests.class, MediumTests.class })
-public class TestCompactionWithThroughputController {
-
-  private static final Log LOG = LogFactory.getLog(TestCompactionWithThroughputController.class);
-
-  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-
-  private static final double EPSILON = 1E-6;
-
-  private final TableName tableName = TableName.valueOf(getClass().getSimpleName());
-
-  private final byte[] family = Bytes.toBytes("f");
-
-  private final byte[] qualifier = Bytes.toBytes("q");
-
-  private Store getStoreWithName(TableName tableName) {
-    MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
-    List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
-    for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) {
-      HRegionServer hrs = rsts.get(i).getRegionServer();
-      for (Region region : hrs.getOnlineRegions(tableName)) {
-        return region.getStores().iterator().next();
-      }
-    }
-    return null;
-  }
-
-  private Store prepareData() throws IOException {
-    Admin admin = TEST_UTIL.getHBaseAdmin();
-    if (admin.tableExists(tableName)) {
-      admin.disableTable(tableName);
-      admin.deleteTable(tableName);
-    }
-    Table table = TEST_UTIL.createTable(tableName, family);
-    Random rand = new Random();
-    for (int i = 0; i < 10; i++) {
-      for (int j = 0; j < 10; j++) {
-        byte[] value = new byte[128 * 1024];
-        rand.nextBytes(value);
-        table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value));
-      }
-      admin.flush(tableName);
-    }
-    return getStoreWithName(tableName);
-  }
-
-  private long testCompactionWithThroughputLimit() throws Exception {
-    long throughputLimit = 1024L * 1024;
-    Configuration conf = TEST_UTIL.getConfiguration();
-    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
-    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100);
-    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200);
-    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
-    conf.setLong(
-      PressureAwareCompactionThroughputController
-        .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND,
-      throughputLimit);
-    conf.setLong(
-      PressureAwareCompactionThroughputController
-        .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND,
-      throughputLimit);
-    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
-      PressureAwareCompactionThroughputController.class.getName());
-    TEST_UTIL.startMiniCluster(1);
-    try {
-      Store store = prepareData();
-      assertEquals(10, store.getStorefilesCount());
-      long startTime = System.currentTimeMillis();
-      TEST_UTIL.getHBaseAdmin().majorCompact(tableName);
-      while (store.getStorefilesCount() != 1) {
-        Thread.sleep(20);
-      }
-      long duration = System.currentTimeMillis() - startTime;
-      double throughput = (double) store.getStorefilesSize() / duration * 1000;
-      // confirm that the speed limit work properly(not too fast, and also not too slow)
-      // 20% is the max acceptable error rate.
-      assertTrue(throughput < throughputLimit * 1.2);
-      assertTrue(throughput > throughputLimit * 0.8);
-      return System.currentTimeMillis() - startTime;
-    } finally {
-      TEST_UTIL.shutdownMiniCluster();
-    }
-  }
-
-  private long testCompactionWithoutThroughputLimit() throws Exception {
-    Configuration conf = TEST_UTIL.getConfiguration();
-    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
-    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100);
-    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200);
-    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
-    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
-      NoLimitCompactionThroughputController.class.getName());
-    TEST_UTIL.startMiniCluster(1);
-    try {
-      Store store = prepareData();
-      assertEquals(10, store.getStorefilesCount());
-      long startTime = System.currentTimeMillis();
-      TEST_UTIL.getHBaseAdmin().majorCompact(tableName);
-      while (store.getStorefilesCount() != 1) {
-        Thread.sleep(20);
-      }
-      return System.currentTimeMillis() - startTime;
-    } finally {
-      TEST_UTIL.shutdownMiniCluster();
-    }
-  }
-
-  @Test
-  public void testCompaction() throws Exception {
-    long limitTime = testCompactionWithThroughputLimit();
-    long noLimitTime = testCompactionWithoutThroughputLimit();
-    LOG.info("With 1M/s limit, compaction use " + limitTime + "ms; without limit, compaction use "
-        + noLimitTime + "ms");
-    // usually the throughput of a compaction without limitation is about 40MB/sec at least, so this
-    // is a very weak assumption.
-    assertTrue(limitTime > noLimitTime * 2);
-  }
-
-  /**
-   * Test the tuning task of {@link PressureAwareCompactionThroughputController}
-   */
-  @Test
-  public void testThroughputTuning() throws Exception {
-    Configuration conf = TEST_UTIL.getConfiguration();
-    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
-    conf.setLong(
-      PressureAwareCompactionThroughputController
-        .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND,
-      20L * 1024 * 1024);
-    conf.setLong(
-      PressureAwareCompactionThroughputController
-        .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND,
-      10L * 1024 * 1024);
-    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 4);
-    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 6);
-    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
-      PressureAwareCompactionThroughputController.class.getName());
-    conf.setInt(
-      PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD,
-      1000);
-    TEST_UTIL.startMiniCluster(1);
-    Connection conn = ConnectionFactory.createConnection(conf);
-    try {
-      HTableDescriptor htd = new HTableDescriptor(tableName);
-      htd.addFamily(new HColumnDescriptor(family));
-      htd.setCompactionEnabled(false);
-      TEST_UTIL.getHBaseAdmin().createTable(htd);
-      TEST_UTIL.waitTableAvailable(tableName);
-      HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName);
-      PressureAwareCompactionThroughputController throughputController =
-          (PressureAwareCompactionThroughputController) regionServer.compactSplitThread
-              .getCompactionThroughputController();
-      assertEquals(10L * 1024 * 1024, throughputController.maxThroughput, EPSILON);
-      Table table = conn.getTable(tableName);
-      for (int i = 0; i < 5; i++) {
-        byte[] value = new byte[0];
-        table.put(new Put(Bytes.toBytes(i)).addColumn(family, qualifier, value));
-        TEST_UTIL.flush(tableName);
-      }
-      Thread.sleep(2000);
-      assertEquals(15L * 1024 * 1024, throughputController.maxThroughput, EPSILON);
-
-      byte[] value1 = new byte[0];
-      table.put(new Put(Bytes.toBytes(5)).addColumn(family, qualifier, value1));
-      TEST_UTIL.flush(tableName);
-      Thread.sleep(2000);
-      assertEquals(20L * 1024 * 1024, throughputController.maxThroughput, EPSILON);
-
-      byte[] value = new byte[0];
-      table.put(new Put(Bytes.toBytes(6)).addColumn(family, qualifier, value));
-      TEST_UTIL.flush(tableName);
-      Thread.sleep(2000);
-      assertEquals(Double.MAX_VALUE, throughputController.maxThroughput, EPSILON);
-
-      conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
-        NoLimitCompactionThroughputController.class.getName());
-      regionServer.compactSplitThread.onConfigurationChange(conf);
-      assertTrue(throughputController.isStopped());
-      assertTrue(regionServer.compactSplitThread.getCompactionThroughputController()
-        instanceof NoLimitCompactionThroughputController);
-    } finally {
-      conn.close();
-      TEST_UTIL.shutdownMiniCluster();
-    }
-  }
-
-  /**
-   * Test the logic that we calculate compaction pressure for a striped store.
-   */
-  @Test
-  public void testGetCompactionPressureForStripedStore() throws Exception {
-    Configuration conf = TEST_UTIL.getConfiguration();
-    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName());
-    conf.setBoolean(StripeStoreConfig.FLUSH_TO_L0_KEY, false);
-    conf.setInt(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, 2);
-    conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 4);
-    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 12);
-    TEST_UTIL.startMiniCluster(1);
-    Connection conn = ConnectionFactory.createConnection(conf);
-    try {
-      HTableDescriptor htd = new HTableDescriptor(tableName);
-      htd.addFamily(new HColumnDescriptor(family));
-      htd.setCompactionEnabled(false);
-      TEST_UTIL.getHBaseAdmin().createTable(htd);
-      TEST_UTIL.waitTableAvailable(tableName);
-      HStore store = (HStore) getStoreWithName(tableName);
-      assertEquals(0, store.getStorefilesCount());
-      assertEquals(0.0, store.getCompactionPressure(), EPSILON);
-      Table table = conn.getTable(tableName);
-      for (int i = 0; i < 4; i++) {
-        byte[] value1 = new byte[0];
-        table.put(new Put(Bytes.toBytes(i)).addColumn(family, qualifier, value1));
-        byte[] value = new byte[0];
-        table.put(new Put(Bytes.toBytes(100 + i)).addColumn(family, qualifier, value));
-        TEST_UTIL.flush(tableName);
-      }
-      assertEquals(8, store.getStorefilesCount());
-      assertEquals(0.0, store.getCompactionPressure(), EPSILON);
-
-      byte[] value5 = new byte[0];
-      table.put(new Put(Bytes.toBytes(4)).addColumn(family, qualifier, value5));
-      byte[] value4 = new byte[0];
-      table.put(new Put(Bytes.toBytes(104)).addColumn(family, qualifier, value4));
-      TEST_UTIL.flush(tableName);
-      assertEquals(10, store.getStorefilesCount());
-      assertEquals(0.5, store.getCompactionPressure(), EPSILON);
-
-      byte[] value3 = new byte[0];
-      table.put(new Put(Bytes.toBytes(5)).addColumn(family, qualifier, value3));
-      byte[] value2 = new byte[0];
-      table.put(new Put(Bytes.toBytes(105)).addColumn(family, qualifier, value2));
-      TEST_UTIL.flush(tableName);
-      assertEquals(12, store.getStorefilesCount());
-      assertEquals(1.0, store.getCompactionPressure(), EPSILON);
-
-      byte[] value1 = new byte[0];
-      table.put(new Put(Bytes.toBytes(6)).addColumn(family, qualifier, value1));
-      byte[] value = new byte[0];
-      table.put(new Put(Bytes.toBytes(106)).addColumn(family, qualifier, value));
-      TEST_UTIL.flush(tableName);
-      assertEquals(14, store.getStorefilesCount());
-      assertEquals(2.0, store.getCompactionPressure(), EPSILON);
-    } finally {
-      conn.close();
-      TEST_UTIL.shutdownMiniCluster();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
index 56e71e8..c440a57 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
@@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.regionserver.StripeStoreFileManager;
 import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
 import org.apache.hadoop.hbase.regionserver.TestStripeCompactor.StoreFileWritersCapture;
 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy.StripeInformationProvider;
+import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -216,10 +217,10 @@ public class TestStripeCompactionPolicy {
     assertTrue(policy.needsCompactions(si, al()));
     StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
     assertEquals(si.getStorefiles(), scr.getRequest().getFiles());
-    scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null);
+    scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
     verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(), aryEq(OPEN_KEY),
       aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY),
-      any(NoLimitCompactionThroughputController.class), any(User.class));
+      any(NoLimitThroughputController.class), any(User.class));
   }
 
   @Test
@@ -469,7 +470,7 @@ public class TestStripeCompactionPolicy {
     // All the Stripes are expired, so the Compactor will not create any Writers. We need to create
     // an empty file to preserve metadata
     StripeCompactor sc = createCompactor();
-    List<Path> paths = scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null);
+    List<Path> paths = scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
     assertEquals(1, paths.size());
   }
 
@@ -528,7 +529,7 @@ public class TestStripeCompactionPolicy {
     assertTrue(policy.needsCompactions(si, al()));
     StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
     verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
-    scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null);
+    scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
     verify(sc, times(1)).compact(eq(scr.getRequest()), argThat(new ArgumentMatcher<List<byte[]>>() {
       @Override
       public boolean matches(Object argument) {
@@ -542,7 +543,7 @@ public class TestStripeCompactionPolicy {
       }
     }), dropDeletesFrom == null ? isNull(byte[].class) : aryEq(dropDeletesFrom),
       dropDeletesTo == null ? isNull(byte[].class) : aryEq(dropDeletesTo),
-      any(NoLimitCompactionThroughputController.class), any(User.class));
+      any(NoLimitThroughputController.class), any(User.class));
   }
 
   /**
@@ -563,12 +564,12 @@ public class TestStripeCompactionPolicy {
     assertTrue(!needsCompaction || policy.needsCompactions(si, al()));
     StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
     verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
-    scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null);
+    scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
     verify(sc, times(1)).compact(eq(scr.getRequest()),
       count == null ? anyInt() : eq(count.intValue()),
       size == null ? anyLong() : eq(size.longValue()), aryEq(start), aryEq(end),
       dropDeletesMatcher(dropDeletes, start), dropDeletesMatcher(dropDeletes, end),
-      any(NoLimitCompactionThroughputController.class), any(User.class));
+      any(NoLimitThroughputController.class), any(User.class));
   }
 
   /** Verify arbitrary flush. */

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestCompactionWithThroughputController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestCompactionWithThroughputController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestCompactionWithThroughputController.java
new file mode 100644
index 0000000..41975eb
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestCompactionWithThroughputController.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.throttle;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreEngine;
+import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
+import org.apache.hadoop.hbase.regionserver.StripeStoreEngine;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
+import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
+import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
+import org.apache.hadoop.hbase.regionserver.throttle.PressureAwareCompactionThroughputController;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestCompactionWithThroughputController {
+
+  private static final Log LOG = LogFactory.getLog(TestCompactionWithThroughputController.class);
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static final double EPSILON = 1E-6;
+
+  private final TableName tableName = TableName.valueOf(getClass().getSimpleName());
+
+  private final byte[] family = Bytes.toBytes("f");
+
+  private final byte[] qualifier = Bytes.toBytes("q");
+
+  private Store getStoreWithName(TableName tableName) {
+    MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
+    List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
+    for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) {
+      HRegionServer hrs = rsts.get(i).getRegionServer();
+      for (Region region : hrs.getOnlineRegions(tableName)) {
+        return region.getStores().iterator().next();
+      }
+    }
+    return null;
+  }
+
+  private Store prepareData() throws IOException {
+    Admin admin = TEST_UTIL.getHBaseAdmin();
+    if (admin.tableExists(tableName)) {
+      admin.disableTable(tableName);
+      admin.deleteTable(tableName);
+    }
+    Table table = TEST_UTIL.createTable(tableName, family);
+    Random rand = new Random();
+    for (int i = 0; i < 10; i++) {
+      for (int j = 0; j < 10; j++) {
+        byte[] value = new byte[128 * 1024];
+        rand.nextBytes(value);
+        table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value));
+      }
+      admin.flush(tableName);
+    }
+    return getStoreWithName(tableName);
+  }
+
+  private long testCompactionWithThroughputLimit() throws Exception {
+    long throughputLimit = 1024L * 1024;
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
+    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100);
+    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200);
+    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
+    conf.setLong(
+      PressureAwareCompactionThroughputController
+        .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND,
+      throughputLimit);
+    conf.setLong(
+      PressureAwareCompactionThroughputController
+        .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND,
+      throughputLimit);
+    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
+      PressureAwareCompactionThroughputController.class.getName());
+    TEST_UTIL.startMiniCluster(1);
+    try {
+      Store store = prepareData();
+      assertEquals(10, store.getStorefilesCount());
+      long startTime = System.currentTimeMillis();
+      TEST_UTIL.getHBaseAdmin().majorCompact(tableName);
+      while (store.getStorefilesCount() != 1) {
+        Thread.sleep(20);
+      }
+      long duration = System.currentTimeMillis() - startTime;
+      double throughput = (double) store.getStorefilesSize() / duration * 1000;
+      // confirm that the speed limit work properly(not too fast, and also not too slow)
+      // 20% is the max acceptable error rate.
+      assertTrue(throughput < throughputLimit * 1.2);
+      assertTrue(throughput > throughputLimit * 0.8);
+      return System.currentTimeMillis() - startTime;
+    } finally {
+      TEST_UTIL.shutdownMiniCluster();
+    }
+  }
+
+  private long testCompactionWithoutThroughputLimit() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
+    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100);
+    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200);
+    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
+    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
+      NoLimitThroughputController.class.getName());
+    TEST_UTIL.startMiniCluster(1);
+    try {
+      Store store = prepareData();
+      assertEquals(10, store.getStorefilesCount());
+      long startTime = System.currentTimeMillis();
+      TEST_UTIL.getHBaseAdmin().majorCompact(tableName);
+      while (store.getStorefilesCount() != 1) {
+        Thread.sleep(20);
+      }
+      return System.currentTimeMillis() - startTime;
+    } finally {
+      TEST_UTIL.shutdownMiniCluster();
+    }
+  }
+
+  @Test
+  public void testCompaction() throws Exception {
+    long limitTime = testCompactionWithThroughputLimit();
+    long noLimitTime = testCompactionWithoutThroughputLimit();
+    LOG.info("With 1M/s limit, compaction use " + limitTime + "ms; without limit, compaction use "
+        + noLimitTime + "ms");
+    // usually the throughput of a compaction without limitation is about 40MB/sec at least, so this
+    // is a very weak assumption.
+    assertTrue(limitTime > noLimitTime * 2);
+  }
+
+  /**
+   * Test the tuning task of {@link PressureAwareCompactionThroughputController}
+   */
+  @Test
+  public void testThroughputTuning() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
+    conf.setLong(
+      PressureAwareCompactionThroughputController
+        .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND,
+      20L * 1024 * 1024);
+    conf.setLong(
+      PressureAwareCompactionThroughputController
+        .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND,
+      10L * 1024 * 1024);
+    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 4);
+    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 6);
+    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
+      PressureAwareCompactionThroughputController.class.getName());
+    conf.setInt(
+      PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD,
+      1000);
+    TEST_UTIL.startMiniCluster(1);
+    Connection conn = ConnectionFactory.createConnection(conf);
+    try {
+      HTableDescriptor htd = new HTableDescriptor(tableName);
+      htd.addFamily(new HColumnDescriptor(family));
+      htd.setCompactionEnabled(false);
+      TEST_UTIL.getHBaseAdmin().createTable(htd);
+      TEST_UTIL.waitTableAvailable(tableName);
+      HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName);
+      PressureAwareCompactionThroughputController throughputController =
+          (PressureAwareCompactionThroughputController) regionServer.compactSplitThread
+              .getCompactionThroughputController();
+      assertEquals(10L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON);
+      Table table = conn.getTable(tableName);
+      for (int i = 0; i < 5; i++) {
+        byte[] value = new byte[0];
+        table.put(new Put(Bytes.toBytes(i)).addColumn(family, qualifier, value));
+        TEST_UTIL.flush(tableName);
+      }
+      Thread.sleep(2000);
+      assertEquals(15L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON);
+
+      byte[] value1 = new byte[0];
+      table.put(new Put(Bytes.toBytes(5)).addColumn(family, qualifier, value1));
+      TEST_UTIL.flush(tableName);
+      Thread.sleep(2000);
+      assertEquals(20L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON);
+
+      byte[] value = new byte[0];
+      table.put(new Put(Bytes.toBytes(6)).addColumn(family, qualifier, value));
+      TEST_UTIL.flush(tableName);
+      Thread.sleep(2000);
+      assertEquals(Double.MAX_VALUE, throughputController.getMaxThroughput(), EPSILON);
+
+      conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
+        NoLimitThroughputController.class.getName());
+      regionServer.compactSplitThread.onConfigurationChange(conf);
+      assertTrue(throughputController.isStopped());
+      assertTrue(regionServer.compactSplitThread.getCompactionThroughputController()
+        instanceof NoLimitThroughputController);
+    } finally {
+      conn.close();
+      TEST_UTIL.shutdownMiniCluster();
+    }
+  }
+
+  /**
+   * Test the logic that we calculate compaction pressure for a striped store.
+   */
+  @Test
+  public void testGetCompactionPressureForStripedStore() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName());
+    conf.setBoolean(StripeStoreConfig.FLUSH_TO_L0_KEY, false);
+    conf.setInt(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, 2);
+    conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 4);
+    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 12);
+    TEST_UTIL.startMiniCluster(1);
+    Connection conn = ConnectionFactory.createConnection(conf);
+    try {
+      HTableDescriptor htd = new HTableDescriptor(tableName);
+      htd.addFamily(new HColumnDescriptor(family));
+      htd.setCompactionEnabled(false);
+      TEST_UTIL.getHBaseAdmin().createTable(htd);
+      TEST_UTIL.waitTableAvailable(tableName);
+      HStore store = (HStore) getStoreWithName(tableName);
+      assertEquals(0, store.getStorefilesCount());
+      assertEquals(0.0, store.getCompactionPressure(), EPSILON);
+      Table table = conn.getTable(tableName);
+      for (int i = 0; i < 4; i++) {
+        byte[] value1 = new byte[0];
+        table.put(new Put(Bytes.toBytes(i)).addColumn(family, qualifier, value1));
+        byte[] value = new byte[0];
+        table.put(new Put(Bytes.toBytes(100 + i)).addColumn(family, qualifier, value));
+        TEST_UTIL.flush(tableName);
+      }
+      assertEquals(8, store.getStorefilesCount());
+      assertEquals(0.0, store.getCompactionPressure(), EPSILON);
+
+      byte[] value5 = new byte[0];
+      table.put(new Put(Bytes.toBytes(4)).addColumn(family, qualifier, value5));
+      byte[] value4 = new byte[0];
+      table.put(new Put(Bytes.toBytes(104)).addColumn(family, qualifier, value4));
+      TEST_UTIL.flush(tableName);
+      assertEquals(10, store.getStorefilesCount());
+      assertEquals(0.5, store.getCompactionPressure(), EPSILON);
+
+      byte[] value3 = new byte[0];
+      table.put(new Put(Bytes.toBytes(5)).addColumn(family, qualifier, value3));
+      byte[] value2 = new byte[0];
+      table.put(new Put(Bytes.toBytes(105)).addColumn(family, qualifier, value2));
+      TEST_UTIL.flush(tableName);
+      assertEquals(12, store.getStorefilesCount());
+      assertEquals(1.0, store.getCompactionPressure(), EPSILON);
+
+      byte[] value1 = new byte[0];
+      table.put(new Put(Bytes.toBytes(6)).addColumn(family, qualifier, value1));
+      byte[] value = new byte[0];
+      table.put(new Put(Bytes.toBytes(106)).addColumn(family, qualifier, value));
+      TEST_UTIL.flush(tableName);
+      assertEquals(14, store.getStorefilesCount());
+      assertEquals(2.0, store.getCompactionPressure(), EPSILON);
+    } finally {
+      conn.close();
+      TEST_UTIL.shutdownMiniCluster();
+    }
+  }
+}