You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2013/06/25 20:51:54 UTC
svn commit: r1496587 - in /hbase/branches/0.89-fb/src:
main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/conf/
main/java/org/apache/hadoop/hbase/regionserver/
test/java/org/apache/hadoop/hbase/ test/java/org/apache/hadoop/hbase/regio...
Author: liyin
Date: Tue Jun 25 18:51:54 2013
New Revision: 1496587
URL: http://svn.apache.org/r1496587
Log:
[HBASE-8805] [89-fb] Making number of compaction threads and the CompactionConfiguration class online configurable
Author: gauravm
Summary: Making the number of large/small compaction threads and the properties in the CompactionConfiguration class online-configurable. Also, a minor change, making the configuration manager object in HRegionServer public static.
Test Plan:
1. Tried changing the configs on the dev cluster
2. Unit Tests
3. mvn test -Dtest=TestRegionServerOnlineConfigChange
Also generated coverage reports for this diff using the IntelliJ coverage tool, and the included unit test covers all the new code added.
Reviewers: liyintang, rshroff, manukranthk, aaiyer, shaneh
Reviewed By: liyintang
CC: hbase-eng@
Differential Revision: https://phabricator.fb.com/D847034
Task ID: 2258346
Added:
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/conf/ConfigurationManager.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionConfiguration.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionManager.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1496587&r1=1496586&r2=1496587&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java Tue Jun 25 18:51:54 2013
@@ -220,6 +220,30 @@ public final class HConstants {
/** Default compaction manager class name. */
public static final String DEFAULT_COMPACTION_MANAGER_CLASS = CompactionManager.class.getName();
+ /** Parameter name for the number of large compaction threads */
+ public static final String LARGE_COMPACTION_THREADS =
+ "hbase.regionserver.thread.compaction.large";
+
+ /** Default number of large compaction threads */
+ public static final int DEFAULT_LARGE_COMPACTION_THREADS = 1;
+
+ /** Parameter name for the number of large compaction threads */
+ public static final String SMALL_COMPACTION_THREADS =
+ "hbase.regionserver.thread.compaction.small";
+
+ /** Default number of small compaction threads */
+ public static final int DEFAULT_SMALL_COMPACTION_THREADS = 1;
+
+ /** Prefix for Compaction related configurations in Store */
+ public static final String HSTORE_COMPACTION_PREFIX =
+ "hbase.hstore.compaction.";
+
+ /** Parameter name for the number of split threads */
+ public static final String SPLIT_THREADS = "hbase.regionserver.thread.split";
+
+ /** Default number of split threads */
+ public static final int DEFAULT_SPLIT_THREADS = 1;
+
/** Parameter name for what master implementation to use. */
public static final String MASTER_IMPL = "hbase.master.impl";
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/conf/ConfigurationManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/conf/ConfigurationManager.java?rev=1496587&r1=1496586&r2=1496587&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/conf/ConfigurationManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/conf/ConfigurationManager.java Tue Jun 25 18:51:54 2013
@@ -29,8 +29,47 @@ import java.util.Set;
import java.util.WeakHashMap;
/**
- * Maintains a set of all the classes which are would like to get notified
- * when the Configuration is reloaded from disk.
+ * Maintains the set of all the classes which would like to get notified
+ * when the Configuration is reloaded from the disk in the Online Configuration
+ * Change mechanism, which lets you update certain configuration properties
+ * on-the-fly, without having to restart the cluster.
+ *
+ * If a class has configuration properties which you would like to be able to
+ * change on-the-fly, do the following:
+ * 1. Implement the {@link ConfigurationObserver} interface. This would require
+ * you to implement the
+ * {@link ConfigurationObserver#notifyOnChange(Configuration)}
+ * method. This is a callback that is used to notify your class' instance
+ * that the configuration has changed. In this method, you need to check
+ * if the new values for the properties that are of interest to your class
+ * are different from the cached values. If yes, update them.
+ *
+ * However, be careful with this. Certain properties might be trivially
+ * mutable online, but others might not. Two properties might be trivially
+ * mutable by themselves, but not when changed together. For example, if a
+ * method uses properties "a" and "b" to make some decision, and is running
+ * in parallel when the notifyOnChange() method updates "a", but hasn't
+ * yet updated "b", it might make a decision on the basis of a new value of
+ * "a", and an old value of "b". This might introduce subtle bugs. This
+ * needs to be dealt on a case-by-case basis, and this class does not provide
+ * any protection from such cases.
+ *
+ * 2. Register the appropriate instance of the class with the
+ * {@link ConfigurationManager} instance, using the
+ * {@link ConfigurationManager#registerObserver(ConfigurationObserver)}
+ * method. For the RS side of things, the ConfigurationManager is a static
+ * member of the {@link org.apache.hadoop.hbase.regionserver.HRegionServer}
+ * class. Be careful not to do this in the constructor, as you might cause
+ * the 'this' reference to escape. Use a factory method, or an initialize()
+ * method which is called after the construction of the object.
+ *
+ * 3. Deregister the instance using the
+ * {@link ConfigurationManager#deregisterObserver(ConfigurationObserver)}
+ * method when it is going out of scope. In case you are not able to do that
+ * for any reason, it is still okay, since entries for dead observers are
+ * automatically collected during GC. But nonetheless, it is still a good
+ * practice to deregister your observer, whenever possible.
+ *
*/
public class ConfigurationManager {
public static final Log LOG = LogFactory.getLog(ConfigurationManager.class);
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java?rev=1496587&r1=1496586&r2=1496587&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java Tue Jun 25 18:51:54 2013
@@ -30,15 +30,17 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import com.google.common.base.Preconditions;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.conf.ConfigurationObserver;
/**
* Compact region on request and then run split if appropriate
*/
-public class CompactSplitThread {
+public class CompactSplitThread implements ConfigurationObserver {
static final Log LOG = LogFactory.getLog(CompactSplitThread.class);
private final HRegionServer server;
- private final Configuration conf;
+ private Configuration conf;
private final ThreadPoolExecutor largeCompactions;
private final ThreadPoolExecutor smallCompactions;
@@ -58,10 +60,12 @@ public class CompactSplitThread {
Preconditions.checkArgument(this.server != null && this.conf != null);
int largeThreads = Math.max(1, conf.getInt(
- "hbase.regionserver.thread.compaction.large", 1));
- int smallThreads = conf.getInt(
- "hbase.regionserver.thread.compaction.small", 1);
- int splitThreads = conf.getInt("hbase.regionserver.thread.split", 1);
+ HConstants.LARGE_COMPACTION_THREADS,
+ HConstants.DEFAULT_LARGE_COMPACTION_THREADS));
+ int smallThreads = conf.getInt(HConstants.SMALL_COMPACTION_THREADS,
+ HConstants.DEFAULT_SMALL_COMPACTION_THREADS);
+ int splitThreads = conf.getInt(HConstants.SPLIT_THREADS,
+ HConstants.DEFAULT_SPLIT_THREADS);
Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);
@@ -208,4 +212,55 @@ public class CompactSplitThread {
size += smallCompactions.getQueue().size();
return size;
}
+
+ @Override
+ public void notifyOnChange(Configuration newConf) {
+ // Check if number of large / small compaction threads has changed, and then
+ // adjust the core pool size of the thread pools, by using the
+ // setCorePoolSize() method. According to the javadocs, it is safe to
+ // change the core pool size on-the-fly. We need to reset the maximum
+ // pool size, as well.
+ int largeThreads = Math.max(1, newConf.getInt(
+ HConstants.LARGE_COMPACTION_THREADS,
+ HConstants.DEFAULT_LARGE_COMPACTION_THREADS));
+ if (this.largeCompactions.getCorePoolSize() != largeThreads) {
+ LOG.info("Changing the value of " + HConstants.LARGE_COMPACTION_THREADS +
+ " from " + this.largeCompactions.getCorePoolSize() + " to " +
+ largeThreads);
+ this.largeCompactions.setMaximumPoolSize(largeThreads);
+ this.largeCompactions.setCorePoolSize(largeThreads);
+ }
+
+ int smallThreads = newConf.getInt(HConstants.SMALL_COMPACTION_THREADS,
+ HConstants.DEFAULT_SMALL_COMPACTION_THREADS);
+ if (this.smallCompactions.getCorePoolSize() != smallThreads) {
+ LOG.info("Changing the value of " + HConstants.SMALL_COMPACTION_THREADS +
+ " from " + this.smallCompactions.getCorePoolSize() + " to " +
+ smallThreads);
+ this.smallCompactions.setMaximumPoolSize(smallThreads);
+ this.smallCompactions.setCorePoolSize(smallThreads);
+ }
+
+ this.conf = newConf;
+ }
+
+ /**
+ * Helper method for tests to check if the number of small compaction threads
+ * change on-the-fly.
+ *
+ * @return
+ */
+ protected int getSmallCompactionThreadNum() {
+ return this.smallCompactions.getCorePoolSize();
+ }
+
+ /**
+ * Helper method for tests to check if the number of large compaction threads
+ * change on-the-fly.
+ *
+ * @return
+ */
+ protected int getLargeCompactionThreadNum() {
+ return this.largeCompactions.getCorePoolSize();
+ }
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionConfiguration.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionConfiguration.java?rev=1496587&r1=1496586&r2=1496587&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionConfiguration.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionConfiguration.java Tue Jun 25 18:51:54 2013
@@ -47,43 +47,71 @@ public class CompactionConfiguration {
Configuration conf;
Store store;
- long maxCompactSize;
- long minCompactSize;
- boolean shouldExcludeBulk;
- int minFilesToCompact;
- int maxFilesToCompact;
- double compactionRatio;
- double offPeekCompactionRatio;
- int offPeakStartHour;
- int offPeakEndHour;
- long throttlePoint;
- boolean shouldDeleteExpired;
- long majorCompactionPeriod;
- float majorCompactionJitter;
+ /** Since all these properties can change online, they are volatile **/
+ volatile long maxCompactSize;
+ volatile long minCompactSize;
+ volatile boolean shouldExcludeBulk;
+ volatile int minFilesToCompact;
+ volatile int maxFilesToCompact;
+ volatile double compactionRatio;
+ volatile double offPeakCompactionRatio;
+ volatile int offPeakStartHour;
+ volatile int offPeakEndHour;
+ volatile long throttlePoint;
+ volatile boolean shouldDeleteExpired;
+ volatile long majorCompactionPeriod;
+ volatile float majorCompactionJitter;
+
+ /** Default values for the properties **/
+ static final long defaultMaxCompactSize = Long.MAX_VALUE;
+ static final boolean defaultShouldExcludeBulk = false;
+ static final int defaultMaxFilesToCompact = 10;
+ static final float defaultCompactionRatio = 1.2F;
+ static final float defaultOffPeakCompactionRatio = 5.0F;
+ static final int defaultOffPeakStartHour = -1;
+ static final int defaultOffPeakEndHour = -1;
+ static final boolean defaultShouldDeleteExpired = true;
+ static final long defaultMajorCompactionPeriod = 1000*60*60*24;
+ static final float defaultMajorCompactionJitter = 0.20F;
CompactionConfiguration(Configuration conf, Store store) {
this.conf = conf;
this.store = store;
- String strPrefix = "hbase.hstore.compaction.";
+ String strPrefix = HConstants.HSTORE_COMPACTION_PREFIX;
- maxCompactSize = conf.getLong(strPrefix + "max.size", Long.MAX_VALUE);
- minCompactSize = conf.getLong(strPrefix + "min.size", store.getHRegion().memstoreFlushSize);
- shouldExcludeBulk = conf.getBoolean(strPrefix + "exclude.bulk", false);
+ maxCompactSize = conf.getLong(strPrefix + "max.size", defaultMaxCompactSize);
+ minCompactSize = conf.getLong(strPrefix + "min.size",
+ store.getHRegion().memstoreFlushSize);
+ shouldExcludeBulk = conf.getBoolean(strPrefix + "exclude.bulk",
+ defaultShouldExcludeBulk);
minFilesToCompact = Math.max(2, conf.getInt(strPrefix + "min",
- /*old name*/ conf.getInt("hbase.hstore.compactionThreshold", HConstants.DEFAULT_MIN_FILES_TO_COMPACT)));
- maxFilesToCompact = conf.getInt(strPrefix + "max", 10);
- compactionRatio = conf.getFloat(strPrefix + "ratio", 1.2F);
- offPeekCompactionRatio = conf.getFloat(strPrefix + "ratio.offpeak", 5.0F);
-
- offPeakStartHour = conf.getInt("hbase.offpeak.start.hour", -1);
- offPeakEndHour = conf.getInt("hbase.offpeak.end.hour", -1);
-
- throttlePoint = conf.getLong("hbase.regionserver.thread.compaction.throttle",
- 2 * maxFilesToCompact * store.getHRegion().memstoreFlushSize);
- shouldDeleteExpired = conf.getBoolean("hbase.store.delete.expired.storefile", true);
- majorCompactionPeriod = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24);
- majorCompactionJitter = conf.getFloat("hbase.hregion.majorcompaction.jitter", 0.20F);
+ /*old name*/ conf.getInt("hbase.hstore.compactionThreshold",
+ HConstants.DEFAULT_MIN_FILES_TO_COMPACT)));
+ maxFilesToCompact = conf.getInt(strPrefix + "max",
+ defaultMaxFilesToCompact);
+ compactionRatio = conf.getFloat(strPrefix + "ratio",
+ defaultCompactionRatio);
+ offPeakCompactionRatio = conf.getFloat(strPrefix + "ratio.offpeak",
+ defaultOffPeakCompactionRatio);
+
+ offPeakStartHour = conf.getInt("hbase.offpeak.start.hour",
+ defaultOffPeakStartHour);
+ offPeakEndHour = conf.getInt("hbase.offpeak.end.hour",
+ defaultOffPeakEndHour);
+
+ throttlePoint =
+ conf.getLong("hbase.regionserver.thread.compaction.throttle",
+ 2 * maxFilesToCompact * store.getHRegion().memstoreFlushSize);
+ shouldDeleteExpired =
+ conf.getBoolean("hbase.store.delete.expired.storefile",
+ defaultShouldDeleteExpired);
+ majorCompactionPeriod =
+ conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD,
+ defaultMajorCompactionPeriod);
+ majorCompactionJitter =
+ conf.getFloat("hbase.hregion.majorcompaction.jitter",
+ defaultMajorCompactionJitter);
}
/**
@@ -132,7 +160,7 @@ public class CompactionConfiguration {
* @return Off peak Ratio used for compaction
*/
double getCompactionRatioOffPeak() {
- return offPeekCompactionRatio;
+ return offPeakCompactionRatio;
}
/**
@@ -179,4 +207,148 @@ public class CompactionConfiguration {
return shouldDeleteExpired;
}
+ /**
+ * Update the compaction configuration, when an online change is made.
+ *
+ * @param newConf
+ */
+ protected void updateConfiguration(Configuration newConf) {
+ String strPrefix = HConstants.HSTORE_COMPACTION_PREFIX;
+
+ // Check if the compaction ratio has changed.
+ String compactionRatioStr = strPrefix + "ratio";
+ double newCompactionRatio = newConf.getFloat(compactionRatioStr,
+ defaultCompactionRatio);
+ if (newCompactionRatio != this.compactionRatio) {
+ LOG.info("Changing the value of " + compactionRatioStr + " from " +
+ this.compactionRatio + " to " + newCompactionRatio);
+ this.compactionRatio = newCompactionRatio;
+ }
+
+ // Check if the off peak compaction ratio has changed.
+ String offPeakCompactionRatioStr = strPrefix + "ratio.offpeak";
+ double newOffPeakCompactionRatio =
+ newConf.getFloat(offPeakCompactionRatioStr,
+ defaultOffPeakCompactionRatio);
+ if (newOffPeakCompactionRatio != this.offPeakCompactionRatio) {
+ LOG.info("Changing the value of " + offPeakCompactionRatioStr + " from " +
+ this.offPeakCompactionRatio + " to " + newOffPeakCompactionRatio);
+ this.offPeakCompactionRatio = newOffPeakCompactionRatio;
+ }
+
+ // Check if the throttle point has changed.
+ String throttlePointStr = "hbase.regionserver.thread.compaction.throttle";
+ long newThrottlePoint = newConf.getLong(throttlePointStr,
+ 2 * maxFilesToCompact * store.getHRegion().memstoreFlushSize);
+ if (newThrottlePoint != this.throttlePoint) {
+ LOG.info("Changing the value of " + throttlePointStr + " from " +
+ this.throttlePoint + " to " + newThrottlePoint);
+ this.throttlePoint = newThrottlePoint;
+ }
+
+ // Check if the minFilesToCompact has changed.
+ String minFilesToCompactStr = strPrefix + "min";
+ int newMinFilesToCompact = Math.max(2, newConf.getInt(minFilesToCompactStr,
+ /*old name*/ newConf.getInt("hbase.hstore.compactionThreshold",
+ HConstants.DEFAULT_MIN_FILES_TO_COMPACT)));
+ if (newMinFilesToCompact != this.minFilesToCompact) {
+ LOG.info("Changing the value of " + minFilesToCompactStr + " from " +
+ this.minFilesToCompact + " to " + newMinFilesToCompact);
+ this.minFilesToCompact = newMinFilesToCompact;
+ }
+
+ // Check if the maxFile to compact has changed.
+ String maxFilesToCompactStr = strPrefix + "max";
+ int newMaxFilesToCompact = newConf.getInt(maxFilesToCompactStr,
+ defaultMaxFilesToCompact);
+ if (newMaxFilesToCompact != this.maxFilesToCompact) {
+ LOG.info("Changing the value of " + maxFilesToCompactStr + " from " +
+ this.maxFilesToCompact + " to " + newMaxFilesToCompact);
+ this.maxFilesToCompact = newMaxFilesToCompact;
+ }
+
+ // Check if the Off Peak Start Hour has changed.
+ String offPeakStartHourStr = "hbase.offpeak.start.hour";
+ int newOffPeakStartHour = newConf.getInt(offPeakStartHourStr,
+ defaultOffPeakStartHour);
+ if (newOffPeakStartHour != this.offPeakStartHour) {
+ LOG.info("Changing the value of " + offPeakStartHourStr + " from " +
+ this.offPeakStartHour + " to " + newOffPeakStartHour);
+ this.offPeakStartHour = newOffPeakStartHour;
+ }
+
+ // Check if the Off Peak End Hour has changed.
+ String offPeakEndHourStr = "hbase.offpeak.end.hour";
+ int newOffPeakEndHour = newConf.getInt(offPeakEndHourStr,
+ defaultOffPeakEndHour);
+ if (newOffPeakEndHour != this.offPeakEndHour) {
+ LOG.info("Changing the value of " + offPeakEndHourStr + " from " +
+ this.offPeakEndHour + " to " + newOffPeakEndHour);
+ this.offPeakEndHour = newOffPeakEndHour;
+ }
+
+ // Check if the Min Compaction Size has changed
+ String minCompactSizeStr = strPrefix + "min.size";
+ long newMinCompactSize = newConf.getLong(minCompactSizeStr,
+ store.getHRegion().memstoreFlushSize);
+ if (newMinCompactSize != this.minCompactSize) {
+ LOG.info("Changing the value of " + minCompactSizeStr + " from " +
+ this.minCompactSize + " to " + newMinCompactSize);
+ this.minCompactSize = newMinCompactSize;
+ }
+
+ // Check if the Max Compaction Size has changed.
+ String maxCompactSizeStr = strPrefix + "max.size";
+ long newMaxCompactSize = newConf.getLong(maxCompactSizeStr,
+ defaultMaxCompactSize);
+ if (newMaxCompactSize != this.maxCompactSize) {
+ LOG.info("Changing the value of " + maxCompactSizeStr + " from " +
+ this.maxCompactSize + " to " + newMaxCompactSize);
+ this.maxCompactSize = newMaxCompactSize;
+ }
+
+ // Check if shouldExcludeBulk has changed.
+ String shouldExcludeBulkStr = strPrefix + "exclude.bulk";
+ boolean newShouldExcludeBulk = newConf.getBoolean(shouldExcludeBulkStr,
+ defaultShouldExcludeBulk);
+ if (newShouldExcludeBulk != this.shouldExcludeBulk) {
+ LOG.info("Changing the value of " + shouldExcludeBulkStr + " from " +
+ this.shouldExcludeBulk + " to " + newShouldExcludeBulk);
+ this.shouldExcludeBulk = newShouldExcludeBulk;
+ }
+
+ // Check if shouldDeleteExpired has changed.
+ String shouldDeleteExpiredStr = "hbase.store.delete.expired.storefile";
+ boolean newShouldDeleteExpired =
+ newConf.getBoolean(shouldDeleteExpiredStr,
+ defaultShouldDeleteExpired);
+ if (newShouldDeleteExpired != this.shouldDeleteExpired) {
+ LOG.info("Changing the value of " + shouldDeleteExpiredStr + " from " +
+ this.shouldDeleteExpired + " to " + newShouldDeleteExpired);
+ this.shouldDeleteExpired = newShouldDeleteExpired;
+ }
+
+ // Check if majorCompactionPeriod has changed.
+ long newMajorCompactionPeriod =
+ newConf.getLong(HConstants.MAJOR_COMPACTION_PERIOD,
+ defaultMajorCompactionPeriod);
+ if (newMajorCompactionPeriod != this.majorCompactionPeriod) {
+ LOG.info("Changing the value of " + HConstants.MAJOR_COMPACTION_PERIOD +
+ " from " + this.majorCompactionPeriod + " to " +
+ newMajorCompactionPeriod);
+ this.majorCompactionPeriod = newMajorCompactionPeriod;
+ }
+
+ // Check if majorCompactionJitter has changed.
+ String majorCompactionJitterStr = "hbase.hregion.majorcompaction.jitter";
+ float newMajorCompactionJitter =
+ newConf.getFloat(majorCompactionJitterStr,
+ defaultMajorCompactionJitter);
+ if (newMajorCompactionJitter != this.majorCompactionJitter) {
+ LOG.info("Changing the value of " + majorCompactionJitterStr + " from " +
+ this.majorCompactionJitter + " to " + newMajorCompactionJitter);
+ this.majorCompactionJitter = newMajorCompactionJitter;
+ }
+ this.conf = newConf;
+ }
}
\ No newline at end of file
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionManager.java?rev=1496587&r1=1496586&r2=1496587&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionManager.java Tue Jun 25 18:51:54 2013
@@ -50,6 +50,14 @@ public class CompactionManager {
}
/**
+ * Update the configuration when it changes on-the-fly.
+ * @param conf
+ */
+ protected void updateConfiguration(Configuration conf) {
+ comConf.updateConfiguration(conf);
+ }
+
+ /**
* @param candidateFiles candidate files, ordered from oldest to newest
* @return subset copy of candidate list that meets compaction criteria
* @throws java.io.IOException
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1496587&r1=1496586&r2=1496587&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Jun 25 18:51:54 2013
@@ -860,7 +860,10 @@ public class HRegion implements HeapSize
completionService
.submit(new Callable<ImmutableList<StoreFile>>() {
public ImmutableList<StoreFile> call() throws IOException {
- return store.close();
+ ImmutableList<StoreFile> result = store.close();
+ HRegionServer.configurationManager.
+ deregisterObserver(store);
+ return result;
}
});
}
@@ -2674,7 +2677,10 @@ public class HRegion implements HeapSize
protected Store instantiateHStore(Path tableDir, HColumnDescriptor c)
throws IOException {
- return new Store(tableDir, this, c, this.fs, this.conf);
+ Store store = new Store(tableDir, this, c, this.fs, this.conf);
+ // Register this store with the configuration manager.
+ HRegionServer.configurationManager.registerObserver(store);
+ return store;
}
/**
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1496587&r1=1496586&r2=1496587&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Jun 25 18:51:54 2013
@@ -377,7 +377,8 @@ public class HRegionServer implements HR
// This object lets classes register themselves to get notified on
// Configuration changes.
- public ConfigurationManager configurationManager;
+ public static final ConfigurationManager configurationManager =
+ new ConfigurationManager();
public static long getResponseSizeLimit() {
return responseSizeLimit;
@@ -411,7 +412,6 @@ public class HRegionServer implements HR
this.abortRequested = false;
this.fsOk = true;
this.conf = conf;
- this.configurationManager = new ConfigurationManager();
this.connection = ServerConnectionManager.getConnection(conf);
this.isOnline = false;
@@ -558,6 +558,8 @@ public class HRegionServer implements HR
// Compaction thread
this.compactSplitThread = new CompactSplitThread(this);
+ // Registering the compactSplitThread object with the ConfigurationManager.
+ configurationManager.registerObserver(this.compactSplitThread);
// Log rolling thread
int hlogCntPerServer = this.conf.getInt(HConstants.HLOG_CNT_PER_SERVER, 2);
@@ -1465,14 +1467,6 @@ public class HRegionServer implements HR
return isOnline;
}
- /**
- * Return the ConfigurationManager object.
- * @return
- */
- public ConfigurationManager getConfigurationManager() {
- return this.configurationManager;
- }
-
private void setupHLog(Path logDir, Path oldLogDir, int totalHLogCnt) throws IOException {
hlogs = new HLog[totalHLogCnt];
for (int i = 0; i < totalHLogCnt; i++) {
@@ -3829,12 +3823,9 @@ public class HRegionServer implements HR
*/
public void updateConfiguration() {
LOG.info("Reloading the configuration from disk.");
+ // Reload the configuration from disk.
conf.reloadConfiguration();
- // TODO @gauravm
- // Move this to the notifyOnChange() method in HRegionServer
- for (HRegion r : onlineRegions.values()) {
- r.updateConfiguration();
- }
+
// Notify all the observers that the configuration has changed.
configurationManager.notifyAllObservers(conf);
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1496587&r1=1496586&r2=1496587&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Tue Jun 25 18:51:54 2013
@@ -44,6 +44,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.io.WriteOptions;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -101,7 +102,8 @@ import com.google.common.collect.Lists;
* <p>Locking and transactions are handled at a higher level. This API should
* not be called directly but by an HRegion manager.
*/
-public class Store extends SchemaConfigured implements HeapSize {
+public class Store extends SchemaConfigured implements HeapSize,
+ ConfigurationObserver {
static final Log LOG = LogFactory.getLog(Store.class);
protected final MemStore memstore;
// This stores directory in the filesystem.
@@ -110,7 +112,7 @@ public class Store extends SchemaConfigu
private final HColumnDescriptor family;
CompactionManager compactionManager;
final FileSystem fs;
- final Configuration conf;
+ Configuration conf;
final CacheConfig cacheConf;
// ttl in milliseconds.
protected long ttl;
@@ -1295,8 +1297,8 @@ public class Store extends SchemaConfigu
// we have to use a do/while loop.
ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
boolean hasMore;
- // Create the writer whether or not there are output KVs,
- // iff the maxSequenceID among the compaction candidates is
+ // Create the writer whether or not there are output KVs,
+ // iff the maxSequenceID among the compaction candidates is
// equal to the maxSequenceID among all the on-disk hfiles. [HBASE-7267]
if (maxCompactingSequcenceId == this.getMaxSequenceId(true)) {
writer = createWriterInTmp(maxKeyCount, compression, true);
@@ -1328,7 +1330,7 @@ public class Store extends SchemaConfigu
bytesWritten += kv.getLength();
if (bytesWritten > Store.closeCheckInterval) {
getSchemaMetrics().updatePersistentStoreMetric(
- SchemaMetrics.StoreMetricType.COMPACTION_WRITE_SIZE,
+ SchemaMetrics.StoreMetricType.COMPACTION_WRITE_SIZE,
bytesWritten);
bytesWritten = 0;
if (!this.region.areWritesEnabled()) {
@@ -2004,4 +2006,12 @@ public class Store extends SchemaConfigu
}
}
+ @Override
+ public void notifyOnChange(Configuration conf) {
+ this.conf = new CompoundConfiguration()
+ .add(conf)
+ .add(family.getValues());
+
+ compactionManager.updateConfiguration(conf);
+ }
}
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java?rev=1496587&r1=1496586&r2=1496587&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java Tue Jun 25 18:51:54 2013
@@ -911,6 +911,19 @@ public class HBaseTestingUtility {
}
/**
+ * Get the RegionServer which hosts a region with the given region name.
+ * @param regionName
+ * @return
+ */
+ public HRegionServer getRSWithRegion(byte[] regionName) {
+ int index = hbaseCluster.getServerWith(regionName);
+ if (index == -1) {
+ return null;
+ }
+ return hbaseCluster.getRegionServerThreads().get(index).getRegionServer();
+ }
+
+ /**
* Starts a <code>MiniMRCluster</code> with a default number of
* <code>TaskTracker</code>'s.
*
Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java?rev=1496587&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java Tue Jun 25 18:51:54 2013
@@ -0,0 +1,220 @@
+/**
+ * Copyright 2013 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+
+/**
+ * Verify that the Online Config Changes on the HRegionServer side are actually
+ * happening. We should add tests for important configurations which will be
+ * changed online.
+ */
+public class TestRegionServerOnlineConfigChange extends TestCase {
+ static final Log LOG =
+ LogFactory.getLog(TestRegionServerOnlineConfigChange.class.getName());
+ HBaseTestingUtility hbaseTestingUtility = new HBaseTestingUtility();
+ Configuration conf = null;
+
+ HTable t1 = null;
+ HRegionServer rs1 = null;
+ byte[] r1name = null;
+ HRegion r1 = null;
+
+ final String table1Str = "table1";
+ final String columnFamily1Str = "columnFamily1";
+ final byte[] TABLE1 = Bytes.toBytes(table1Str);
+ final byte[] COLUMN_FAMILY1 = Bytes.toBytes(columnFamily1Str);
+
+
+ @Override
+ public void setUp() throws Exception {
+ conf = hbaseTestingUtility.getConfiguration();
+ hbaseTestingUtility.startMiniCluster(1,1);
+ t1 = hbaseTestingUtility.createTable(TABLE1, COLUMN_FAMILY1);
+ HRegionInfo firstHRI = t1.getRegionsInfo().keySet().iterator().next();
+ r1name = firstHRI.getRegionName();
+ rs1 = hbaseTestingUtility.getRSWithRegion(r1name);
+ r1 = rs1.getRegion(r1name);
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ hbaseTestingUtility.shutdownMiniCluster();
+ }
+
+ /**
+ * Check if the number of compaction threads changes online
+ * @throws IOException
+ */
+ public void testNumCompactionThreadsOnlineChange() throws IOException {
+ assertTrue(rs1.compactSplitThread != null);
+ int newNumSmallThreads =
+ rs1.compactSplitThread.getSmallCompactionThreadNum() + 1;
+ int newNumLargeThreads =
+ rs1.compactSplitThread.getLargeCompactionThreadNum() + 1;
+
+ conf.setInt("hbase.regionserver.thread.compaction.small",
+ newNumSmallThreads);
+ conf.setInt("hbase.regionserver.thread.compaction.large",
+ newNumLargeThreads);
+ HRegionServer.configurationManager.notifyAllObservers(conf);
+
+ assertEquals(newNumSmallThreads,
+ rs1.compactSplitThread.getSmallCompactionThreadNum());
+ assertEquals(newNumLargeThreads,
+ rs1.compactSplitThread.getLargeCompactionThreadNum());
+ }
+
+ /**
+ * Test that the configurations in the CompactionConfiguration class change
+ * properly.
+ *
+ * @throws IOException
+ */
+ public void testCompactionConfigurationOnlineChange() throws IOException {
+ String strPrefix = HConstants.HSTORE_COMPACTION_PREFIX;
+ Store s = r1.getStore(COLUMN_FAMILY1);
+
+ // Set the new compaction ratio to a different value.
+ double newCompactionRatio =
+ s.compactionManager.comConf.getCompactionRatio() + 0.1;
+ conf.setFloat(strPrefix + "ratio", (float)newCompactionRatio);
+
+ // Notify all the observers, which includes the Store object.
+ HRegionServer.configurationManager.notifyAllObservers(conf);
+
+ // Check if the compaction ratio got updated in the Compaction Configuration
+ assertEquals(newCompactionRatio,
+ s.compactionManager.comConf.getCompactionRatio(),
+ 0.00001);
+
+ // Check if the off peak compaction ratio gets updated.
+ double newOffPeakCompactionRatio =
+ s.compactionManager.comConf.getCompactionRatioOffPeak() + 0.1;
+ conf.setFloat(strPrefix + "ratio.offpeak",
+ (float)newOffPeakCompactionRatio);
+ HRegionServer.configurationManager.notifyAllObservers(conf);
+ assertEquals(newOffPeakCompactionRatio,
+ s.compactionManager.comConf.getCompactionRatioOffPeak(),
+ 0.00001);
+
+ // Check if the throttle point gets updated.
+ long newThrottlePoint = s.compactionManager.comConf.getThrottlePoint() + 10;
+ conf.setLong("hbase.regionserver.thread.compaction.throttle",
+ newThrottlePoint);
+ HRegionServer.configurationManager.notifyAllObservers(conf);
+ assertEquals(newThrottlePoint,
+ s.compactionManager.comConf.getThrottlePoint());
+
+ // Check if the minFilesToCompact gets updated.
+ int newMinFilesToCompact =
+ s.compactionManager.comConf.getMinFilesToCompact() + 1;
+ conf.setLong(strPrefix + "min", newMinFilesToCompact);
+ HRegionServer.configurationManager.notifyAllObservers(conf);
+ assertEquals(newMinFilesToCompact,
+ s.compactionManager.comConf.getMinFilesToCompact());
+
+ // Check if the maxFilesToCompact gets updated.
+ int newMaxFilesToCompact =
+ s.compactionManager.comConf.getMaxFilesToCompact() + 1;
+ conf.setLong(strPrefix + "max", newMaxFilesToCompact);
+ HRegionServer.configurationManager.notifyAllObservers(conf);
+ assertEquals(newMaxFilesToCompact,
+ s.compactionManager.comConf.getMaxFilesToCompact());
+
+ // Check if the Off peak start hour gets updated.
+ int newOffPeakStartHour =
+ (s.compactionManager.comConf.getOffPeakStartHour() + 1) % 24;
+ conf.setLong("hbase.offpeak.start.hour", newOffPeakStartHour);
+ HRegionServer.configurationManager.notifyAllObservers(conf);
+ assertEquals(newOffPeakStartHour,
+ s.compactionManager.comConf.getOffPeakStartHour());
+
+ // Check if the Off peak end hour gets updated.
+ int newOffPeakEndHour =
+ (s.compactionManager.comConf.getOffPeakEndHour() + 1) % 24;
+ conf.setLong("hbase.offpeak.end.hour", newOffPeakEndHour);
+ HRegionServer.configurationManager.notifyAllObservers(conf);
+ assertEquals(newOffPeakEndHour,
+ s.compactionManager.comConf.getOffPeakEndHour());
+
+ // Check if the minCompactSize gets updated.
+ long newMinCompactSize =
+ s.compactionManager.comConf.getMinCompactSize() + 1;
+ conf.setLong(strPrefix + "min.size", newMinCompactSize);
+ HRegionServer.configurationManager.notifyAllObservers(conf);
+ assertEquals(newMinCompactSize,
+ s.compactionManager.comConf.getMinCompactSize());
+
+ // Check if the maxCompactSize gets updated.
+ long newMaxCompactSize =
+ s.compactionManager.comConf.getMaxCompactSize() - 1;
+ conf.setLong(strPrefix + "max.size", newMaxCompactSize);
+ HRegionServer.configurationManager.notifyAllObservers(conf);
+ assertEquals(newMaxCompactSize,
+ s.compactionManager.comConf.getMaxCompactSize());
+
+ // Check if shouldExcludeBulk gets updated.
+ boolean newShouldExcludeBulk =
+ !s.compactionManager.comConf.shouldExcludeBulk();
+ conf.setBoolean(strPrefix + "exclude.bulk", newShouldExcludeBulk);
+ HRegionServer.configurationManager.notifyAllObservers(conf);
+ assertEquals(newShouldExcludeBulk,
+ s.compactionManager.comConf.shouldExcludeBulk());
+
+ // Check if shouldDeleteExpired gets updated.
+ boolean newShouldDeleteExpired =
+ !s.compactionManager.comConf.shouldDeleteExpired();
+ conf.setBoolean("hbase.store.delete.expired.storefile",
+ newShouldDeleteExpired);
+ HRegionServer.configurationManager.notifyAllObservers(conf);
+ assertEquals(newShouldDeleteExpired,
+ s.compactionManager.comConf.shouldDeleteExpired());
+
+ // Check if majorCompactionPeriod gets updated.
+ long newMajorCompactionPeriod =
+ s.compactionManager.comConf.getMajorCompactionPeriod() + 10;
+ conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, newMajorCompactionPeriod);
+ HRegionServer.configurationManager.notifyAllObservers(conf);
+ assertEquals(newMajorCompactionPeriod,
+ s.compactionManager.comConf.getMajorCompactionPeriod());
+
+ // Check if majorCompactionJitter gets updated.
+ float newMajorCompactionJitter =
+ s.compactionManager.comConf.getMajorCompactionJitter() + 0.02F;
+ conf.setFloat("hbase.hregion.majorcompaction.jitter",
+ newMajorCompactionJitter);
+ HRegionServer.configurationManager.notifyAllObservers(conf);
+ assertEquals(newMajorCompactionJitter,
+ s.compactionManager.comConf.getMajorCompactionJitter(), 0.00001);
+ }
+}
+