You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2014/04/19 20:18:25 UTC

svn commit: r1588689 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/coprocessor/ main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/coprocessor/observers/

Author: liyin
Date: Sat Apr 19 18:18:24 2014
New Revision: 1588689

URL: http://svn.apache.org/r1588689
Log:
[HBASE-11035] Online configuration change for loaded coprocessors

Author: adela

Summary: trying out if online config works when new coprocessors are added, so far so good

Test Plan: added a unit test

Reviewers: daviddeng, elliott, gauravm

Reviewed By: daviddeng

CC: hbase-eng@

Differential Revision: https://phabricator.fb.com/D1278985

Task ID: 4146171

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorHost.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/observers/TestHRegionObserverBypassCoprocessor.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java?rev=1588689&r1=1588688&r2=1588689&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java Sat Apr 19 18:18:24 2014
@@ -15,17 +15,11 @@
 
 package org.apache.hadoop.hbase;
 
-import java.io.IOException;
-import java.util.concurrent.ExecutorService;
-
-import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HTableInterface;
 
 /**
  * Coprocessor environment state.
  */
-@InterfaceAudience.Private
 public interface CoprocessorEnvironment {
 
   /** @return the Coprocessor interface version */

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java?rev=1588689&r1=1588688&r2=1588689&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java Sat Apr 19 18:18:24 2014
@@ -58,14 +58,17 @@ public abstract class CoprocessorHost<E 
       "hbase.coprocessor.user.region.classes";
   public static final String MASTER_COPROCESSOR_CONF_KEY =
       "hbase.coprocessor.master.classes";
+  public static final String REGION_COPROCESSOR_REMOVE_CONF_KEY =
+      "hbase.coprocessor.remove.classes";
   public static final String WAL_COPROCESSOR_CONF_KEY =
     "hbase.coprocessor.wal.classes";
   public static final String ABORT_ON_ERROR_KEY = "hbase.coprocessor.abortonerror";
   public static final boolean DEFAULT_ABORT_ON_ERROR = true;
 
+
   private static final Log LOG = LogFactory.getLog(CoprocessorHost.class);
   protected ThriftClientInterface tcInter;
-  /** Ordered set of loaded coprocessors with lock */
+  /** Ordered set of currently loaded coprocessors with lock */
   protected SortedSet<E> coprocessors =
       new SortedCopyOnWriteSet<E>(new EnvironmentPriorityComparator());
   protected Configuration conf;
@@ -85,10 +88,10 @@ public abstract class CoprocessorHost<E 
    * the intention is to preserve a history of all loaded coprocessors for
    * diagnosis in case of server crash (HBASE-4014).
    */
-  private static Set<String> coprocessorNames =
+  private static Set<String> everLoadedCoprocessorNames =
       Collections.synchronizedSet(new HashSet<String>());
   public static Set<String> getEverLoadedCoprocessors() {
-      return coprocessorNames;
+      return everLoadedCoprocessorNames;
   }
 
   /**
@@ -133,14 +136,24 @@ public abstract class CoprocessorHost<E 
         LOG.info("System coprocessor " + className + " was loaded " +
             "successfully with priority (" + priority++ + ").");
       } catch (Throwable t) {
-        // We always abort if system coprocessors cannot be loaded
-       //TODO: check if we want to not start regionserver here.. but probably not
+        LOG.error("Coprocessor " + className + "could not be loaded", t);
       }
     }
     coprocessors.addAll(configured);
   }
 
   /**
+   * Drops every loaded coprocessor and loads again from conf; used on online configuration change
+   * @param conf the new configuration, handed on to loadSystemCoprocessors
+   * @param confKey the configuration key handed on to loadSystemCoprocessors
+   */
+  protected void reloadSysCoprocessorsOnConfigChange(Configuration conf, String confKey) {
+    //remove whatever is loaded already (the whole set, not only entries that came from confKey)
+    coprocessors.clear();
+    loadSystemCoprocessors(conf, confKey);
+  }
+
+  /**
    * Load a coprocessor implementation into the host
    * @param path path to implementation jar
    * @param className the main class name
@@ -221,7 +234,7 @@ public abstract class CoprocessorHost<E 
     }
     // HBASE-4014: maintain list of loaded coprocessors for later crash analysis
     // if server (master or regionserver) aborts.
-    coprocessorNames.add(implClass.getName());
+    everLoadedCoprocessorNames.add(implClass.getName());
     return env;
   }
 
@@ -491,6 +504,6 @@ public abstract class CoprocessorHost<E 
     public void close() throws Exception {
       Thread.currentThread().setContextClassLoader(currentLoader);
     }
-
   }
+
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorHost.java?rev=1588689&r1=1588688&r2=1588689&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorHost.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorHost.java Sat Apr 19 18:18:24 2014
@@ -39,9 +39,8 @@ import org.apache.hadoop.hbase.regionser
  * Implements the coprocessor environment and runtime support for coprocessors
  * loaded within a {@link HRegion}.
  */
-public class RegionCoprocessorHost
-    extends CoprocessorHost<RegionCoprocessorHost.RegionEnvironment> {
-
+public class RegionCoprocessorHost extends
+    CoprocessorHost<RegionCoprocessorHost.RegionEnvironment> {
   private static final Log LOG = LogFactory.getLog(RegionCoprocessorHost.class);
   // The shared data map
   private static ReferenceMap sharedDataMap = new ReferenceMap(
@@ -72,6 +71,20 @@ public class RegionCoprocessorHost
     }
   }
 
+  /**
+   * Reloads system coprocessors from {@code newConf} on online config change.
+   */
+  public void reloadCoprocessors(Configuration newConf) {
+    // clear a single time: a second clear would drop the region-wide set again
+    coprocessors.clear();
+    loadSystemCoprocessors(newConf, REGION_COPROCESSOR_CONF_KEY);
+    // user-table defaults are skipped for the META and ROOT catalog regions
+    if (!region.getRegionInfo().getTableDesc().isMetaRegion()
+        && !region.getRegionInfo().getTableDesc().isRootRegion()) {
+      loadSystemCoprocessors(newConf, USER_REGION_COPROCESSOR_CONF_KEY);
+    }
+  }
+
 
   @Override
   public RegionEnvironment createEnvironment(Class<?> implClass,

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1588689&r1=1588688&r2=1588689&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sat Apr 19 18:18:24 2014
@@ -547,10 +547,10 @@ public class HRegion implements HeapSize
     this.waitOnMemstoreBlock =
         conf.getBoolean(HConstants.HREGION_MEMSTORE_WAIT_ON_BLOCK, true);
     this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
-    // initialize dynamic parameters with current configuration
-    this.loadDynamicConf(conf);
     this.coprocessorHost = new RegionCoprocessorHost(this,
         this.conf);
+    // initialize dynamic parameters with current configuration
+    this.loadDynamicConf(conf);
   }
 
   @Override
@@ -574,6 +574,7 @@ public class HRegion implements HeapSize
     logIfChange("columnfamilyMemstoreFlushSize",
         this.columnfamilyMemstoreFlushSize, newColumnfamilyMemstoreFlushSize);
     this.columnfamilyMemstoreFlushSize = newColumnfamilyMemstoreFlushSize;
+    this.coprocessorHost.reloadCoprocessors(conf);
   }
 
   /**
@@ -4189,4 +4190,8 @@ public class HRegion implements HeapSize
     }
     return res;
   }
+
+  public RegionCoprocessorHost getCoprocessorHost() {
+    return coprocessorHost;
+  }
 }

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/observers/TestHRegionObserverBypassCoprocessor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/observers/TestHRegionObserverBypassCoprocessor.java?rev=1588689&r1=1588688&r2=1588689&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/observers/TestHRegionObserverBypassCoprocessor.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/observers/TestHRegionObserverBypassCoprocessor.java Sat Apr 19 18:18:24 2014
@@ -2,8 +2,11 @@ package org.apache.hadoop.hbase.coproces
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import junit.framework.Assert;
 
@@ -17,7 +20,10 @@ import org.apache.hadoop.hbase.client.HT
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.environments.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.AfterClass;
@@ -25,15 +31,23 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+/**
+ * Testing basic coprocessor behavior
+ *
+ */
 public class TestHRegionObserverBypassCoprocessor {
 
   private static HBaseTestingUtility util;
   private static final byte[] tableName = Bytes.toBytes("test");
-  private static final byte[] dummy = Bytes.toBytes("dummy");
-  private static final byte[] row1 = Bytes.toBytes("r1");
-  private static final byte[] row2 = Bytes.toBytes("r2");
-  private static final byte[] row3 = Bytes.toBytes("r3");
-  private static final byte[] test = Bytes.toBytes("test");
+  private static final int MAX_ROWS = 6;
+  private static final List<byte[]> rows = new ArrayList<>();
+  static {
+    for (int i=0; i< MAX_ROWS; i++){
+      rows.add(Bytes.toBytes("row" + i));
+    }
+  }
+  private static final byte[] DUMMY = Bytes.toBytes("dummy");
+  private static final byte[] TEST = Bytes.toBytes("test");
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
@@ -58,7 +72,7 @@ public class TestHRegionObserverBypassCo
       }
       admin.deleteTable(tableName);
     }
