You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2013/06/25 20:51:54 UTC

svn commit: r1496587 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/conf/ main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/ test/java/org/apache/hadoop/hbase/regio...

Author: liyin
Date: Tue Jun 25 18:51:54 2013
New Revision: 1496587

URL: http://svn.apache.org/r1496587
Log:
[HBASE-8805] [89-fb] Making number of compaction threads and the CompactionConfiguration class online configurable

Author: gauravm

Summary: Making the number of large/small compaction threads and the properties in the CompactionConfiguration class  online-configurable. Also, a minor change, making the configuration manager object in HRegionServer public static.

Test Plan:
1. Tried changing the configs on the dev cluster
2. Unit Tests
3. mvn test -Dtest=TestRegionServerOnlineConfigChange

Also generated coverage reports for this diff using the IntelliJ coverage tool, and the included unit test covers all the new code added.

Reviewers: liyintang, rshroff, manukranthk, aaiyer, shaneh

Reviewed By: liyintang

CC: hbase-eng@

Differential Revision: https://phabricator.fb.com/D847034

Task ID: 2258346

Added:
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java
Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/conf/ConfigurationManager.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionConfiguration.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionManager.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1496587&r1=1496586&r2=1496587&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java Tue Jun 25 18:51:54 2013
@@ -220,6 +220,30 @@ public final class HConstants {
   /** Default compaction manager class name. */
   public static final String DEFAULT_COMPACTION_MANAGER_CLASS = CompactionManager.class.getName();
 
+  /** Parameter name for the number of large compaction threads */
+  public static final String LARGE_COMPACTION_THREADS =
+          "hbase.regionserver.thread.compaction.large";
+
+  /** Default number of large compaction threads */
+  public static final int DEFAULT_LARGE_COMPACTION_THREADS = 1;
+
+  /** Parameter name for the number of large compaction threads */
+  public static final String SMALL_COMPACTION_THREADS =
+          "hbase.regionserver.thread.compaction.small";
+
+  /** Default number of small compaction threads */
+  public static final int DEFAULT_SMALL_COMPACTION_THREADS = 1;
+
+  /** Prefix for Compaction related configurations in Store */
+  public static final String HSTORE_COMPACTION_PREFIX =
+          "hbase.hstore.compaction.";
+
+  /** Parameter name for the number of split threads */
+  public static final String SPLIT_THREADS = "hbase.regionserver.thread.split";
+
+  /** Default number of split threads */
+  public static final int DEFAULT_SPLIT_THREADS = 1;
+
   /** Parameter name for what master implementation to use. */
   public static final String MASTER_IMPL = "hbase.master.impl";
 

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/conf/ConfigurationManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/conf/ConfigurationManager.java?rev=1496587&r1=1496586&r2=1496587&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/conf/ConfigurationManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/conf/ConfigurationManager.java Tue Jun 25 18:51:54 2013
@@ -29,8 +29,47 @@ import java.util.Set;
 import java.util.WeakHashMap;
 
 /**
- * Maintains a set of all the classes which are would like to get notified
- * when the Configuration is reloaded from disk.
+ * Maintains the set of all the classes which would like to get notified
+ * when the Configuration is reloaded from the disk in the Online Configuration
+ * Change mechanism, which lets you update certain configuration properties
+ * on-the-fly, without having to restart the cluster.
+ *
+ * If a class has configuration properties which you would like to be able to
+ * change on-the-fly, do the following:
+ * 1. Implement the {@link ConfigurationObserver} interface. This would require
+ *    you to implement the
+ *    {@link ConfigurationObserver#notifyOnChange(Configuration)}
+ *    method.  This is a callback that is used to notify your class' instance
+ *    that the configuration has changed. In this method, you need to check
+ *    if the new values for the properties that are of interest to your class
+ *    are different from the cached values. If yes, update them.
+ *
+ *    However, be careful with this. Certain properties might be trivially
+ *    mutable online, but others might not. Two properties might be trivially
+ *    mutable by themselves, but not when changed together. For example, if a
+ *    method uses properties "a" and "b" to make some decision, and is running
+ *    in parallel when the notifyOnChange() method updates "a", but hasn't
+ *    yet updated "b", it might make a decision on the basis of a new value of
+ *    "a", and an old value of "b". This might introduce subtle bugs. This
+ *    needs to be dealt on a case-by-case basis, and this class does not provide
+ *    any protection from such cases.
+ *
+ * 2. Register the appropriate instance of the class with the
+ *    {@link ConfigurationManager} instance, using the
+ *    {@link ConfigurationManager#registerObserver(ConfigurationObserver)}
+ *    method. For the RS side of things, the ConfigurationManager is a static
+ *    member of the {@link org.apache.hadoop.hbase.regionserver.HRegionServer}
+ *    class. Be careful not to do this in the constructor, as you might cause
+ *    the 'this' reference to escape. Use a factory method, or an initialize()
+ *    method which is called after the construction of the object.
+ *
+ * 3. Deregister the instance using the
+ *    {@link ConfigurationManager#deregisterObserver(ConfigurationObserver)}
+ *    method when it is going out of scope. In case you are not able to do that
+ *    for any reason, it is still okay, since entries for dead observers are
+ *    automatically collected during GC. But nonetheless, it is still a good
+ *    practice to deregister your observer, whenever possible.
+ *
  */
 public class ConfigurationManager {
   public static final Log LOG = LogFactory.getLog(ConfigurationManager.class);

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java?rev=1496587&r1=1496586&r2=1496587&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java Tue Jun 25 18:51:54 2013
@@ -30,15 +30,17 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 
 /**
  * Compact region on request and then run split if appropriate
  */
-public class CompactSplitThread {
+public class CompactSplitThread implements ConfigurationObserver {
   static final Log LOG = LogFactory.getLog(CompactSplitThread.class);
 
   private final HRegionServer server;
-  private final Configuration conf;
+  private Configuration conf;
 
   private final ThreadPoolExecutor largeCompactions;
   private final ThreadPoolExecutor smallCompactions;
@@ -58,10 +60,12 @@ public class CompactSplitThread {
     Preconditions.checkArgument(this.server != null && this.conf != null);
 
     int largeThreads = Math.max(1, conf.getInt(
-        "hbase.regionserver.thread.compaction.large", 1));
-    int smallThreads = conf.getInt(
-        "hbase.regionserver.thread.compaction.small", 1);
-    int splitThreads = conf.getInt("hbase.regionserver.thread.split", 1);
+            HConstants.LARGE_COMPACTION_THREADS,
+            HConstants.DEFAULT_LARGE_COMPACTION_THREADS));
+    int smallThreads = conf.getInt(HConstants.SMALL_COMPACTION_THREADS,
+            HConstants.DEFAULT_SMALL_COMPACTION_THREADS);
+    int splitThreads = conf.getInt(HConstants.SPLIT_THREADS,
+            HConstants.DEFAULT_SPLIT_THREADS);
 
     Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);
 
@@ -208,4 +212,55 @@ public class CompactSplitThread {
       size += smallCompactions.getQueue().size();
     return size;
   }
+
+  @Override
+  public void notifyOnChange(Configuration newConf) {
+    // Check if number of large / small compaction threads has changed, and then
+    // adjust the core pool size of the thread pools, by using the
+    // setCorePoolSize() method. According to the javadocs, it is safe to
+    // change the core pool size on-the-fly. We need to reset the maximum
+    // pool size, as well.
+    int largeThreads = Math.max(1, newConf.getInt(
+            HConstants.LARGE_COMPACTION_THREADS,
+            HConstants.DEFAULT_LARGE_COMPACTION_THREADS));
+    if (this.largeCompactions.getCorePoolSize() != largeThreads) {
+      LOG.info("Changing the value of " + HConstants.LARGE_COMPACTION_THREADS +
+              " from " + this.largeCompactions.getCorePoolSize() + " to " +
+              largeThreads);
+      this.largeCompactions.setMaximumPoolSize(largeThreads);
+      this.largeCompactions.setCorePoolSize(largeThreads);
+    }
+
+    int smallThreads = newConf.getInt(HConstants.SMALL_COMPACTION_THREADS,
+            HConstants.DEFAULT_SMALL_COMPACTION_THREADS);
+    if (this.smallCompactions.getCorePoolSize() != smallThreads) {
+      LOG.info("Changing the value of " + HConstants.SMALL_COMPACTION_THREADS +
+                " from " + this.smallCompactions.getCorePoolSize() + " to " +
+                smallThreads);
+      this.smallCompactions.setMaximumPoolSize(smallThreads);
+      this.smallCompactions.setCorePoolSize(smallThreads);
+    }
+
+    this.conf = newConf;
+  }
+
+  /**
+   * Helper method for tests to check if the number of small compaction threads
+   * change on-the-fly.
+   *
+   * @return
+   */
+  protected int getSmallCompactionThreadNum() {
+    return this.smallCompactions.getCorePoolSize();
+  }
+
+  /**
+   * Helper method for tests to check if the number of large compaction threads
+   * change on-the-fly.
+   *
+   * @return
+   */
+  protected int getLargeCompactionThreadNum() {
+    return this.largeCompactions.getCorePoolSize();
+  }
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionConfiguration.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionConfiguration.java?rev=1496587&r1=1496586&r2=1496587&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionConfiguration.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionConfiguration.java Tue Jun 25 18:51:54 2013
@@ -47,43 +47,71 @@ public class CompactionConfiguration {
   Configuration conf;
   Store store;
 
-  long maxCompactSize;
-  long minCompactSize;
-  boolean shouldExcludeBulk;
-  int minFilesToCompact;
-  int maxFilesToCompact;
-  double compactionRatio;
-  double offPeekCompactionRatio;
-  int offPeakStartHour;
-  int offPeakEndHour;
-  long throttlePoint;
-  boolean shouldDeleteExpired;
-  long majorCompactionPeriod;
-  float majorCompactionJitter;
+  /** Since all these properties can change online, they are volatile **/
+  volatile long maxCompactSize;
+  volatile long minCompactSize;
+  volatile boolean shouldExcludeBulk;
+  volatile int minFilesToCompact;
+  volatile int maxFilesToCompact;
+  volatile double compactionRatio;
+  volatile double offPeakCompactionRatio;
+  volatile int offPeakStartHour;
+  volatile int offPeakEndHour;
+  volatile long throttlePoint;
+  volatile boolean shouldDeleteExpired;
+  volatile long majorCompactionPeriod;
+  volatile float majorCompactionJitter;
+
+  /** Default values for the properties **/
+  static final long defaultMaxCompactSize = Long.MAX_VALUE;
+  static final boolean defaultShouldExcludeBulk = false;
+  static final int defaultMaxFilesToCompact = 10;
+  static final float defaultCompactionRatio = 1.2F;
+  static final float defaultOffPeakCompactionRatio = 5.0F;
+  static final int defaultOffPeakStartHour = -1;
+  static final int defaultOffPeakEndHour = -1;
+  static final boolean defaultShouldDeleteExpired = true;
+  static final long defaultMajorCompactionPeriod = 1000*60*60*24;
+  static final float defaultMajorCompactionJitter = 0.20F;
 
   CompactionConfiguration(Configuration conf, Store store) {
     this.conf = conf;
     this.store = store;
 
-    String strPrefix = "hbase.hstore.compaction.";
+    String strPrefix = HConstants.HSTORE_COMPACTION_PREFIX;
 
-    maxCompactSize = conf.getLong(strPrefix + "max.size", Long.MAX_VALUE);
-    minCompactSize = conf.getLong(strPrefix + "min.size", store.getHRegion().memstoreFlushSize);
-    shouldExcludeBulk = conf.getBoolean(strPrefix + "exclude.bulk", false);
+    maxCompactSize = conf.getLong(strPrefix + "max.size", defaultMaxCompactSize);
+    minCompactSize = conf.getLong(strPrefix + "min.size",
+            store.getHRegion().memstoreFlushSize);
+    shouldExcludeBulk = conf.getBoolean(strPrefix + "exclude.bulk",
+            defaultShouldExcludeBulk);
     minFilesToCompact = Math.max(2, conf.getInt(strPrefix + "min",
-          /*old name*/ conf.getInt("hbase.hstore.compactionThreshold", HConstants.DEFAULT_MIN_FILES_TO_COMPACT)));
-    maxFilesToCompact = conf.getInt(strPrefix + "max", 10);
-    compactionRatio = conf.getFloat(strPrefix + "ratio", 1.2F);
-    offPeekCompactionRatio = conf.getFloat(strPrefix + "ratio.offpeak", 5.0F);
-
-    offPeakStartHour = conf.getInt("hbase.offpeak.start.hour", -1);
-    offPeakEndHour = conf.getInt("hbase.offpeak.end.hour", -1);
-
-    throttlePoint =  conf.getLong("hbase.regionserver.thread.compaction.throttle",
-          2 * maxFilesToCompact * store.getHRegion().memstoreFlushSize);
-    shouldDeleteExpired = conf.getBoolean("hbase.store.delete.expired.storefile", true);
-    majorCompactionPeriod = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24);
-    majorCompactionJitter = conf.getFloat("hbase.hregion.majorcompaction.jitter", 0.20F);
+          /*old name*/ conf.getInt("hbase.hstore.compactionThreshold",
+                                    HConstants.DEFAULT_MIN_FILES_TO_COMPACT)));
+    maxFilesToCompact = conf.getInt(strPrefix + "max",
+            defaultMaxFilesToCompact);
+    compactionRatio = conf.getFloat(strPrefix + "ratio",
+            defaultCompactionRatio);
+    offPeakCompactionRatio = conf.getFloat(strPrefix + "ratio.offpeak",
+            defaultOffPeakCompactionRatio);
+
+    offPeakStartHour = conf.getInt("hbase.offpeak.start.hour",
+            defaultOffPeakStartHour);
+    offPeakEndHour = conf.getInt("hbase.offpeak.end.hour",
+            defaultOffPeakEndHour);
+
+    throttlePoint =
+            conf.getLong("hbase.regionserver.thread.compaction.throttle",
+            2 * maxFilesToCompact * store.getHRegion().memstoreFlushSize);
+    shouldDeleteExpired =
+            conf.getBoolean("hbase.store.delete.expired.storefile",
+                            defaultShouldDeleteExpired);
+    majorCompactionPeriod =
+            conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD,
+                        defaultMajorCompactionPeriod);
+    majorCompactionJitter =
+            conf.getFloat("hbase.hregion.majorcompaction.jitter",
+                          defaultMajorCompactionJitter);
   }
 
   /**
@@ -132,7 +160,7 @@ public class CompactionConfiguration {
    * @return Off peak Ratio used for compaction
    */
   double getCompactionRatioOffPeak() {
-    return offPeekCompactionRatio;
+    return offPeakCompactionRatio;
   }
 
   /**
@@ -179,4 +207,148 @@ public class CompactionConfiguration {
     return shouldDeleteExpired;
   }
 
+  /**
+   * Update the compaction configuration, when an online change is made.
+   *
+   * @param newConf
+   */
+  protected void updateConfiguration(Configuration newConf) {
+    String strPrefix = HConstants.HSTORE_COMPACTION_PREFIX;
+
+    // Check if the compaction ratio has changed.
+    String compactionRatioStr = strPrefix + "ratio";
+    double newCompactionRatio = newConf.getFloat(compactionRatioStr,
+            defaultCompactionRatio);
+    if (newCompactionRatio != this.compactionRatio) {
+      LOG.info("Changing the value of " + compactionRatioStr + " from " +
+              this.compactionRatio + " to " + newCompactionRatio);
+      this.compactionRatio = newCompactionRatio;
+    }
+
+    // Check if the off peak compaction ratio has changed.
+    String offPeakCompactionRatioStr = strPrefix + "ratio.offpeak";
+    double newOffPeakCompactionRatio =
+            newConf.getFloat(offPeakCompactionRatioStr,
+                    defaultOffPeakCompactionRatio);
+    if (newOffPeakCompactionRatio != this.offPeakCompactionRatio) {
+      LOG.info("Changing the value of " + offPeakCompactionRatioStr + " from " +
+              this.offPeakCompactionRatio + " to " + newOffPeakCompactionRatio);
+      this.offPeakCompactionRatio = newOffPeakCompactionRatio;
+    }
+
+    // Check if the throttle point has changed.
+    String throttlePointStr = "hbase.regionserver.thread.compaction.throttle";
+    long newThrottlePoint = newConf.getLong(throttlePointStr,
+            2 * maxFilesToCompact * store.getHRegion().memstoreFlushSize);
+    if (newThrottlePoint != this.throttlePoint) {
+      LOG.info("Changing the value of " + throttlePointStr + " from " +
+              this.throttlePoint + " to " + newThrottlePoint);
+      this.throttlePoint = newThrottlePoint;
+    }
+
+    // Check if the minFilesToCompact has changed.
+    String minFilesToCompactStr = strPrefix + "min";
+    int newMinFilesToCompact = Math.max(2, newConf.getInt(minFilesToCompactStr,
+          /*old name*/ newConf.getInt("hbase.hstore.compactionThreshold",
+            HConstants.DEFAULT_MIN_FILES_TO_COMPACT)));
+    if (newMinFilesToCompact != this.minFilesToCompact) {
+      LOG.info("Changing the value of " + minFilesToCompactStr + " from " +
+              this.minFilesToCompact + " to " + newMinFilesToCompact);
+      this.minFilesToCompact = newMinFilesToCompact;
+    }
+
+    // Check if the maxFile to compact has changed.
+    String maxFilesToCompactStr = strPrefix + "max";
+    int newMaxFilesToCompact = newConf.getInt(maxFilesToCompactStr,
+            defaultMaxFilesToCompact);
+    if (newMaxFilesToCompact != this.maxFilesToCompact) {
+      LOG.info("Changing the value of " + maxFilesToCompactStr + " from " +
+              this.maxFilesToCompact + " to " + newMaxFilesToCompact);
+      this.maxFilesToCompact = newMaxFilesToCompact;
+    }
+
+    // Check if the Off Peak Start Hour has changed.
+    String offPeakStartHourStr = "hbase.offpeak.start.hour";
+    int newOffPeakStartHour = newConf.getInt(offPeakStartHourStr,
+            defaultOffPeakStartHour);
+    if (newOffPeakStartHour != this.offPeakStartHour) {
+      LOG.info("Changing the value of " + offPeakStartHourStr + " from " +
+              this.offPeakStartHour + " to " + newOffPeakStartHour);
+      this.offPeakStartHour = newOffPeakStartHour;
+    }
+
+    // Check if the Off Peak End Hour has changed.
+    String offPeakEndHourStr = "hbase.offpeak.end.hour";
+    int newOffPeakEndHour = newConf.getInt(offPeakEndHourStr,
+            defaultOffPeakEndHour);
+    if (newOffPeakEndHour != this.offPeakEndHour) {
+      LOG.info("Changing the value of " + offPeakEndHourStr + " from " +
+              this.offPeakEndHour + " to " + newOffPeakEndHour);
+      this.offPeakEndHour = newOffPeakEndHour;
+    }
+
+    // Check if the Min Compaction Size has changed
+    String minCompactSizeStr = strPrefix + "min.size";
+    long newMinCompactSize = newConf.getLong(minCompactSizeStr,
+            store.getHRegion().memstoreFlushSize);
+    if (newMinCompactSize != this.minCompactSize) {
+      LOG.info("Changing the value of " + minCompactSizeStr + " from " +
+              this.minCompactSize + " to " + newMinCompactSize);
+      this.minCompactSize = newMinCompactSize;
+    }
+
+    // Check if the Max Compaction Size has changed.
+    String maxCompactSizeStr = strPrefix + "max.size";
+    long newMaxCompactSize = newConf.getLong(maxCompactSizeStr,
+            defaultMaxCompactSize);
+    if (newMaxCompactSize != this.maxCompactSize) {
+      LOG.info("Changing the value of " + maxCompactSizeStr + " from " +
+              this.maxCompactSize + " to " + newMaxCompactSize);
+      this.maxCompactSize = newMaxCompactSize;
+    }
+
+    // Check if shouldExcludeBulk has changed.
+    String shouldExcludeBulkStr = strPrefix + "exclude.bulk";
+    boolean newShouldExcludeBulk = newConf.getBoolean(shouldExcludeBulkStr,
+            defaultShouldExcludeBulk);
+    if (newShouldExcludeBulk != this.shouldExcludeBulk) {
+      LOG.info("Changing the value of " + shouldExcludeBulkStr + " from " +
+              this.shouldExcludeBulk + " to " + newShouldExcludeBulk);
+      this.shouldExcludeBulk = newShouldExcludeBulk;
+    }
+
+    // Check if shouldDeleteExpired has changed.
+    String shouldDeleteExpiredStr = "hbase.store.delete.expired.storefile";
+    boolean newShouldDeleteExpired =
+            newConf.getBoolean(shouldDeleteExpiredStr,
+                    defaultShouldDeleteExpired);
+    if (newShouldDeleteExpired != this.shouldDeleteExpired) {
+      LOG.info("Changing the value of " + shouldDeleteExpiredStr + " from " +
+              this.shouldDeleteExpired + " to " + newShouldDeleteExpired);
+      this.shouldDeleteExpired = newShouldDeleteExpired;
+    }
+
+    // Check if majorCompactionPeriod has changed.
+    long newMajorCompactionPeriod =
+            newConf.getLong(HConstants.MAJOR_COMPACTION_PERIOD,
+                    defaultMajorCompactionPeriod);
+    if (newMajorCompactionPeriod != this.majorCompactionPeriod) {
+      LOG.info("Changing the value of " + HConstants.MAJOR_COMPACTION_PERIOD +
+              " from " + this.majorCompactionPeriod + " to " +
+              newMajorCompactionPeriod);
+      this.majorCompactionPeriod = newMajorCompactionPeriod;
+    }
+
+    // Check if majorCompactionJitter has changed.
+    String majorCompactionJitterStr = "hbase.hregion.majorcompaction.jitter";
+    float newMajorCompactionJitter =
+            newConf.getFloat(majorCompactionJitterStr,
+                    defaultMajorCompactionJitter);
+    if (newMajorCompactionJitter != this.majorCompactionJitter) {
+      LOG.info("Changing the value of " + majorCompactionJitterStr + " from " +
+               this.majorCompactionJitter + " to " + newMajorCompactionJitter);
+      this.majorCompactionJitter = newMajorCompactionJitter;
+    }
+    this.conf = newConf;
+  }
 }
