You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2013/11/28 19:13:26 UTC

svn commit: r1546428 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/regionserver/HRegion.java test/java/org/apache/hadoop/hbase/TestPerColumnFamilyFlush.java

Author: liyin
Date: Thu Nov 28 18:13:26 2013
New Revision: 1546428

URL: http://svn.apache.org/r1546428
Log:
[HBASE-8805] Add online configuration support and corresponding test cases

Author: daviddeng

Summary: HRegion implements ConfigurationObserver and registers/deregisters itself with HRegionServer.configurationManager; adds test cases

Test Plan: Run testcase

Reviewers: gauravm, zelaine, liyintang

Reviewed By: liyintang

CC: mahesh, hbase-eng@

Differential Revision: https://phabricator.fb.com/D1067616

Task ID: 2977299

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestPerColumnFamilyFlush.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1546428&r1=1546427&r2=1546428&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Thu Nov 28 18:13:26 2013
@@ -26,11 +26,13 @@ import java.io.UnsupportedEncodingExcept
 import java.lang.reflect.Constructor;
 import java.net.InetSocketAddress;
 import java.text.ParseException;
+import java.text.SimpleDateFormat;
 import java.util.AbstractList;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -53,8 +55,6 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.Date;
-import java.text.SimpleDateFormat;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -72,14 +72,15 @@ import org.apache.hadoop.hbase.HRegionIn
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.NotServingRegionException;
-import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.RowLock;
+import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.Reference.Range;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
@@ -143,7 +144,7 @@ import com.google.common.collect.Lists;
  * regionName is a unique identifier for this HRegion. (startKey, endKey]
  * defines the keyspace for this HRegion.
  */
-public class HRegion implements HeapSize {
+public class HRegion implements HeapSize, ConfigurationObserver {
   public static final Log LOG = LogFactory.getLog(HRegion.class);
   static final String SPLITDIR = "splits";
   static final String MERGEDIR = "merges";
@@ -274,7 +275,7 @@ public class HRegion implements HeapSize
   final long memstoreFlushSize;
   // The maximum size a column family's memstore can grow up to,
   // before being flushed.
-  final long columnfamilyMemstoreFlushSize;
+  volatile long columnfamilyMemstoreFlushSize;
   // Last flush time for each Store. Useful when we are flushing for each column
   private Map<Store, Long> lastStoreFlushTimeMap
     = new ConcurrentHashMap<Store, Long>();
@@ -534,15 +535,30 @@ public class HRegion implements HeapSize
     }
     this.disableWAL = regionInfo.getTableDesc().isWALDisabled();
     this.memstoreFlushSize = flushSize;
-    this.columnfamilyMemstoreFlushSize = conf.getLong(
-            HConstants.HREGION_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE,
-            HTableDescriptor.DEFAULT_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE);
     this.blockingMemStoreSize = (long)(this.memstoreFlushSize *
       conf.getFloat(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 2));
     this.waitOnMemstoreBlock =
         conf.getBoolean(HConstants.HREGION_MEMSTORE_WAIT_ON_BLOCK, true);
     this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
+    // initialize dynamic parameters with current configuration
+    this.loadDynamicConf(conf);
+  }
 