-    util.createTable(tableName, new byte[][] {dummy, test});
+    util.createTable(tableName, new byte[][] {DUMMY, TEST});
   }
 
   /**
@@ -71,36 +85,188 @@ public class TestHRegionObserverBypassCo
   public void testSimple() throws Exception {
     HTable t = new HTable(util.getConfiguration(), tableName);
     List<Put> puts = new ArrayList<Put>();
-    Put p = new Put(row1);
-    p.add(test,dummy,dummy);
-    p.add(dummy, dummy, dummy);
+    Put p = new Put(rows.get(0));
+    p.add(TEST,DUMMY,DUMMY);
+    p.add(DUMMY, DUMMY, DUMMY);
     puts.add(p);
 
-    p = new Put(row2);
-    p.add(dummy, dummy, dummy);
+    p = new Put(rows.get(1));
+    p.add(DUMMY, DUMMY, DUMMY);
     puts.add(p);
 
-    p = new Put(row3);
-    p.add(dummy, dummy, dummy);
+    p = new Put(rows.get(2));
+    p.add(DUMMY, DUMMY, DUMMY);
     puts.add(p);
 
     t.put(puts);
 
-    Result r = t.get(new Get(row1));
+    Result r = t.get(new Get(rows.get(0)));
     Assert.assertTrue(
         "There should be zero results since the put contains \"test\" CF",
         r.isEmpty());
 
-    r = t.get(new Get(row2));
+    r = t.get(new Get(rows.get(1)));
     Assert.assertEquals(1, r.getKvs().size());
 
-    r = t.get(new Get(row3));
+    r = t.get(new Get(rows.get(2)));
     Assert.assertEquals(1, r.getKvs().size());
-
     t.close();
   }
 
   /**
+   * Scenario this test case is testing
+   * <ul>
+   * <li>Check whether the first coprocessor is running</li>
+   * <li>Change the configuration such that a second coprocessor is added and
+   * check whether both of them are in the list of loaded coprocessors in each
+   * region</li>
+   * <li> test whether both of them are functioning right </li>
+   * <li> remove both coprocessors and confirm regions are not running them</li>
+   * <li> confirm there is no action taken by any coprocessor since they are unloaded</li>
+   * <li> add back the first coprocessor and confirm regions have it and it is functioning fine </li>
+   * </ul>
+   * @throws Exception
+   */
+  @Test
+  public void testDynamicLoadingOfCoprocessorsInRegionCoprocessorHost() throws Exception {
+    HTable table = new HTable(util.getConfiguration(), tableName);
+    Set<String> allCoprocessors = RegionCoprocessorHost
+        .getEverLoadedCoprocessors();
+    Assert.assertEquals("There should be only one coprocessor everloaded", 1, allCoprocessors.size());
+    Assert
+        .assertEquals(
+            "Expected loaded coprocessor is different from one which is currently loaded",
+            TestCoprocessor.class.getName(), allCoprocessors.toArray()[0]);
+    Configuration conf = util.getConfiguration();
+    Assert.assertEquals(TestCoprocessor.class.getName(),
+        conf.getStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY)[0]);
+    conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
+        TestCoprocessor2.class.getName(), TestCoprocessor.class.getName());
+    System.out.println("conf strings: "
+        + Arrays.toString(conf
+            .getStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY)));
+
+    // invoke online configuration change
+    HRegionServer.configurationManager.notifyAllObservers(conf);
+
+    // check if the coprocessor is in list of everloaded coprocessors
+    Set<String> expectedCoprocessors = new HashSet<>(2);
+    expectedCoprocessors.add(TestCoprocessor.class.getName());
+    expectedCoprocessors.add(TestCoprocessor2.class.getName());
+    Assert.assertEquals("Mismatch in expected loaded coprocessors", expectedCoprocessors,
+        RegionCoprocessorHost.getEverLoadedCoprocessors());
+
+    // check if both new and old coprocessors are in the list of current
+    // coprocessors in each region
+    List<HRegion> regions = util.getMiniHBaseCluster().getRegions(tableName);
+    Set<String> expectedCoprocessorSimpleName = new HashSet<>();
+    expectedCoprocessorSimpleName.add(TestCoprocessor.class.getSimpleName());
+    expectedCoprocessorSimpleName.add(TestCoprocessor2.class.getSimpleName());
+    for (HRegion region : regions) {
+      Assert.assertEquals("Mismatch in expected loaded coprocessors on region"
+          + region.getRegionNameAsString(), expectedCoprocessorSimpleName,
+          region.getCoprocessorHost().getCoprocessors());
+    }
+
+    //now do some puts and check if both of them are working (none of the puts should go in)
+    createPutAndVerifyObserverCorrectness(table, rows.get(0), rows.get(1), true, true);
+
+    //now remove them both
+    conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, "");
+    System.out.println("conf strings: "
+        + Arrays.toString(conf
+            .getStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY)));
+
+    // invoke online configuration change
+    HRegionServer.configurationManager.notifyAllObservers(conf);
+
+    // check if both of them are still in list of everloaded coprocessors
+    Assert.assertEquals("Mismatch in expected loaded coprocessors", expectedCoprocessors,
+        RegionCoprocessorHost.getEverLoadedCoprocessors());
+
+    // none of the coprocessors should be loaded in the regions
+    for (HRegion region : regions) {
+      System.out.println(region.getCoprocessorHost().getCoprocessors());
+      Assert.assertTrue("Region should not contain any coprocessors", region
+          .getCoprocessorHost().getCoprocessors().isEmpty());
+    }
+
+    //now do some puts and check that neither of them is working (all puts should go in)
+    createPutAndVerifyObserverCorrectness(table, rows.get(2), rows.get(3), false, false);
+
+    //add back the first one
+    conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, TestCoprocessor.class.getName());
+    System.out.println("conf strings: "
+        + Arrays.toString(conf
+            .getStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY)));
+
+    // invoke online configuration change
+    HRegionServer.configurationManager.notifyAllObservers(conf);
+
+    //confirm it is the only one on the regions
+    expectedCoprocessorSimpleName.remove(TestCoprocessor2.class.getSimpleName());
+    for (HRegion region : regions) {
+      Assert.assertEquals("Mismatch in expected loaded coprocessors on region",
+          expectedCoprocessorSimpleName, region.getCoprocessorHost()
+              .getCoprocessors());
+    }
+
+    // check if both of them are still in list of everloaded coprocessors (this should not change!)
+    Assert.assertEquals("Mismatch in expected loaded coprocessors", expectedCoprocessors,
+        RegionCoprocessorHost.getEverLoadedCoprocessors());
+
+    //now do some puts and check if only the first one is working
+    createPutAndVerifyObserverCorrectness(table, rows.get(4), rows.get(5), true, false);
+  }
+
+  /**
+   * Create two puts, one with cf test and one with cf dummy, write them and
+   * read each row back to make sure the coprocessors bypass puts as expected
+   *
+   * @param table - HTable instance the puts are written to and read back from
+   * @param row1 - row of the put that uses column family "test"
+   * @param row2 - row of the put that uses column family "dummy"
+   * @param testOn - whether {@link TestCoprocessor} is enabled
+   * @param dummyOn - whether {@link TestCoprocessor2} is enabled
+   * @throws Exception
+   *           if the put or one of the gets fails
+   */
+  public void createPutAndVerifyObserverCorrectness(HTable table, byte[] row1, byte[] row2,
+      boolean testOn, boolean dummyOn) throws Exception {
+    List<Put> puts = new ArrayList<>();
+    Put p = new Put(row1);
+    p.add(TEST, DUMMY, DUMMY);
+    puts.add(p);
+
+    p = new Put(row2);
+    p.add(DUMMY, DUMMY, DUMMY);
+    puts.add(p);
+    table.put(puts);
+    for (Put put : puts) {
+      Result r = table.get(new Get(put.getRow()));
+      List<KeyValue> kvs = put.getFamilyMap().get(TEST);
+
+      // both puts will have one col family each. This is to check if the put
+      // has column family called "test"
+      boolean hasFamilyTest = (kvs!= null && !kvs.isEmpty());
+      if (testOn && hasFamilyTest) {
+        Assert.assertTrue(
+            "There should be zero results since the put contains \"test\" CF",
+            r.isEmpty());
+      } else if (!testOn &&  hasFamilyTest){
+        Assert.assertEquals("There should be one result", 1, r.getKvs().size());
+      }
+      if (dummyOn && !hasFamilyTest) {
+        Assert.assertTrue(
+            "There should be zero results since the put contains \"dummy\" CF",
+            r.isEmpty());
+      } else if (!dummyOn && !hasFamilyTest){
+        Assert.assertEquals("There should be one result", 1, r.getKvs().size());
+      }
+    }
+  }
+
+  /**
    * Dummy coprocessor which skips put containing "test" as a column family
    *
    */
@@ -110,11 +276,27 @@ public class TestHRegionObserverBypassCo
         final Put put, final WALEdit edit, final boolean durability)
         throws IOException {
       Map<byte[], List<KeyValue>> familyMap = put.getFamilyMap();
-      if (familyMap.containsKey(test)) {
+      if (familyMap.containsKey(TEST)) {
         e.bypass();
         System.out.println("bypassing put: " + put);
       }
     }
   }
 
+  /**
+   * Dummy coprocessor which skips put containing "dummy" as a column family
+   *
+   */
+  public static class TestCoprocessor2 extends BaseRegionObserver {
+    @Override
+    public void prePut(ObserverContext<RegionCoprocessorEnvironment> c,
+        Put put, WALEdit edit, boolean writeToWAL) throws IOException {
+      Map<byte[], List<KeyValue>> familyMap = put.getFamilyMap();
+      if (familyMap.containsKey(DUMMY)) {
+        c.bypass();
+        System.out.println("bypassing put: " + put);
+      }
+    }
+  }
+
 }