Posted to commits@hbase.apache.org by sy...@apache.org on 2016/01/29 18:14:02 UTC
[09/13] hbase git commit: HBASE-14969 Add throughput controller for flush
http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareCompactionThroughputController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareCompactionThroughputController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareCompactionThroughputController.java
new file mode 100644
index 0000000..c0d3b74
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareCompactionThroughputController.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.throttle;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
+
+/**
+ * A throughput controller which uses the following scheme to limit throughput:
+ * <ul>
+ * <li>If compaction pressure is greater than 1.0, no limitation.</li>
+ * <li>In off peak hours, use a fixed throughput limitation
+ * {@value #HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK}</li>
+ * <li>In normal hours, the max throughput is tuned between
+ * {@value #HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND} and
+ * {@value #HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND}, using the formula "lower +
+ * (higher - lower) * compactionPressure", where compactionPressure is in range [0.0, 1.0]</li>
+ * </ul>
+ * @see org.apache.hadoop.hbase.regionserver.Store#getCompactionPressure()
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class PressureAwareCompactionThroughputController extends PressureAwareThroughputController {
+
+ private final static Log LOG = LogFactory
+ .getLog(PressureAwareCompactionThroughputController.class);
+
+ public static final String HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND =
+ "hbase.hstore.compaction.throughput.higher.bound";
+
+ private static final long DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND =
+ 20L * 1024 * 1024;
+
+ public static final String HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND =
+ "hbase.hstore.compaction.throughput.lower.bound";
+
+ private static final long DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND =
+ 10L * 1024 * 1024;
+
+ public static final String HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK =
+ "hbase.hstore.compaction.throughput.offpeak";
+
+ private static final long DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK = Long.MAX_VALUE;
+
+ public static final String HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD =
+ "hbase.hstore.compaction.throughput.tune.period";
+
+ private static final int DEFAULT_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD = 60 * 1000;
+
+ // perform a compaction throughput check every time this many bytes have been written
+ private static final String HBASE_HSTORE_COMPACTION_THROUGHPUT_CONTROL_CHECK_INTERVAL =
+ "hbase.hstore.compaction.throughput.control.check.interval";
+
+ private long maxThroughputOffpeak;
+
+ @Override
+ public void setup(final RegionServerServices server) {
+ server.getChoreService().scheduleChore(
+ new ScheduledChore("CompactionThroughputTuner", this, tuningPeriod) {
+
+ @Override
+ protected void chore() {
+ tune(server.getCompactionPressure());
+ }
+ });
+ }
+
+ private void tune(double compactionPressure) {
+ double maxThroughputToSet;
+ if (compactionPressure > 1.0) {
+ // set to unlimited if some stores have already reached the blocking store file count
+ maxThroughputToSet = Double.MAX_VALUE;
+ } else if (offPeakHours.isOffPeakHour()) {
+ maxThroughputToSet = maxThroughputOffpeak;
+ } else {
+ // compactionPressure is between 0.0 and 1.0, we use a simple linear formula to
+ // calculate the throughput limitation.
+ maxThroughputToSet =
+ maxThroughputLowerBound + (maxThroughputUpperBound - maxThroughputLowerBound)
+ * compactionPressure;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("compactionPressure is " + compactionPressure + ", tune compaction throughput to "
+ + throughputDesc(maxThroughputToSet));
+ }
+ this.setMaxThroughput(maxThroughputToSet);
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ super.setConf(conf);
+ if (conf == null) {
+ return;
+ }
+ this.maxThroughputUpperBound =
+ conf.getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND,
+ DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND);
+ this.maxThroughputLowerBound =
+ conf.getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND,
+ DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND);
+ this.maxThroughputOffpeak =
+ conf.getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK,
+ DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK);
+ this.offPeakHours = OffPeakHours.getInstance(conf);
+ this.controlPerSize =
+ conf.getLong(HBASE_HSTORE_COMPACTION_THROUGHPUT_CONTROL_CHECK_INTERVAL,
+ this.maxThroughputLowerBound);
+ this.setMaxThroughput(this.maxThroughputLowerBound);
+ this.tuningPeriod =
+ getConf().getInt(HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD,
+ DEFAULT_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD);
+ LOG.info("Compaction throughput configurations, higher bound: "
+ throughputDesc(maxThroughputUpperBound) + ", lower bound: "
+ + throughputDesc(maxThroughputLowerBound) + ", off peak: "
+ + throughputDesc(maxThroughputOffpeak) + ", tuning period: " + tuningPeriod + " ms");
+ }
+
+ @Override
+ public String toString() {
+ return "DefaultCompactionThroughputController [maxThroughput="
+ + throughputDesc(getMaxThroughput()) + ", activeCompactions=" + activeOperations.size()
+ + "]";
+ }
+
+ @Override
+ protected boolean skipControl(long deltaSize, long controlSize) {
+ return deltaSize < controlSize;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareFlushThroughputController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareFlushThroughputController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareFlushThroughputController.java
new file mode 100644
index 0000000..f301a27
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareFlushThroughputController.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.throttle;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
+
+/**
+ * A throughput controller which uses the following scheme to limit throughput:
+ * <ul>
+ * <li>If flush pressure is greater than or equal to 1.0, no limitation.</li>
+ * <li>In the normal case, the max throughput is tuned between
+ * {@value #HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND} and
+ * {@value #HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND}, using the formula "lower +
+ * (upper - lower) * flushPressure", where flushPressure is in range [0.0, 1.0)</li>
+ * </ul>
+ * @see org.apache.hadoop.hbase.regionserver.HRegionServer#getFlushPressure()
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class PressureAwareFlushThroughputController extends PressureAwareThroughputController {
+
+ private static final Log LOG = LogFactory.getLog(PressureAwareFlushThroughputController.class);
+
+ public static final String HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND =
+ "hbase.hstore.flush.throughput.upper.bound";
+
+ private static final long DEFAULT_HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND =
+ 200L * 1024 * 1024;
+
+ public static final String HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND =
+ "hbase.hstore.flush.throughput.lower.bound";
+
+ private static final long DEFAULT_HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND =
+ 100L * 1024 * 1024;
+
+ public static final String HBASE_HSTORE_FLUSH_THROUGHPUT_TUNE_PERIOD =
+ "hbase.hstore.flush.throughput.tune.period";
+
+ private static final int DEFAULT_HSTORE_FLUSH_THROUGHPUT_TUNE_PERIOD = 20 * 1000;
+
+ // perform a flush throughput check every time this many bytes have been written
+ public static final String HBASE_HSTORE_FLUSH_THROUGHPUT_CONTROL_CHECK_INTERVAL =
+ "hbase.hstore.flush.throughput.control.check.interval";
+
+ private static final long DEFAULT_HBASE_HSTORE_FLUSH_THROUGHPUT_CONTROL_CHECK_INTERVAL =
+ 10L * 1024 * 1024;// 10MB
+
+ @Override
+ public void setup(final RegionServerServices server) {
+ server.getChoreService().scheduleChore(
+ new ScheduledChore("FlushThroughputTuner", this, tuningPeriod, this.tuningPeriod) {
+
+ @Override
+ protected void chore() {
+ tune(server.getFlushPressure());
+ }
+ });
+ }
+
+ private void tune(double flushPressure) {
+ double maxThroughputToSet;
+ if (flushPressure >= 1.0) {
+ // set to unlimited if the global memstore size already exceeds its lower limit
+ maxThroughputToSet = Double.MAX_VALUE;
+ } else {
+ // flushPressure is between 0.0 and 1.0, we use a simple linear formula to
+ // calculate the throughput limitation.
+ maxThroughputToSet =
+ maxThroughputLowerBound + (maxThroughputUpperBound - maxThroughputLowerBound)
+ * flushPressure;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("flushPressure is " + flushPressure + ", tune flush throughput to "
+ + throughputDesc(maxThroughputToSet));
+ }
+ this.setMaxThroughput(maxThroughputToSet);
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ super.setConf(conf);
+ if (conf == null) {
+ return;
+ }
+ this.maxThroughputUpperBound =
+ conf.getLong(HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND,
+ DEFAULT_HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND);
+ this.maxThroughputLowerBound =
+ conf.getLong(HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND,
+ DEFAULT_HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND);
+ this.offPeakHours = OffPeakHours.getInstance(conf);
+ this.controlPerSize =
+ conf.getLong(HBASE_HSTORE_FLUSH_THROUGHPUT_CONTROL_CHECK_INTERVAL,
+ DEFAULT_HBASE_HSTORE_FLUSH_THROUGHPUT_CONTROL_CHECK_INTERVAL);
+ this.setMaxThroughput(this.maxThroughputLowerBound);
+ this.tuningPeriod =
+ getConf().getInt(HBASE_HSTORE_FLUSH_THROUGHPUT_TUNE_PERIOD,
+ DEFAULT_HSTORE_FLUSH_THROUGHPUT_TUNE_PERIOD);
+ LOG.info("Flush throughput configurations, upper bound: "
+ throughputDesc(maxThroughputUpperBound) + ", lower bound: "
+ + throughputDesc(maxThroughputLowerBound) + ", tuning period: " + tuningPeriod + " ms");
+ }
+
+ @Override
+ public String toString() {
+ return "DefaultFlushController [maxThroughput=" + throughputDesc(getMaxThroughput())
+ + ", activeFlushNumber=" + activeOperations.size() + "]";
+ }
+
+ @Override
+ protected boolean skipControl(long deltaSize, long controlSize) {
+ // for flushes, always perform the control, even when the delta size is small
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareThroughputController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareThroughputController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareThroughputController.java
new file mode 100644
index 0000000..c360985
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareThroughputController.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.throttle;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public abstract class PressureAwareThroughputController extends Configured implements
+ ThroughputController, Stoppable {
+ private static final Log LOG = LogFactory.getLog(PressureAwareThroughputController.class);
+
+ /**
+ * Stores the information of one controlled operation (a compaction or a flush).
+ */
+ private static final class ActiveOperation {
+
+ private final long startTime;
+
+ private long lastControlTime;
+
+ private long lastControlSize;
+
+ private long totalSize;
+
+ private long numberOfSleeps;
+
+ private long totalSleepTime;
+
+ // used to avoid emitting debug logs too frequently
+ private long lastLogTime;
+
+ ActiveOperation() {
+ long currentTime = EnvironmentEdgeManager.currentTime();
+ this.startTime = currentTime;
+ this.lastControlTime = currentTime;
+ this.lastLogTime = currentTime;
+ }
+ }
+
+ protected long maxThroughputUpperBound;
+
+ protected long maxThroughputLowerBound;
+
+ protected OffPeakHours offPeakHours;
+
+ protected long controlPerSize;
+
+ protected int tuningPeriod;
+
+ private volatile double maxThroughput;
+
+ protected final ConcurrentMap<String, ActiveOperation> activeOperations =
+ new ConcurrentHashMap<String, ActiveOperation>();
+
+ @Override
+ public abstract void setup(final RegionServerServices server);
+
+ protected String throughputDesc(long deltaSize, long elapsedTime) {
+ return throughputDesc((double) deltaSize / elapsedTime * 1000);
+ }
+
+ protected String throughputDesc(double speed) {
+ if (speed >= 1E15) { // large enough to say it is unlimited
+ return "unlimited";
+ } else {
+ return String.format("%.2f MB/sec", speed / 1024 / 1024);
+ }
+ }
+
+ @Override
+ public void start(String opName) {
+ activeOperations.put(opName, new ActiveOperation());
+ }
+
+ @Override
+ public long control(String opName, long size) throws InterruptedException {
+ ActiveOperation operation = activeOperations.get(opName);
+ operation.totalSize += size;
+ long deltaSize = operation.totalSize - operation.lastControlSize;
+ if (skipControl(deltaSize, controlPerSize)) {
+ return 0;
+ }
+ long now = EnvironmentEdgeManager.currentTime();
+ double maxThroughputPerOperation = this.getMaxThroughput() / activeOperations.size();
+ long minTimeAllowed = (long) (deltaSize / maxThroughputPerOperation * 1000); // ms
+ long elapsedTime = now - operation.lastControlTime;
+ operation.lastControlSize = operation.totalSize;
+ if (elapsedTime >= minTimeAllowed) {
+ operation.lastControlTime = EnvironmentEdgeManager.currentTime();
+ return 0;
+ }
+ // too fast
+ long sleepTime = minTimeAllowed - elapsedTime;
+ if (LOG.isDebugEnabled()) {
+ // do not log too much
+ if (now - operation.lastLogTime > 5L * 1000) {
+ LOG.debug("deltaSize: " + deltaSize + " bytes; elapseTime: " + elapsedTime + " ns");
+ LOG.debug(opName + " sleep " + sleepTime + " ms because current throughput is "
+ + throughputDesc(deltaSize, elapsedTime) + ", max allowed is "
+ + throughputDesc(maxThroughputPerOperation) + ", already slept "
+ + operation.numberOfSleeps + " time(s) and total slept time is "
+ + operation.totalSleepTime + " ms till now.");
+ operation.lastLogTime = now;
+ }
+ }
+ Thread.sleep(sleepTime);
+ operation.numberOfSleeps++;
+ operation.totalSleepTime += sleepTime;
+ operation.lastControlTime = EnvironmentEdgeManager.currentTime();
+ return sleepTime;
+ }
+
+ /**
+ * Check whether the control should be skipped for the given delta size and control size.
+ * @param deltaSize Delta size since the last control
+ * @param controlSize Size threshold at which a control is performed
+ * @return true if this control should be skipped
+ */
+ protected abstract boolean skipControl(long deltaSize, long controlSize);
+
+ @Override
+ public void finish(String opName) {
+ ActiveOperation operation = activeOperations.remove(opName);
+ long elapsedTime = EnvironmentEdgeManager.currentTime() - operation.startTime;
+ LOG.info(opName + " average throughput is "
+ + throughputDesc(operation.totalSize, elapsedTime) + ", slept "
+ + operation.numberOfSleeps + " time(s) and total slept time is "
+ + operation.totalSleepTime + " ms. " + activeOperations.size()
+ + " active operations remaining, total limit is " + throughputDesc(getMaxThroughput()));
+ }
+
+ private volatile boolean stopped = false;
+
+ @Override
+ public void stop(String why) {
+ stopped = true;
+ }
+
+ @Override
+ public boolean isStopped() {
+ return stopped;
+ }
+
+ public double getMaxThroughput() {
+ return maxThroughput;
+ }
+
+ public void setMaxThroughput(double maxThroughput) {
+ this.maxThroughput = maxThroughput;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputControlUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputControlUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputControlUtil.java
new file mode 100644
index 0000000..b3c4147
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputControlUtil.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.throttle;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.Store;
+
+/**
+ * Helper methods for throttling
+ */
+@InterfaceAudience.Private
+public final class ThroughputControlUtil {
+ private ThroughputControlUtil() {
+ }
+
+ private static final AtomicInteger NAME_COUNTER = new AtomicInteger(0);
+ private static final String NAME_DELIMITER = "#";
+
+ /**
+ * Generate a name for throttling, to prevent name conflicts when multiple IO operations are
+ * running in parallel on the same store.
+ * @param store the Store instance on which IO operation is happening
+ * @param opName Name of the IO operation, e.g. "flush", "compaction", etc.
+ * @return The name for throttling
+ */
+ public static String getNameForThrottling(final Store store, final String opName) {
+ int counter;
+ for (;;) {
+ counter = NAME_COUNTER.get();
+ int next = counter == Integer.MAX_VALUE ? 0 : counter + 1;
+ if (NAME_COUNTER.compareAndSet(counter, next)) {
+ break;
+ }
+ }
+ return store.getRegionInfo().getRegionNameAsString() + NAME_DELIMITER
+ + store.getFamily().getNameAsString() + NAME_DELIMITER + opName + NAME_DELIMITER + counter;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputController.java
new file mode 100644
index 0000000..f299f98
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputController.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.throttle;
+
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+
+/**
+ * A utility that constrains the total throughput of one or more simultaneous flows by
+ * sleeping when necessary.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public interface ThroughputController extends Stoppable {
+
+ /**
+ * Set up the controller for the given region server.
+ */
+ void setup(RegionServerServices server);
+
+ /**
+ * Start tracking the throughput of the named flow.
+ */
+ void start(String name);
+
+ /**
+ * Report the given number of bytes written and control the throughput; sleeps if the
+ * flow is too fast.
+ * @return the actual sleep time, in milliseconds.
+ */
+ long control(String name, long size) throws InterruptedException;
+
+ /**
+ * Finish tracking the named flow. This method should be called in a finally block.
+ */
+ void finish(String name);
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
index 0986ad7..e634327 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
@@ -317,4 +318,13 @@ public class MockRegionServerServices implements RegionServerServices {
// TODO Auto-generated method stub
return null;
}
+
+ public ThroughputController getFlushThroughputController() {
+ return null;
+ }
+
+ @Override
+ public double getFlushPressure() {
+ return 0;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
index 94a63d8..35a7403 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
@@ -44,7 +43,7 @@ import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -122,7 +121,7 @@ public class TestIOFencing {
@Override
public boolean compact(CompactionContext compaction, Store store,
- CompactionThroughputController throughputController) throws IOException {
+ ThroughputController throughputController) throws IOException {
try {
return super.compact(compaction, store, throughputController);
} finally {
@@ -132,7 +131,7 @@ public class TestIOFencing {
@Override
public boolean compact(CompactionContext compaction, Store store,
- CompactionThroughputController throughputController, User user) throws IOException {
+ ThroughputController throughputController, User user) throws IOException {
try {
return super.compact(compaction, store, throughputController, user);
} finally {
http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
index e20c4ad..5241dbe 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
@@ -60,12 +60,12 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -235,7 +235,7 @@ public class TestRegionObserverScannerOpenHook {
@Override
public boolean compact(CompactionContext compaction, Store store,
- CompactionThroughputController throughputController) throws IOException {
+ ThroughputController throughputController) throws IOException {
boolean ret = super.compact(compaction, store, throughputController);
if (ret) compactionStateChangeLatch.countDown();
return ret;
@@ -243,7 +243,7 @@ public class TestRegionObserverScannerOpenHook {
@Override
public boolean compact(CompactionContext compaction, Store store,
- CompactionThroughputController throughputController, User user) throws IOException {
+ ThroughputController throughputController, User user) throws IOException {
boolean ret = super.compact(compaction, store, throughputController, user);
if (ret) compactionStateChangeLatch.countDown();
return ret;
http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index 32f644b..4de4a5f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -103,6 +103,7 @@ import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
@@ -662,4 +663,13 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
// TODO Auto-generated method stub
return null;
}
+
+ public ThroughputController getFlushThroughputController() {
+ return null;
+ }
+
+ @Override
+ public double getFlushPressure() {
+ return 0;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java
index 022279a..f57ade9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java
@@ -22,8 +22,8 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
-import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
+import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
+import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assert;
http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
index b374bdc..06b4c46 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
@@ -59,9 +59,9 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
-import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
+import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
+import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
@@ -104,7 +104,7 @@ public class TestCompaction {
conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
conf.setInt(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 100);
conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
- NoLimitCompactionThroughputController.class.getName());
+ NoLimitThroughputController.class.getName());
compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
secondRowBytes = START_KEY_BYTES.clone();
@@ -363,13 +363,13 @@ public class TestCompaction {
}
@Override
- public List<Path> compact(CompactionThroughputController throughputController)
+ public List<Path> compact(ThroughputController throughputController)
throws IOException {
return compact(throughputController, null);
}
@Override
- public List<Path> compact(CompactionThroughputController throughputController, User user)
+ public List<Path> compact(ThroughputController throughputController, User user)
throws IOException {
finishCompaction(this.selectedFiles);
return new ArrayList<Path>();
@@ -421,13 +421,13 @@ public class TestCompaction {
}
@Override
- public List<Path> compact(CompactionThroughputController throughputController)
+ public List<Path> compact(ThroughputController throughputController)
throws IOException {
return compact(throughputController, null);
}
@Override
- public List<Path> compact(CompactionThroughputController throughputController, User user)
+ public List<Path> compact(ThroughputController throughputController, User user)
throws IOException {
try {
isInCompact = true;
@@ -510,10 +510,10 @@ public class TestCompaction {
HRegion r = mock(HRegion.class);
when(
r.compact(any(CompactionContext.class), any(Store.class),
- any(CompactionThroughputController.class), any(User.class))).then(new Answer<Boolean>() {
+ any(ThroughputController.class), any(User.class))).then(new Answer<Boolean>() {
public Boolean answer(InvocationOnMock invocation) throws Throwable {
invocation.getArgumentAt(0, CompactionContext.class).compact(
- invocation.getArgumentAt(2, CompactionThroughputController.class));
+ invocation.getArgumentAt(2, ThroughputController.class));
return true;
}
});
http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
index ef02431..385048c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
@@ -59,7 +59,7 @@ import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
-import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
+import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -537,7 +537,7 @@ public class TestHMobStore {
// Trigger major compaction
this.store.triggerMajorCompaction();
CompactionContext requestCompaction = this.store.requestCompaction(1, null);
- this.store.compact(requestCompaction, NoLimitCompactionThroughputController.INSTANCE);
+ this.store.compact(requestCompaction, NoLimitThroughputController.INSTANCE);
Assert.assertEquals(1, this.store.getStorefiles().size());
//Check encryption after compaction
http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
index 382193b..85b2a9d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
@@ -70,7 +70,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescripto
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl;
import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
-import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
+import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -314,7 +314,7 @@ public class TestHRegionReplayEvents {
// compaction from primary
LOG.info("-- Compacting primary, only 1 store");
primaryRegion.compactStore(Bytes.toBytes("cf1"),
- NoLimitCompactionThroughputController.INSTANCE);
+ NoLimitThroughputController.INSTANCE);
// now replay the edits and the flush marker
reader = createWALReaderForPrimary();
http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
index 41fbae6..fe620e7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
@@ -85,7 +85,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Regio
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
-import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
+import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -259,7 +259,7 @@ public class TestSplitTransactionOnCluster {
region.initialize();
// 2, Run Compaction cc
- assertFalse(region.compact(cc, store, NoLimitCompactionThroughputController.INSTANCE));
+ assertFalse(region.compact(cc, store, NoLimitThroughputController.INSTANCE));
assertTrue(fileNum > store.getStorefiles().size());
// 3, Split
http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index 9e846c6..354ea2d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -72,7 +72,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
-import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
+import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.security.User;
@@ -382,7 +382,7 @@ public class TestStore {
Assert.assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS);
// after compact; check the lowest time stamp
- store.compact(store.requestCompaction(), NoLimitCompactionThroughputController.INSTANCE);
+ store.compact(store.requestCompaction(), NoLimitThroughputController.INSTANCE);
lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
Assert.assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
index eb8513b..cb586f3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
@@ -52,8 +52,8 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
-import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
+import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -133,7 +133,7 @@ public class TestStripeCompactor {
StripeCompactor sc = createCompactor(writers, input);
List<Path> paths =
sc.compact(createDummyRequest(), Arrays.asList(boundaries), majorFrom, majorTo,
- NoLimitCompactionThroughputController.INSTANCE);
+ NoLimitThroughputController.INSTANCE);
writers.verifyKvs(output, allFiles, true);
if (allFiles) {
assertEquals(output.length, paths.size());
@@ -170,7 +170,7 @@ public class TestStripeCompactor {
StripeCompactor sc = createCompactor(writers, input);
List<Path> paths =
sc.compact(createDummyRequest(), targetCount, targetSize, left, right, null, null,
- NoLimitCompactionThroughputController.INSTANCE);
+ NoLimitThroughputController.INSTANCE);
assertEquals(output.length, paths.size());
writers.verifyKvs(output, true, true);
List<byte[]> boundaries = new ArrayList<byte[]>();
http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java
index 1454aa8..6b641c1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java
@@ -37,10 +37,10 @@ import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
-import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
+import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -77,7 +77,7 @@ public class TestStripeStoreEngine {
when(
mockCompactor.compact(any(CompactionRequest.class), anyInt(), anyLong(), any(byte[].class),
any(byte[].class), any(byte[].class), any(byte[].class),
- any(CompactionThroughputController.class), any(User.class)))
+ any(ThroughputController.class), any(User.class)))
.thenReturn(new ArrayList<Path>());
// Produce 3 L0 files.
@@ -96,10 +96,10 @@ public class TestStripeStoreEngine {
assertEquals(2, compaction.getRequest().getFiles().size());
assertFalse(compaction.getRequest().getFiles().contains(sf));
// Make sure the correct method it called on compactor.
- compaction.compact(NoLimitCompactionThroughputController.INSTANCE);
+ compaction.compact(NoLimitThroughputController.INSTANCE);
verify(mockCompactor, times(1)).compact(compaction.getRequest(), targetCount, 0L,
StripeStoreFileManager.OPEN_KEY, StripeStoreFileManager.OPEN_KEY, null, null,
- NoLimitCompactionThroughputController.INSTANCE, null);
+ NoLimitThroughputController.INSTANCE, null);
}
private static StoreFile createFile() throws Exception {
http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactionWithThroughputController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactionWithThroughputController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactionWithThroughputController.java
deleted file mode 100644
index 4456ef2..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactionWithThroughputController.java
+++ /dev/null
@@ -1,302 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver.compactions;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Random;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MiniHBaseCluster;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.HStore;
-import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.regionserver.StoreEngine;
-import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
-import org.apache.hadoop.hbase.regionserver.StripeStoreEngine;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.RegionServerTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.JVMClusterUtil;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category({ RegionServerTests.class, MediumTests.class })
-public class TestCompactionWithThroughputController {
-
- private static final Log LOG = LogFactory.getLog(TestCompactionWithThroughputController.class);
-
- private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-
- private static final double EPSILON = 1E-6;
-
- private final TableName tableName = TableName.valueOf(getClass().getSimpleName());
-
- private final byte[] family = Bytes.toBytes("f");
-
- private final byte[] qualifier = Bytes.toBytes("q");
-
- private Store getStoreWithName(TableName tableName) {
- MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
- List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
- for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) {
- HRegionServer hrs = rsts.get(i).getRegionServer();
- for (Region region : hrs.getOnlineRegions(tableName)) {
- return region.getStores().iterator().next();
- }
- }
- return null;
- }
-
- private Store prepareData() throws IOException {
- Admin admin = TEST_UTIL.getHBaseAdmin();
- if (admin.tableExists(tableName)) {
- admin.disableTable(tableName);
- admin.deleteTable(tableName);
- }
- Table table = TEST_UTIL.createTable(tableName, family);
- Random rand = new Random();
- for (int i = 0; i < 10; i++) {
- for (int j = 0; j < 10; j++) {
- byte[] value = new byte[128 * 1024];
- rand.nextBytes(value);
- table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value));
- }
- admin.flush(tableName);
- }
- return getStoreWithName(tableName);
- }
-
- private long testCompactionWithThroughputLimit() throws Exception {
- long throughputLimit = 1024L * 1024;
- Configuration conf = TEST_UTIL.getConfiguration();
- conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
- conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100);
- conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200);
- conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
- conf.setLong(
- PressureAwareCompactionThroughputController
- .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND,
- throughputLimit);
- conf.setLong(
- PressureAwareCompactionThroughputController
- .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND,
- throughputLimit);
- conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
- PressureAwareCompactionThroughputController.class.getName());
- TEST_UTIL.startMiniCluster(1);
- try {
- Store store = prepareData();
- assertEquals(10, store.getStorefilesCount());
- long startTime = System.currentTimeMillis();
- TEST_UTIL.getHBaseAdmin().majorCompact(tableName);
- while (store.getStorefilesCount() != 1) {
- Thread.sleep(20);
- }
- long duration = System.currentTimeMillis() - startTime;
- double throughput = (double) store.getStorefilesSize() / duration * 1000;
- // confirm that the speed limit work properly(not too fast, and also not too slow)
- // 20% is the max acceptable error rate.
- assertTrue(throughput < throughputLimit * 1.2);
- assertTrue(throughput > throughputLimit * 0.8);
- return System.currentTimeMillis() - startTime;
- } finally {
- TEST_UTIL.shutdownMiniCluster();
- }
- }
-
- private long testCompactionWithoutThroughputLimit() throws Exception {
- Configuration conf = TEST_UTIL.getConfiguration();
- conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
- conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100);
- conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200);
- conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
- conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
- NoLimitCompactionThroughputController.class.getName());
- TEST_UTIL.startMiniCluster(1);
- try {
- Store store = prepareData();
- assertEquals(10, store.getStorefilesCount());
- long startTime = System.currentTimeMillis();
- TEST_UTIL.getHBaseAdmin().majorCompact(tableName);
- while (store.getStorefilesCount() != 1) {
- Thread.sleep(20);
- }
- return System.currentTimeMillis() - startTime;
- } finally {
- TEST_UTIL.shutdownMiniCluster();
- }
- }
-
- @Test
- public void testCompaction() throws Exception {
- long limitTime = testCompactionWithThroughputLimit();
- long noLimitTime = testCompactionWithoutThroughputLimit();
- LOG.info("With 1M/s limit, compaction use " + limitTime + "ms; without limit, compaction use "
- + noLimitTime + "ms");
- // usually the throughput of a compaction without limitation is about 40MB/sec at least, so this
- // is a very weak assumption.
- assertTrue(limitTime > noLimitTime * 2);
- }
-
- /**
- * Test the tuning task of {@link PressureAwareCompactionThroughputController}
- */
- @Test
- public void testThroughputTuning() throws Exception {
- Configuration conf = TEST_UTIL.getConfiguration();
- conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
- conf.setLong(
- PressureAwareCompactionThroughputController
- .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND,
- 20L * 1024 * 1024);
- conf.setLong(
- PressureAwareCompactionThroughputController
- .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND,
- 10L * 1024 * 1024);
- conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 4);
- conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 6);
- conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
- PressureAwareCompactionThroughputController.class.getName());
- conf.setInt(
- PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD,
- 1000);
- TEST_UTIL.startMiniCluster(1);
- Connection conn = ConnectionFactory.createConnection(conf);
- try {
- HTableDescriptor htd = new HTableDescriptor(tableName);
- htd.addFamily(new HColumnDescriptor(family));
- htd.setCompactionEnabled(false);
- TEST_UTIL.getHBaseAdmin().createTable(htd);
- TEST_UTIL.waitTableAvailable(tableName);
- HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName);
- PressureAwareCompactionThroughputController throughputController =
- (PressureAwareCompactionThroughputController) regionServer.compactSplitThread
- .getCompactionThroughputController();
- assertEquals(10L * 1024 * 1024, throughputController.maxThroughput, EPSILON);
- Table table = conn.getTable(tableName);
- for (int i = 0; i < 5; i++) {
- byte[] value = new byte[0];
- table.put(new Put(Bytes.toBytes(i)).addColumn(family, qualifier, value));
- TEST_UTIL.flush(tableName);
- }
- Thread.sleep(2000);
- assertEquals(15L * 1024 * 1024, throughputController.maxThroughput, EPSILON);
-
- byte[] value1 = new byte[0];
- table.put(new Put(Bytes.toBytes(5)).addColumn(family, qualifier, value1));
- TEST_UTIL.flush(tableName);
- Thread.sleep(2000);
- assertEquals(20L * 1024 * 1024, throughputController.maxThroughput, EPSILON);
-
- byte[] value = new byte[0];
- table.put(new Put(Bytes.toBytes(6)).addColumn(family, qualifier, value));
- TEST_UTIL.flush(tableName);
- Thread.sleep(2000);
- assertEquals(Double.MAX_VALUE, throughputController.maxThroughput, EPSILON);
-
- conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
- NoLimitCompactionThroughputController.class.getName());
- regionServer.compactSplitThread.onConfigurationChange(conf);
- assertTrue(throughputController.isStopped());
- assertTrue(regionServer.compactSplitThread.getCompactionThroughputController()
- instanceof NoLimitCompactionThroughputController);
- } finally {
- conn.close();
- TEST_UTIL.shutdownMiniCluster();
- }
- }
-
- /**
- * Test the logic used to calculate compaction pressure for a striped store.
- */
- @Test
- public void testGetCompactionPressureForStripedStore() throws Exception {
- Configuration conf = TEST_UTIL.getConfiguration();
- conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName());
- conf.setBoolean(StripeStoreConfig.FLUSH_TO_L0_KEY, false);
- conf.setInt(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, 2);
- conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 4);
- conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 12);
- TEST_UTIL.startMiniCluster(1);
- Connection conn = ConnectionFactory.createConnection(conf);
- try {
- HTableDescriptor htd = new HTableDescriptor(tableName);
- htd.addFamily(new HColumnDescriptor(family));
- htd.setCompactionEnabled(false);
- TEST_UTIL.getHBaseAdmin().createTable(htd);
- TEST_UTIL.waitTableAvailable(tableName);
- HStore store = (HStore) getStoreWithName(tableName);
- assertEquals(0, store.getStorefilesCount());
- assertEquals(0.0, store.getCompactionPressure(), EPSILON);
- Table table = conn.getTable(tableName);
- for (int i = 0; i < 4; i++) {
- byte[] value1 = new byte[0];
- table.put(new Put(Bytes.toBytes(i)).addColumn(family, qualifier, value1));
- byte[] value = new byte[0];
- table.put(new Put(Bytes.toBytes(100 + i)).addColumn(family, qualifier, value));
- TEST_UTIL.flush(tableName);
- }
- assertEquals(8, store.getStorefilesCount());
- assertEquals(0.0, store.getCompactionPressure(), EPSILON);
-
- byte[] value5 = new byte[0];
- table.put(new Put(Bytes.toBytes(4)).addColumn(family, qualifier, value5));
- byte[] value4 = new byte[0];
- table.put(new Put(Bytes.toBytes(104)).addColumn(family, qualifier, value4));
- TEST_UTIL.flush(tableName);
- assertEquals(10, store.getStorefilesCount());
- assertEquals(0.5, store.getCompactionPressure(), EPSILON);
-
- byte[] value3 = new byte[0];
- table.put(new Put(Bytes.toBytes(5)).addColumn(family, qualifier, value3));
- byte[] value2 = new byte[0];
- table.put(new Put(Bytes.toBytes(105)).addColumn(family, qualifier, value2));
- TEST_UTIL.flush(tableName);
- assertEquals(12, store.getStorefilesCount());
- assertEquals(1.0, store.getCompactionPressure(), EPSILON);
-
- byte[] value1 = new byte[0];
- table.put(new Put(Bytes.toBytes(6)).addColumn(family, qualifier, value1));
- byte[] value = new byte[0];
- table.put(new Put(Bytes.toBytes(106)).addColumn(family, qualifier, value));
- TEST_UTIL.flush(tableName);
- assertEquals(14, store.getStorefilesCount());
- assertEquals(2.0, store.getCompactionPressure(), EPSILON);
- } finally {
- conn.close();
- TEST_UTIL.shutdownMiniCluster();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
index 56e71e8..c440a57 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
@@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.regionserver.StripeStoreFileManager;
import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
import org.apache.hadoop.hbase.regionserver.TestStripeCompactor.StoreFileWritersCapture;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy.StripeInformationProvider;
+import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -216,10 +217,10 @@ public class TestStripeCompactionPolicy {
assertTrue(policy.needsCompactions(si, al()));
StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
assertEquals(si.getStorefiles(), scr.getRequest().getFiles());
- scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null);
+ scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(), aryEq(OPEN_KEY),
aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY),
- any(NoLimitCompactionThroughputController.class), any(User.class));
+ any(NoLimitThroughputController.class), any(User.class));
}
@Test
@@ -469,7 +470,7 @@ public class TestStripeCompactionPolicy {
// All the Stripes are expired, so the Compactor will not create any Writers. We need to create
// an empty file to preserve metadata
StripeCompactor sc = createCompactor();
- List<Path> paths = scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null);
+ List<Path> paths = scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
assertEquals(1, paths.size());
}
@@ -528,7 +529,7 @@ public class TestStripeCompactionPolicy {
assertTrue(policy.needsCompactions(si, al()));
StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
- scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null);
+ scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
verify(sc, times(1)).compact(eq(scr.getRequest()), argThat(new ArgumentMatcher<List<byte[]>>() {
@Override
public boolean matches(Object argument) {
@@ -542,7 +543,7 @@ public class TestStripeCompactionPolicy {
}
}), dropDeletesFrom == null ? isNull(byte[].class) : aryEq(dropDeletesFrom),
dropDeletesTo == null ? isNull(byte[].class) : aryEq(dropDeletesTo),
- any(NoLimitCompactionThroughputController.class), any(User.class));
+ any(NoLimitThroughputController.class), any(User.class));
}
/**
@@ -563,12 +564,12 @@ public class TestStripeCompactionPolicy {
assertTrue(!needsCompaction || policy.needsCompactions(si, al()));
StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
- scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null);
+ scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
verify(sc, times(1)).compact(eq(scr.getRequest()),
count == null ? anyInt() : eq(count.intValue()),
size == null ? anyLong() : eq(size.longValue()), aryEq(start), aryEq(end),
dropDeletesMatcher(dropDeletes, start), dropDeletesMatcher(dropDeletes, end),
- any(NoLimitCompactionThroughputController.class), any(User.class));
+ any(NoLimitThroughputController.class), any(User.class));
}
/** Verify arbitrary flush. */
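The substitution running through the hunks above is the point of this file's changes: with throughput control generalized beyond compactions (this commit adds flush throttling as well), the compaction-specific NoLimitCompactionThroughputController gives way to the generic NoLimitThroughputController. As a rough sketch, assuming a simplified single-method interface (the real ThroughputController in HBase also carries setup/start/finish lifecycle hooks), a "no limit" controller is just a pass-through that never sleeps:

    // Minimal sketch of a pass-through throughput controller. This is an
    // illustration, not the actual HBase class: the real interface also has
    // setup/start/finish lifecycle methods.
    public class NoLimitSketch {
      // Called with the bytes written since the last check; a limiting
      // implementation would sleep here and return the time slept.
      public long control(String opName, long size) {
        return 0; // never throttles, so the operation runs at full speed
      }
    }

Because a pass-through controller needs no per-operation state, the tests can hand the NoLimitThroughputController.INSTANCE singleton to every call site that expects a controller.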
http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestCompactionWithThroughputController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestCompactionWithThroughputController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestCompactionWithThroughputController.java
new file mode 100644
index 0000000..41975eb
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestCompactionWithThroughputController.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.throttle;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreEngine;
+import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
+import org.apache.hadoop.hbase.regionserver.StripeStoreEngine;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestCompactionWithThroughputController {
+
+ private static final Log LOG = LogFactory.getLog(TestCompactionWithThroughputController.class);
+
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ private static final double EPSILON = 1E-6;
+
+ private final TableName tableName = TableName.valueOf(getClass().getSimpleName());
+
+ private final byte[] family = Bytes.toBytes("f");
+
+ private final byte[] qualifier = Bytes.toBytes("q");
+
+ private Store getStoreWithName(TableName tableName) {
+ MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
+ List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
+ for (int i = 0; i < rsts.size(); i++) {
+ HRegionServer hrs = rsts.get(i).getRegionServer();
+ for (Region region : hrs.getOnlineRegions(tableName)) {
+ return region.getStores().iterator().next();
+ }
+ }
+ return null;
+ }
+
+ private Store prepareData() throws IOException {
+ Admin admin = TEST_UTIL.getHBaseAdmin();
+ if (admin.tableExists(tableName)) {
+ admin.disableTable(tableName);
+ admin.deleteTable(tableName);
+ }
+ Table table = TEST_UTIL.createTable(tableName, family);
+ Random rand = new Random();
+ for (int i = 0; i < 10; i++) {
+ for (int j = 0; j < 10; j++) {
+ byte[] value = new byte[128 * 1024];
+ rand.nextBytes(value);
+ table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value));
+ }
+ admin.flush(tableName);
+ }
+ return getStoreWithName(tableName);
+ }
+
+ private long testCompactionWithThroughputLimit() throws Exception {
+ long throughputLimit = 1024L * 1024;
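+ // 1MB/s. Both tuning bounds are set to this same value below, so the controller cannot tune the limit away from it.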
+ Configuration conf = TEST_UTIL.getConfiguration();
+ conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
+ conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100);
+ conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200);
+ conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
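+ // The compaction file limits and the blocking-storefile count above are set high enough that nothing compacts or blocks before the explicit major compaction below.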
+ conf.setLong(
+ PressureAwareCompactionThroughputController
+ .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND,
+ throughputLimit);
+ conf.setLong(
+ PressureAwareCompactionThroughputController
+ .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND,
+ throughputLimit);
+ conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
+ PressureAwareCompactionThroughputController.class.getName());
+ TEST_UTIL.startMiniCluster(1);
+ try {
+ Store store = prepareData();
+ assertEquals(10, store.getStorefilesCount());
+ long startTime = System.currentTimeMillis();
+ TEST_UTIL.getHBaseAdmin().majorCompact(tableName);
+ while (store.getStorefilesCount() != 1) {
+ Thread.sleep(20);
+ }
+ long duration = System.currentTimeMillis() - startTime;
+ double throughput = (double) store.getStorefilesSize() / duration * 1000;
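+ // getStorefilesSize() is in bytes and duration is in milliseconds, so this is bytes per second.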
+ // Confirm that the speed limit works properly (not too fast, and also not too slow).
+ // 20% is the maximum acceptable error rate.
+ assertTrue(throughput < throughputLimit * 1.2);
+ assertTrue(throughput > throughputLimit * 0.8);
+ return duration;
+ } finally {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+ }
+
+ private long testCompactionWithoutThroughputLimit() throws Exception {
+ Configuration conf = TEST_UTIL.getConfiguration();
+ conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
+ conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100);
+ conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200);
+ conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
+ conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
+ NoLimitThroughputController.class.getName());
+ TEST_UTIL.startMiniCluster(1);
+ try {
+ Store store = prepareData();
+ assertEquals(10, store.getStorefilesCount());
+ long startTime = System.currentTimeMillis();
+ TEST_UTIL.getHBaseAdmin().majorCompact(tableName);
+ while (store.getStorefilesCount() != 1) {
+ Thread.sleep(20);
+ }
+ return System.currentTimeMillis() - startTime;
+ } finally {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+ }
+
+ @Test
+ public void testCompaction() throws Exception {
+ long limitTime = testCompactionWithThroughputLimit();
+ long noLimitTime = testCompactionWithoutThroughputLimit();
+ LOG.info("With 1M/s limit, compaction use " + limitTime + "ms; without limit, compaction use "
+ + noLimitTime + "ms");
+ // usually the throughput of a compaction without limitation is about 40MB/sec at least, so this
+ // is a very weak assumption.
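+ // (prepareData writes 100 rows of 128KB, roughly 12.5MB, so the limited run takes on the
+ // order of 12.5 seconds at 1MB/s while the unlimited one finishes in well under a second.)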
+ assertTrue(limitTime > noLimitTime * 2);
+ }
+
+ /**
+ * Test the tuning task of {@link PressureAwareCompactionThroughputController}
+ */
+ @Test
+ public void testThroughputTuning() throws Exception {
+ Configuration conf = TEST_UTIL.getConfiguration();
+ conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
+ conf.setLong(
+ PressureAwareCompactionThroughputController
+ .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND,
+ 20L * 1024 * 1024);
+ conf.setLong(
+ PressureAwareCompactionThroughputController
+ .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND,
+ 10L * 1024 * 1024);
+ conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 4);
+ conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 6);
+ conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
+ PressureAwareCompactionThroughputController.class.getName());
+ conf.setInt(
+ PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD,
+ 1000);
+ TEST_UTIL.startMiniCluster(1);
+ Connection conn = ConnectionFactory.createConnection(conf);
+ try {
+ HTableDescriptor htd = new HTableDescriptor(tableName);
+ htd.addFamily(new HColumnDescriptor(family));
+ htd.setCompactionEnabled(false);
+ TEST_UTIL.getHBaseAdmin().createTable(htd);
+ TEST_UTIL.waitTableAvailable(tableName);
+ HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName);
+ PressureAwareCompactionThroughputController throughputController =
+ (PressureAwareCompactionThroughputController) regionServer.compactSplitThread
+ .getCompactionThroughputController();
+ assertEquals(10L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON);
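+ // With no storefiles there is no compaction pressure, so the tuned limit starts at the configured lower bound.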
+ Table table = conn.getTable(tableName);
+ for (int i = 0; i < 5; i++) {
+ byte[] value = new byte[0];
+ table.put(new Put(Bytes.toBytes(i)).addColumn(family, qualifier, value));
+ TEST_UTIL.flush(tableName);
+ }
+ Thread.sleep(2000);
+ assertEquals(15L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON);
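+ // 5 storefiles with min=4 and blocking=6 works out to a pressure of 0.5, tuning the limit halfway between the bounds.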
+
+ byte[] value1 = new byte[0];
+ table.put(new Put(Bytes.toBytes(5)).addColumn(family, qualifier, value1));
+ TEST_UTIL.flush(tableName);
+ Thread.sleep(2000);
+ assertEquals(20L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON);
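+ // 6 storefiles reaches the blocking count: pressure 1.0 tunes the limit up to the higher bound.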
+
+ byte[] value = new byte[0];
+ table.put(new Put(Bytes.toBytes(6)).addColumn(family, qualifier, value));
+ TEST_UTIL.flush(tableName);
+ Thread.sleep(2000);
+ assertEquals(Double.MAX_VALUE, throughputController.getMaxThroughput(), EPSILON);
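+ // A 7th storefile pushes the pressure above 1.0, at which point the controller drops the limitation entirely.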
+
+ conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
+ NoLimitThroughputController.class.getName());
+ regionServer.compactSplitThread.onConfigurationChange(conf);
+ assertTrue(throughputController.isStopped());
+ assertTrue(regionServer.compactSplitThread.getCompactionThroughputController()
+ instanceof NoLimitThroughputController);
+ } finally {
+ conn.close();
+ TEST_UTIL.shutdownMiniCluster();
+ }
+ }
+
+ /**
+ * Test the logic used to calculate compaction pressure for a striped store.
+ */
+ @Test
+ public void testGetCompactionPressureForStripedStore() throws Exception {
+ Configuration conf = TEST_UTIL.getConfiguration();
+ conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName());
+ conf.setBoolean(StripeStoreConfig.FLUSH_TO_L0_KEY, false);
+ conf.setInt(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, 2);
+ conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 4);
+ conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 12);
+ TEST_UTIL.startMiniCluster(1);
+ Connection conn = ConnectionFactory.createConnection(conf);
+ try {
+ HTableDescriptor htd = new HTableDescriptor(tableName);
+ htd.addFamily(new HColumnDescriptor(family));
+ htd.setCompactionEnabled(false);
+ TEST_UTIL.getHBaseAdmin().createTable(htd);
+ TEST_UTIL.waitTableAvailable(tableName);
+ HStore store = (HStore) getStoreWithName(tableName);
+ assertEquals(0, store.getStorefilesCount());
+ assertEquals(0.0, store.getCompactionPressure(), EPSILON);
+ Table table = conn.getTable(tableName);
+ for (int i = 0; i < 4; i++) {
+ byte[] value1 = new byte[0];
+ table.put(new Put(Bytes.toBytes(i)).addColumn(family, qualifier, value1));
+ byte[] value = new byte[0];
+ table.put(new Put(Bytes.toBytes(100 + i)).addColumn(family, qualifier, value));
+ TEST_UTIL.flush(tableName);
+ }
+ assertEquals(8, store.getStorefilesCount());
+ assertEquals(0.0, store.getCompactionPressure(), EPSILON);
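+ // 4 files per stripe only reaches the configured stripe minimum (MIN_FILES_KEY), so there is still no pressure.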
+
+ byte[] value5 = new byte[0];
+ table.put(new Put(Bytes.toBytes(4)).addColumn(family, qualifier, value5));
+ byte[] value4 = new byte[0];
+ table.put(new Put(Bytes.toBytes(104)).addColumn(family, qualifier, value4));
+ TEST_UTIL.flush(tableName);
+ assertEquals(10, store.getStorefilesCount());
+ assertEquals(0.5, store.getCompactionPressure(), EPSILON);
+
+ byte[] value3 = new byte[0];
+ table.put(new Put(Bytes.toBytes(5)).addColumn(family, qualifier, value3));
+ byte[] value2 = new byte[0];
+ table.put(new Put(Bytes.toBytes(105)).addColumn(family, qualifier, value2));
+ TEST_UTIL.flush(tableName);
+ assertEquals(12, store.getStorefilesCount());
+ assertEquals(1.0, store.getCompactionPressure(), EPSILON);
+
+ byte[] value1 = new byte[0];
+ table.put(new Put(Bytes.toBytes(6)).addColumn(family, qualifier, value1));
+ byte[] value = new byte[0];
+ table.put(new Put(Bytes.toBytes(106)).addColumn(family, qualifier, value));
+ TEST_UTIL.flush(tableName);
+ assertEquals(14, store.getStorefilesCount());
+ assertEquals(2.0, store.getCompactionPressure(), EPSILON);
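+ // Once per-stripe file counts pass the blocking threshold, the pressure keeps climbing beyond 1.0, signalling increasingly urgent compaction.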
+ } finally {
+ conn.close();
+ TEST_UTIL.shutdownMiniCluster();
+ }
+ }
+}
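Read together, the assertions in testThroughputTuning pin down the tuning rule: the limit moves linearly between the configured lower and higher bounds as the compaction pressure rises from 0.0 to 1.0, and throttling is dropped entirely once the pressure exceeds 1.0. A small standalone sketch (hypothetical names; in HBase the tuning runs inside the controller's periodic chore, governed by HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD) reproduces the numbers the test asserts:

    // Hypothetical illustration of the tuning rule asserted above, using the
    // bounds configured in testThroughputTuning (10MB/s lower, 20MB/s higher).
    public class ThroughputTuningSketch {

      // limit = lower + (higher - lower) * pressure; unlimited past 1.0
      static double tune(double lower, double higher, double pressure) {
        if (pressure > 1.0) {
          return Double.MAX_VALUE; // no limitation under heavy pressure
        }
        return lower + (higher - lower) * pressure;
      }

      public static void main(String[] args) {
        double lower = 10.0 * 1024 * 1024;
        double higher = 20.0 * 1024 * 1024;
        System.out.println(tune(lower, higher, 0.0)); // 10MB/s: no storefiles yet
        System.out.println(tune(lower, higher, 0.5)); // 15MB/s: 5 storefiles
        System.out.println(tune(lower, higher, 1.0)); // 20MB/s: 6 storefiles
        System.out.println(tune(lower, higher, 1.5)); // unlimited: 7 storefiles
      }
    }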