\ No newline at end of file

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionManager.java?rev=1496587&r1=1496586&r2=1496587&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionManager.java Tue Jun 25 18:51:54 2013
@@ -50,6 +50,14 @@ public class CompactionManager {
   }
 
   /**
+   * Update the configuration when it changes on-the-fly.
+   * @param conf
+   */
+  protected void updateConfiguration(Configuration conf) {
+    comConf.updateConfiguration(conf);
+  }
+
+  /**
    * @param candidateFiles candidate files, ordered from oldest to newest
    * @return subset copy of candidate list that meets compaction criteria
    * @throws java.io.IOException

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1496587&r1=1496586&r2=1496587&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Jun 25 18:51:54 2013
@@ -860,7 +860,10 @@ public class HRegion implements HeapSize
               completionService
                   .submit(new Callable<ImmutableList<StoreFile>>() {
                     public ImmutableList<StoreFile> call() throws IOException {
-                      return store.close();
+                      ImmutableList<StoreFile> result = store.close();
+                      HRegionServer.configurationManager.
+                              deregisterObserver(store);
+                      return result;
                     }
                   });
             }
@@ -2674,7 +2677,10 @@ public class HRegion implements HeapSize
 
   protected Store instantiateHStore(Path tableDir, HColumnDescriptor c)
   throws IOException {
-    return new Store(tableDir, this, c, this.fs, this.conf);
+    Store store = new Store(tableDir, this, c, this.fs, this.conf);
+    // Register this store with the configuration manager.
+    HRegionServer.configurationManager.registerObserver(store);
+    return store;
   }
 
   /**

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1496587&r1=1496586&r2=1496587&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Jun 25 18:51:54 2013
@@ -377,7 +377,8 @@ public class HRegionServer implements HR
 
   // This object lets classes register themselves to get notified on
   // Configuration changes.
-  public ConfigurationManager configurationManager;
+  public static final ConfigurationManager configurationManager =
+          new ConfigurationManager();
 
   public static long getResponseSizeLimit() {
     return responseSizeLimit;
@@ -411,7 +412,6 @@ public class HRegionServer implements HR
     this.abortRequested = false;
     this.fsOk = true;
     this.conf = conf;
-    this.configurationManager = new ConfigurationManager();
     this.connection = ServerConnectionManager.getConnection(conf);
 
     this.isOnline = false;
@@ -558,6 +558,8 @@ public class HRegionServer implements HR
 
     // Compaction thread
     this.compactSplitThread = new CompactSplitThread(this);
+    // Registering the compactSplitThread object with the ConfigurationManager.
+    configurationManager.registerObserver(this.compactSplitThread);
 
     // Log rolling thread
     int hlogCntPerServer = this.conf.getInt(HConstants.HLOG_CNT_PER_SERVER, 2);
@@ -1465,14 +1467,6 @@ public class HRegionServer implements HR
     return isOnline;
   }
 
-  /**
-   * Return the ConfigurationManager object.
-   * @return
-   */
-  public ConfigurationManager getConfigurationManager() {
-    return this.configurationManager;
-  }
-
   private void setupHLog(Path logDir, Path oldLogDir, int totalHLogCnt) throws IOException {
     hlogs = new HLog[totalHLogCnt];
     for (int i = 0; i < totalHLogCnt; i++) { 
@@ -3829,12 +3823,9 @@ public class HRegionServer implements HR
    */
   public void updateConfiguration() {
     LOG.info("Reloading the configuration from disk.");
+    // Reload the configuration from disk.
     conf.reloadConfiguration();
-    // TODO @gauravm
-    // Move this to the notifyOnChange() method in HRegionServer
-    for (HRegion r : onlineRegions.values()) {
-      r.updateConfiguration();
-    }
+
     // Notify all the observers that the configuration has changed.
     configurationManager.notifyAllObservers(conf);
   }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1496587&r1=1496586&r2=1496587&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Tue Jun 25 18:51:54 2013
@@ -44,6 +44,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.io.WriteOptions;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -101,7 +102,8 @@ import com.google.common.collect.Lists;
  * <p>Locking and transactions are handled at a higher level.  This API should
  * not be called directly but by an HRegion manager.
  */
-public class Store extends SchemaConfigured implements HeapSize {
+public class Store extends SchemaConfigured implements HeapSize,
+        ConfigurationObserver {
   static final Log LOG = LogFactory.getLog(Store.class);
   protected final MemStore memstore;
   // This stores directory in the filesystem.
@@ -110,7 +112,7 @@ public class Store extends SchemaConfigu
   private final HColumnDescriptor family;
   CompactionManager compactionManager;
   final FileSystem fs;
-  final Configuration conf;
+  Configuration conf;
   final CacheConfig cacheConf;
   // ttl in milliseconds.
   protected long ttl;
@@ -1295,8 +1297,8 @@ public class Store extends SchemaConfigu
         // we have to use a do/while loop.
         ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
         boolean hasMore;
-        // Create the writer whether or not there are output KVs, 
-        // iff the maxSequenceID among the compaction candidates is 
+        // Create the writer whether or not there are output KVs,
+        // iff the maxSequenceID among the compaction candidates is
         // equal to the maxSequenceID among all the on-disk hfiles. [HBASE-7267]
         if (maxCompactingSequcenceId == this.getMaxSequenceId(true)) {
           writer = createWriterInTmp(maxKeyCount, compression, true);
@@ -1328,7 +1330,7 @@ public class Store extends SchemaConfigu
                 bytesWritten += kv.getLength();
                 if (bytesWritten > Store.closeCheckInterval) {
                   getSchemaMetrics().updatePersistentStoreMetric(
-                    SchemaMetrics.StoreMetricType.COMPACTION_WRITE_SIZE, 
+                    SchemaMetrics.StoreMetricType.COMPACTION_WRITE_SIZE,
                     bytesWritten);
                   bytesWritten = 0;
                   if (!this.region.areWritesEnabled()) {
@@ -2004,4 +2006,12 @@ public class Store extends SchemaConfigu
    }
   }
 
+  @Override
+  public void notifyOnChange(Configuration conf) {
+    this.conf = new CompoundConfiguration()
+            .add(conf)
+            .add(family.getValues());
+
+    compactionManager.updateConfiguration(conf);
+  }
 }

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java?rev=1496587&r1=1496586&r2=1496587&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java Tue Jun 25 18:51:54 2013
@@ -911,6 +911,19 @@ public class HBaseTestingUtility {
   }
 
   /**
+   * Get the RegionServer which hosts a region with the given region name.
+   * @param regionName
+   * @return
+   */
+  public HRegionServer getRSWithRegion(byte[] regionName) {
+    int index = hbaseCluster.getServerWith(regionName);
+    if (index == -1) {
+      return null;
+    }
+    return hbaseCluster.getRegionServerThreads().get(index).getRegionServer();
+  }
+
+  /**
    * Starts a <code>MiniMRCluster</code> with a default number of
    * <code>TaskTracker</code>'s.
    *

Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java?rev=1496587&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java Tue Jun 25 18:51:54 2013
@@ -0,0 +1,220 @@
+/**
+ * Copyright 2013 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+
+/**
+ * Verify that the Online Config Changes on the HRegionServer side are actually
+ * happening. We should add tests for important configurations which will be
+ * changed online.
+ */
+public class TestRegionServerOnlineConfigChange extends TestCase {
+  static final Log LOG =
+          LogFactory.getLog(TestRegionServerOnlineConfigChange.class.getName());
+  HBaseTestingUtility hbaseTestingUtility = new HBaseTestingUtility();
+  Configuration conf = null;
+
+  HTable t1 = null;
+  HRegionServer rs1 = null;
+  byte[] r1name = null;
+  HRegion r1 = null;
+
+  final String table1Str = "table1";
+  final String columnFamily1Str = "columnFamily1";
+  final byte[] TABLE1 = Bytes.toBytes(table1Str);
+  final byte[] COLUMN_FAMILY1 = Bytes.toBytes(columnFamily1Str);
+
+
+  @Override
+  public void setUp() throws Exception {
+    conf = hbaseTestingUtility.getConfiguration();
+    hbaseTestingUtility.startMiniCluster(1,1);
+    t1 = hbaseTestingUtility.createTable(TABLE1, COLUMN_FAMILY1);
+    HRegionInfo firstHRI = t1.getRegionsInfo().keySet().iterator().next();
+    r1name = firstHRI.getRegionName();
+    rs1 = hbaseTestingUtility.getRSWithRegion(r1name);
+    r1 = rs1.getRegion(r1name);
+  }
+
+  @Override
+  public void tearDown() throws Exception {
+    hbaseTestingUtility.shutdownMiniCluster();
+  }
+
+  /**
+   * Check if the number of compaction threads changes online
+   * @throws IOException
+   */
+  public void testNumCompactionThreadsOnlineChange() throws IOException {
+    assertTrue(rs1.compactSplitThread != null);
+    int newNumSmallThreads =
+            rs1.compactSplitThread.getSmallCompactionThreadNum() + 1;
+    int newNumLargeThreads =
+            rs1.compactSplitThread.getLargeCompactionThreadNum() + 1;
+
+    conf.setInt("hbase.regionserver.thread.compaction.small",
+            newNumSmallThreads);
+    conf.setInt("hbase.regionserver.thread.compaction.large",
+            newNumLargeThreads);
+    HRegionServer.configurationManager.notifyAllObservers(conf);
+
+    assertEquals(newNumSmallThreads,
+                  rs1.compactSplitThread.getSmallCompactionThreadNum());
+    assertEquals(newNumLargeThreads,
+                  rs1.compactSplitThread.getLargeCompactionThreadNum());
+  }
+
+  /**
+   * Test that the configurations in the CompactionConfiguration class change
+   * properly.
+   *
+   * @throws IOException
+   */
+  public void testCompactionConfigurationOnlineChange() throws IOException {
+    String strPrefix = HConstants.HSTORE_COMPACTION_PREFIX;
+    Store s = r1.getStore(COLUMN_FAMILY1);
+
+    // Set the new compaction ratio to a different value.
+    double newCompactionRatio =
+            s.compactionManager.comConf.getCompactionRatio() + 0.1;
+    conf.setFloat(strPrefix + "ratio", (float)newCompactionRatio);
+
+    // Notify all the observers, which includes the Store object.
+    HRegionServer.configurationManager.notifyAllObservers(conf);
+
+    // Check if the compaction ratio got updated in the Compaction Configuration
+    assertEquals(newCompactionRatio,
+                 s.compactionManager.comConf.getCompactionRatio(),
+                 0.00001);
+
+    // Check if the off peak compaction ratio gets updated.
+    double newOffPeakCompactionRatio =
+            s.compactionManager.comConf.getCompactionRatioOffPeak() + 0.1;
+    conf.setFloat(strPrefix + "ratio.offpeak",
+            (float)newOffPeakCompactionRatio);
+    HRegionServer.configurationManager.notifyAllObservers(conf);
+    assertEquals(newOffPeakCompactionRatio,
+                 s.compactionManager.comConf.getCompactionRatioOffPeak(),
+                 0.00001);
+
+    // Check if the throttle point gets updated.
+    long newThrottlePoint = s.compactionManager.comConf.getThrottlePoint() + 10;
+    conf.setLong("hbase.regionserver.thread.compaction.throttle",
+                  newThrottlePoint);
+    HRegionServer.configurationManager.notifyAllObservers(conf);
+    assertEquals(newThrottlePoint,
+                 s.compactionManager.comConf.getThrottlePoint());
+
+    // Check if the minFilesToCompact gets updated.
+    int newMinFilesToCompact =
+            s.compactionManager.comConf.getMinFilesToCompact() + 1;
+    conf.setLong(strPrefix + "min", newMinFilesToCompact);
+    HRegionServer.configurationManager.notifyAllObservers(conf);
+    assertEquals(newMinFilesToCompact,
+                 s.compactionManager.comConf.getMinFilesToCompact());
+
+    // Check if the maxFilesToCompact gets updated.
+    int newMaxFilesToCompact =
+            s.compactionManager.comConf.getMaxFilesToCompact() + 1;
+    conf.setLong(strPrefix + "max", newMaxFilesToCompact);
+    HRegionServer.configurationManager.notifyAllObservers(conf);
+    assertEquals(newMaxFilesToCompact,
+                 s.compactionManager.comConf.getMaxFilesToCompact());
+
+    // Check if the Off peak start hour gets updated.
+    int newOffPeakStartHour =
+            (s.compactionManager.comConf.getOffPeakStartHour() + 1) % 24;
+    conf.setLong("hbase.offpeak.start.hour", newOffPeakStartHour);
+    HRegionServer.configurationManager.notifyAllObservers(conf);
+    assertEquals(newOffPeakStartHour,
+            s.compactionManager.comConf.getOffPeakStartHour());
+
+    // Check if the Off peak end hour gets updated.
+    int newOffPeakEndHour =
+            (s.compactionManager.comConf.getOffPeakEndHour() + 1) % 24;
+    conf.setLong("hbase.offpeak.end.hour", newOffPeakEndHour);
+    HRegionServer.configurationManager.notifyAllObservers(conf);
+    assertEquals(newOffPeakEndHour,
+            s.compactionManager.comConf.getOffPeakEndHour());
+
+    // Check if the minCompactSize gets updated.
+    long newMinCompactSize =
+            s.compactionManager.comConf.getMinCompactSize() + 1;
+    conf.setLong(strPrefix + "min.size", newMinCompactSize);
+    HRegionServer.configurationManager.notifyAllObservers(conf);
+    assertEquals(newMinCompactSize,
+                 s.compactionManager.comConf.getMinCompactSize());
+
+    // Check if the maxCompactSize gets updated.
+    long newMaxCompactSize =
+            s.compactionManager.comConf.getMaxCompactSize() - 1;
+    conf.setLong(strPrefix + "max.size", newMaxCompactSize);
+    HRegionServer.configurationManager.notifyAllObservers(conf);
+    assertEquals(newMaxCompactSize,
+                 s.compactionManager.comConf.getMaxCompactSize());
+
+    // Check if shouldExcludeBulk gets updated.
+    boolean newShouldExcludeBulk =
+            !s.compactionManager.comConf.shouldExcludeBulk();
+    conf.setBoolean(strPrefix + "exclude.bulk", newShouldExcludeBulk);
+    HRegionServer.configurationManager.notifyAllObservers(conf);
+    assertEquals(newShouldExcludeBulk,
+            s.compactionManager.comConf.shouldExcludeBulk());
+
+    // Check if shouldDeleteExpired gets updated.
+    boolean newShouldDeleteExpired =
+            !s.compactionManager.comConf.shouldDeleteExpired();
+    conf.setBoolean("hbase.store.delete.expired.storefile",
+            newShouldDeleteExpired);
+    HRegionServer.configurationManager.notifyAllObservers(conf);
+    assertEquals(newShouldDeleteExpired,
+            s.compactionManager.comConf.shouldDeleteExpired());
+
+    // Check if majorCompactionPeriod gets updated.
+    long newMajorCompactionPeriod =
+            s.compactionManager.comConf.getMajorCompactionPeriod() + 10;
+    conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, newMajorCompactionPeriod);
+    HRegionServer.configurationManager.notifyAllObservers(conf);
+    assertEquals(newMajorCompactionPeriod,
+            s.compactionManager.comConf.getMajorCompactionPeriod());
+
+    // Check if majorCompactionJitter gets updated.
+    float newMajorCompactionJitter =
+            s.compactionManager.comConf.getMajorCompactionJitter() + 0.02F;
+    conf.setFloat("hbase.hregion.majorcompaction.jitter",
+                  newMajorCompactionJitter);
+    HRegionServer.configurationManager.notifyAllObservers(conf);
+    assertEquals(newMajorCompactionJitter,
+            s.compactionManager.comConf.getMajorCompactionJitter(), 0.00001);
+  }
+}
+