+  @Override
+  public void notifyOnChange(Configuration conf) {
+    LOG.info("Online configuration changed!");
+    this.loadDynamicConf(conf);
+  }
+  /**
+   * Load online configurable parameters from a specified Configuration
+   */
+  private void loadDynamicConf(Configuration conf) {
+    this.columnfamilyMemstoreFlushSize = conf.getLong(
+        HConstants.HREGION_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE,
+        HTableDescriptor.DEFAULT_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE);
+
+    LOG.info(String.format("columnfamilyMemstoreFlushSize is set to %d",
+        this.columnfamilyMemstoreFlushSize));
   }
 
   /**
@@ -563,6 +579,8 @@ public class HRegion implements HeapSize
    */
   public long initialize(final Progressable reporter)
   throws IOException {
+    HRegionServer.configurationManager.registerObserver(this);
+
     MonitoredTask status = TaskMonitor.get().createStatus(
         "Initializing region " + this);
     try {
@@ -813,6 +831,8 @@ public class HRegion implements HeapSize
    * @throws IOException e
    */
   public List<StoreFile> close(final boolean abort) throws IOException {
+    HRegionServer.configurationManager.deregisterObserver(this);
+
     MonitoredTask status = TaskMonitor.get().createStatus(
         "Closing region " + this + (abort ? " due to abort" : ""));
     if (isClosed()) {
@@ -4094,5 +4114,4 @@ public class HRegion implements HeapSize
       s.updateConfiguration();
     }
   }
-
 }

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestPerColumnFamilyFlush.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestPerColumnFamilyFlush.java?rev=1546428&r1=1546427&r2=1546428&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestPerColumnFamilyFlush.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestPerColumnFamilyFlush.java Thu Nov 28 18:13:26 2013
@@ -20,8 +20,13 @@
 
 package org.apache.hadoop.hbase;
 
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
 import junit.framework.Assert;
 import junit.framework.TestCase;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -36,10 +41,6 @@ import org.apache.hadoop.hbase.util.Byte
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-
 /**
  * This test verifies the correctness of the Per Column Family flushing strategy
  */
@@ -60,9 +61,8 @@ public class TestPerColumnFamilyFlush ex
   public static final byte[] FAMILY2 = families[1];
   public static final byte[] FAMILY3 = families[2];
 
-  private void initHRegion (String callingMethod,
-                            HBaseConfiguration conf)
-          throws IOException {
+  private void initHRegion (String callingMethod, Configuration conf)
+      throws IOException {
     HTableDescriptor htd = new HTableDescriptor(TABLENAME);
     for(byte [] family : families) {
       htd.addFamily(new HColumnDescriptor(family));
@@ -105,12 +105,12 @@ public class TestPerColumnFamilyFlush ex
   @Test
   public void testSelectiveFlushWhenEnabled() throws IOException {
     // Set up the configuration
-    HBaseConfiguration conf = new HBaseConfiguration();
+    Configuration conf = HBaseConfiguration.create();
     conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200*1024);
     conf.setBoolean(HConstants.HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH, true);
     conf.setLong(HConstants.HREGION_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE, 100*1024);
 
-    // Intialize the HRegion
+    // Initialize the HRegion
     initHRegion(getName(), conf);
     // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
     for (int i = 1; i <= 1200; i++) {
@@ -247,15 +247,123 @@ public class TestPerColumnFamilyFlush ex
     Assert.assertEquals(0, region.getMemstoreSize().get());
   }
 
+  public void testSelectiveFlushWithThreshold(long t) throws IOException {
+    /*         t->|
+     * Phase 1    |     |     |
+     *            +-----+-----+
+     *           FM1   FM2   FM3
+     */
+    // flush all
+    region.flushcache(false);
+    region.put(createPut(2, 1));
+    region.put(createPut(3, 1));
+
+
+    int cnt = 0;
+    while (region.getStore(FAMILY1).getMemStoreSize() <= t) {
+      region.put(createPut(1, cnt));
+      cnt ++;
+    }
+
+    final long sizeFamily1_1 = region.getStore(FAMILY1).getMemStoreSize();
+    final long sizeFamily2_1 = region.getStore(FAMILY2).getMemStoreSize();
+    final long sizeFamily3_1 = region.getStore(FAMILY3).getMemStoreSize();
+
+    Assert.assertTrue(sizeFamily1_1 > t);
+    Assert.assertTrue(sizeFamily2_1 > 0);
+    Assert.assertTrue(sizeFamily3_1 > 0);
+
+    /*         t->
+     * Phase 2          |     |
+     *            +-----+-----+
+     *           FM1   FM2   FM3
+     */
+    // flush, should only flush family 1
+    region.flushcache(true);
+
+    final long sizeFamily1_2 = region.getStore(FAMILY1).getMemStoreSize();
+    final long sizeFamily2_2 = region.getStore(FAMILY2).getMemStoreSize();
+    final long sizeFamily3_2 = region.getStore(FAMILY3).getMemStoreSize();
+
+    // should clear FAMILY1 only
+    Assert.assertEquals("sizeFamily1", 0, sizeFamily1_2);
+    Assert.assertEquals("sizeFamily2", sizeFamily2_1, sizeFamily2_2);
+    Assert.assertEquals("sizeFamily3", sizeFamily3_1, sizeFamily3_2);
+    /*         t->,
+     * Phase 3    |     |     |
+     *            +-----+-----+
+     *           FM1   FM2   FM3
+     */
+    // flush all
+    region.flushcache(false);
+
+    region.put(createPut(2, 1));
+    region.put(createPut(3, 1));
+    // reduce cnt by 1 to make the size less than t
+    cnt--;
+    for (int i = 0; i < cnt; i ++) {
+      region.put(createPut(1, i));
+    }
+
+    final long sizeFamily1_3 = region.getStore(FAMILY1).getMemStoreSize();
+    final long sizeFamily2_3 = region.getStore(FAMILY2).getMemStoreSize();
+    final long sizeFamily3_3 = region.getStore(FAMILY3).getMemStoreSize();
+
+    Assert.assertTrue("sizeFamily1 < t", sizeFamily1_3 < t);
+    Assert.assertEquals("sizeFamily2", sizeFamily2_1, sizeFamily2_3);
+    Assert.assertEquals("sizeFamily3", sizeFamily3_1, sizeFamily3_3);
+    /*         t->
+     * Phase 4
+     *            +-----+-----+
+     *           FM1   FM2   FM3
+     */
+    // flush; FAMILY1 is below t this time, so all families should be flushed
+    region.flushcache(true);
+
+    final long sizeFamily1_4 = region.getStore(FAMILY1).getMemStoreSize();
+    final long sizeFamily2_4 = region.getStore(FAMILY2).getMemStoreSize();
+    final long sizeFamily3_4 = region.getStore(FAMILY3).getMemStoreSize();
+    Assert.assertEquals("sizeFamily1", 0, sizeFamily1_4);
+    Assert.assertEquals("sizeFamily2", 0, sizeFamily2_4);
+    Assert.assertEquals("sizeFamily3", 0, sizeFamily3_4);
+  }
+
+  @Test
+  public void testConfigueChange() throws IOException {
+    final int T_init = 100*1024;
+    final int T_1 = 200*1024;
+    final int T_2 = 50*1024;
+
+    // Set up the configuration
+    Configuration conf = HBaseConfiguration.create();
+    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200*1024);
+    conf.setBoolean(HConstants.HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH, true);
+    conf.setLong(HConstants.HREGION_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE, T_init);
+
+    // Initialize the HRegion
+    initHRegion(getName(), conf);
+
+    // test for threshold of T_init
+    this.testSelectiveFlushWithThreshold(T_init);
+    // test for threshold of T_1
+    conf.setLong(HConstants.HREGION_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE, T_1);
+    HRegionServer.configurationManager.notifyAllObservers(conf);
+    this.testSelectiveFlushWithThreshold(T_1);
+    // test for threshold of T_2
+    conf.setLong(HConstants.HREGION_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE, T_2);
+    HRegionServer.configurationManager.notifyAllObservers(conf);
+    this.testSelectiveFlushWithThreshold(T_2);
+  }
+
   @Test
   public void testSelectiveFlushWhenNotEnabled() throws IOException {
     // Set up the configuration
-    HBaseConfiguration conf = new HBaseConfiguration();
+    Configuration conf = HBaseConfiguration.create();
     conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024);
     conf.setBoolean(HConstants.HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH, false);
     conf.setLong(HConstants.HREGION_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE, 100 * 1024);
 
-    // Intialize the HRegion
+    // Initialize the HRegion
     initHRegion(getName(), conf);
     // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
     for (int i = 1; i <= 1200; i++) {