You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2014/04/19 20:18:25 UTC
svn commit: r1588689 - in /hbase/branches/0.89-fb/src:
main/java/org/apache/hadoop/hbase/
main/java/org/apache/hadoop/hbase/coprocessor/
main/java/org/apache/hadoop/hbase/regionserver/
test/java/org/apache/hadoop/hbase/coprocessor/observers/
Author: liyin
Date: Sat Apr 19 18:18:24 2014
New Revision: 1588689
URL: http://svn.apache.org/r1588689
Log:
[HBASE-11035] Online configuration change for loaded coprocessors
Author: adela
Summary: trying out if online config works when new coprocessors are added, so far so good
Test Plan: added a unit test
Reviewers: daviddeng, elliott, gauravm
Reviewed By: daviddeng
CC: hbase-eng@
Differential Revision: https://phabricator.fb.com/D1278985
Task ID: 4146171
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorHost.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/observers/TestHRegionObserverBypassCoprocessor.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java?rev=1588689&r1=1588688&r2=1588689&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java Sat Apr 19 18:18:24 2014
@@ -15,17 +15,11 @@
package org.apache.hadoop.hbase;
-import java.io.IOException;
-import java.util.concurrent.ExecutorService;
-
-import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HTableInterface;
/**
* Coprocessor environment state.
*/
-@InterfaceAudience.Private
public interface CoprocessorEnvironment {
/** @return the Coprocessor interface version */
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java?rev=1588689&r1=1588688&r2=1588689&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java Sat Apr 19 18:18:24 2014
@@ -58,14 +58,17 @@ public abstract class CoprocessorHost<E
"hbase.coprocessor.user.region.classes";
public static final String MASTER_COPROCESSOR_CONF_KEY =
"hbase.coprocessor.master.classes";
+ public static final String REGION_COPROCESSOR_REMOVE_CONF_KEY =
+ "hbase.coprocessor.remove.classes";
public static final String WAL_COPROCESSOR_CONF_KEY =
"hbase.coprocessor.wal.classes";
public static final String ABORT_ON_ERROR_KEY = "hbase.coprocessor.abortonerror";
public static final boolean DEFAULT_ABORT_ON_ERROR = true;
+
private static final Log LOG = LogFactory.getLog(CoprocessorHost.class);
protected ThriftClientInterface tcInter;
- /** Ordered set of loaded coprocessors with lock */
+ /** Ordered set of currently loaded coprocessors with lock */
protected SortedSet<E> coprocessors =
new SortedCopyOnWriteSet<E>(new EnvironmentPriorityComparator());
protected Configuration conf;
@@ -85,10 +88,10 @@ public abstract class CoprocessorHost<E
* the intention is to preserve a history of all loaded coprocessors for
* diagnosis in case of server crash (HBASE-4014).
*/
- private static Set<String> coprocessorNames =
+ private static Set<String> everLoadedCoprocessorNames =
Collections.synchronizedSet(new HashSet<String>());
public static Set<String> getEverLoadedCoprocessors() {
- return coprocessorNames;
+ return everLoadedCoprocessorNames;
}
/**
@@ -133,14 +136,24 @@ public abstract class CoprocessorHost<E
LOG.info("System coprocessor " + className + " was loaded " +
"successfully with priority (" + priority++ + ").");
} catch (Throwable t) {
- // We always abort if system coprocessors cannot be loaded
- //TODO: check if we want to not start regionserver here.. but probably not
+ LOG.error("Coprocessor " + className + "could not be loaded", t);
}
}
coprocessors.addAll(configured);
}
/**
+ * Generally used when we do online configuration change for the loaded coprocessors
+ * @param conf
+ * @param confKey
+ */
+ protected void reloadSysCoprocessorsOnConfigChange(Configuration conf, String confKey) {
+ //remove whatever is loaded already
+ coprocessors.clear();
+ loadSystemCoprocessors(conf, confKey);
+ }
+
+ /**
* Load a coprocessor implementation into the host
* @param path path to implementation jar
* @param className the main class name
@@ -221,7 +234,7 @@ public abstract class CoprocessorHost<E
}
// HBASE-4014: maintain list of loaded coprocessors for later crash analysis
// if server (master or regionserver) aborts.
- coprocessorNames.add(implClass.getName());
+ everLoadedCoprocessorNames.add(implClass.getName());
return env;
}
@@ -491,6 +504,6 @@ public abstract class CoprocessorHost<E
public void close() throws Exception {
Thread.currentThread().setContextClassLoader(currentLoader);
}
-
}
+
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorHost.java?rev=1588689&r1=1588688&r2=1588689&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorHost.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorHost.java Sat Apr 19 18:18:24 2014
@@ -39,9 +39,8 @@ import org.apache.hadoop.hbase.regionser
* Implements the coprocessor environment and runtime support for coprocessors
* loaded within a {@link HRegion}.
*/
-public class RegionCoprocessorHost
- extends CoprocessorHost<RegionCoprocessorHost.RegionEnvironment> {
-
+public class RegionCoprocessorHost extends
+ CoprocessorHost<RegionCoprocessorHost.RegionEnvironment> {
private static final Log LOG = LogFactory.getLog(RegionCoprocessorHost.class);
// The shared data map
private static ReferenceMap sharedDataMap = new ReferenceMap(
@@ -72,6 +71,20 @@ public class RegionCoprocessorHost
}
}
+ /**
+ * Used mainly when we dynamically reload the configuration
+ */
+ public void reloadCoprocessors(Configuration newConf) {
+ // reload system default cp's from configuration.
+ reloadSysCoprocessorsOnConfigChange(newConf, REGION_COPROCESSOR_CONF_KEY);
+ // reload system default cp's for user tables from configuration.
+ //TODO: check whether this checks for ROOT too
+ if (!region.getRegionInfo().getTableDesc().isMetaRegion()
+ && !region.getRegionInfo().getTableDesc().isRootRegion()) {
+ reloadSysCoprocessorsOnConfigChange(newConf, USER_REGION_COPROCESSOR_CONF_KEY);
+ }
+ }
+
@Override
public RegionEnvironment createEnvironment(Class<?> implClass,
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1588689&r1=1588688&r2=1588689&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sat Apr 19 18:18:24 2014
@@ -547,10 +547,10 @@ public class HRegion implements HeapSize
this.waitOnMemstoreBlock =
conf.getBoolean(HConstants.HREGION_MEMSTORE_WAIT_ON_BLOCK, true);
this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
- // initialize dynamic parameters with current configuration
- this.loadDynamicConf(conf);
this.coprocessorHost = new RegionCoprocessorHost(this,
this.conf);
+ // initialize dynamic parameters with current configuration
+ this.loadDynamicConf(conf);
}
@Override
@@ -574,6 +574,7 @@ public class HRegion implements HeapSize
logIfChange("columnfamilyMemstoreFlushSize",
this.columnfamilyMemstoreFlushSize, newColumnfamilyMemstoreFlushSize);
this.columnfamilyMemstoreFlushSize = newColumnfamilyMemstoreFlushSize;
+ this.coprocessorHost.reloadCoprocessors(conf);
}
/**
@@ -4189,4 +4190,8 @@ public class HRegion implements HeapSize
}
return res;
}
+
+ public RegionCoprocessorHost getCoprocessorHost() {
+ return coprocessorHost;
+ }
}
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/observers/TestHRegionObserverBypassCoprocessor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/observers/TestHRegionObserverBypassCoprocessor.java?rev=1588689&r1=1588688&r2=1588689&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/observers/TestHRegionObserverBypassCoprocessor.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/observers/TestHRegionObserverBypassCoprocessor.java Sat Apr 19 18:18:24 2014
@@ -2,8 +2,11 @@ package org.apache.hadoop.hbase.coproces
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import junit.framework.Assert;
@@ -17,7 +20,10 @@ import org.apache.hadoop.hbase.client.HT
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.environments.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
@@ -25,15 +31,23 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+/**
+ * Testing basic coprocessor behavior
+ *
+ */
public class TestHRegionObserverBypassCoprocessor {
private static HBaseTestingUtility util;
private static final byte[] tableName = Bytes.toBytes("test");
- private static final byte[] dummy = Bytes.toBytes("dummy");
- private static final byte[] row1 = Bytes.toBytes("r1");
- private static final byte[] row2 = Bytes.toBytes("r2");
- private static final byte[] row3 = Bytes.toBytes("r3");
- private static final byte[] test = Bytes.toBytes("test");
+ private static final int MAX_ROWS = 6;
+ private static final List<byte[]> rows = new ArrayList<>();
+ static {
+ for (int i=0; i< MAX_ROWS; i++){
+ rows.add(Bytes.toBytes("row" + i));
+ }
+ }
+ private static final byte[] DUMMY = Bytes.toBytes("dummy");
+ private static final byte[] TEST = Bytes.toBytes("test");
@BeforeClass
public static void setUpBeforeClass() throws Exception {
@@ -58,7 +72,7 @@ public class TestHRegionObserverBypassCo
}
admin.deleteTable(tableName);
}
- util.createTable(tableName, new byte[][] {dummy, test});
+ util.createTable(tableName, new byte[][] {DUMMY, TEST});
}
/**
@@ -71,36 +85,188 @@ public class TestHRegionObserverBypassCo
public void testSimple() throws Exception {
HTable t = new HTable(util.getConfiguration(), tableName);
List<Put> puts = new ArrayList<Put>();
- Put p = new Put(row1);
- p.add(test,dummy,dummy);
- p.add(dummy, dummy, dummy);
+ Put p = new Put(rows.get(0));
+ p.add(TEST,DUMMY,DUMMY);
+ p.add(DUMMY, DUMMY, DUMMY);
puts.add(p);
- p = new Put(row2);
- p.add(dummy, dummy, dummy);
+ p = new Put(rows.get(1));
+ p.add(DUMMY, DUMMY, DUMMY);
puts.add(p);
- p = new Put(row3);
- p.add(dummy, dummy, dummy);
+ p = new Put(rows.get(2));
+ p.add(DUMMY, DUMMY, DUMMY);
puts.add(p);
t.put(puts);
- Result r = t.get(new Get(row1));
+ Result r = t.get(new Get(rows.get(0)));
Assert.assertTrue(
"There should be zero results since the put contains \"test\" CF",
r.isEmpty());
- r = t.get(new Get(row2));
+ r = t.get(new Get(rows.get(1)));
Assert.assertEquals(1, r.getKvs().size());
- r = t.get(new Get(row3));
+ r = t.get(new Get(rows.get(2)));
Assert.assertEquals(1, r.getKvs().size());
-
t.close();
}
/**
+ * Scenario this test case is testing
+ * <ul>
+ * <li>Check whether the first coprocessor is running</li>
+ * <li>Change the configuration such that a second coprocessor is added and
+ * check whether both of them are in the list of loaded coprocessors in each
+ * region</li>
+ * <li> test wheteher both of them are functioning right </li>
+ * <li> remove both coprocessors and confirm regions are not running them</li>
+ * <li> remove there is no action taken by any coprocessor since they are unloaded</li>
+ * <li> add back the first coprocessor and confirm regions have it and it is functioning fine </li>
+ * </ul>
+ * @throws Exception
+ */
+ @Test
+ public void testDynamicLoadingOfCoprocessorsInRegionCoprocessorHost() throws Exception {
+ HTable table = new HTable(util.getConfiguration(), tableName);
+ Set<String> allCoprocessors = RegionCoprocessorHost
+ .getEverLoadedCoprocessors();
+ Assert.assertEquals("There should be only one coprocessor everloaded", 1, allCoprocessors.size());
+ Assert
+ .assertEquals(
+ "Expected loaded coprocessor is different from one which is currently loaded",
+ TestCoprocessor.class.getName(), allCoprocessors.toArray()[0]);
+ Configuration conf = util.getConfiguration();
+ Assert.assertEquals(TestCoprocessor.class.getName(),
+ conf.getStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY)[0]);
+ conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
+ TestCoprocessor2.class.getName(), TestCoprocessor.class.getName());
+ System.out.println("conf strings: "
+ + Arrays.toString(conf
+ .getStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY)));
+
+ // invoke online configuration change
+ HRegionServer.configurationManager.notifyAllObservers(conf);
+
+ // check if the coprocessor is in list of everloaded coprocessors
+ Set<String> expectedCoprocessors = new HashSet<>(2);
+ expectedCoprocessors.add(TestCoprocessor.class.getName());
+ expectedCoprocessors.add(TestCoprocessor2.class.getName());
+ Assert.assertEquals("Mismatch in expected loaded coprocessors", expectedCoprocessors,
+ RegionCoprocessorHost.getEverLoadedCoprocessors());
+
+ // check if both new and old coprocessors are in the list of current
+ // coprocessors in each region
+ List<HRegion> regions = util.getMiniHBaseCluster().getRegions(tableName);
+ Set<String> expectedCoprocessorSimpleName = new HashSet<>();
+ expectedCoprocessorSimpleName.add(TestCoprocessor.class.getSimpleName());
+ expectedCoprocessorSimpleName.add(TestCoprocessor2.class.getSimpleName());
+ for (HRegion region : regions) {
+ Assert.assertEquals("Mismatch in expected loaded coprocessors on region"
+ + region.getRegionNameAsString(), expectedCoprocessorSimpleName,
+ region.getCoprocessorHost().getCoprocessors());
+ }
+
+ //now do some puts and check if both of them are working (none of the puts should go in)
+ createPutAndVerifyObserverCorrectness(table, rows.get(0), rows.get(1), true, true);
+
+ //now remove them both
+ conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, "");
+ System.out.println("conf strings: "
+ + Arrays.toString(conf
+ .getStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY)));
+
+ // invoke online configuration change
+ HRegionServer.configurationManager.notifyAllObservers(conf);
+
+ // check if both of them are still in list of everloaded coprocessors
+ Assert.assertEquals("Mismatch in expected loaded coprocessors", expectedCoprocessors,
+ RegionCoprocessorHost.getEverLoadedCoprocessors());
+
+ // none of the coprocessors should be loaded in the regions
+ for (HRegion region : regions) {
+ System.out.println(region.getCoprocessorHost().getCoprocessors());
+ Assert.assertTrue("Region should not contain any coprocessors", region
+ .getCoprocessorHost().getCoprocessors().isEmpty());
+ }
+
+ //now do some puts and check if both of them are working (all puts should go in)
+ createPutAndVerifyObserverCorrectness(table, rows.get(2), rows.get(3), false, false);
+
+ //add back the first one
+ conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, TestCoprocessor.class.getName());
+ System.out.println("conf strings: "
+ + Arrays.toString(conf
+ .getStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY)));
+
+ // invoke online configuration change
+ HRegionServer.configurationManager.notifyAllObservers(conf);
+
+ //confirm it is the only one on the regions
+ expectedCoprocessorSimpleName.remove(TestCoprocessor2.class.getSimpleName());
+ for (HRegion region : regions) {
+ Assert.assertEquals("Mismatch in expected loaded coprocessors on region",
+ expectedCoprocessorSimpleName, region.getCoprocessorHost()
+ .getCoprocessors());
+ }
+
+ // check if both of them are still in list of everloaded coprocessors (this should not change!)
+ Assert.assertEquals("Mismatch in expected loaded coprocessors", expectedCoprocessors,
+ RegionCoprocessorHost.getEverLoadedCoprocessors());
+
+ //now do some puts and check if only the first one is working
+ createPutAndVerifyObserverCorrectness(table, rows.get(4), rows.get(5), true, false);
+ }
+
+ /**
+ * Create two puts one with cf dummy and one with cf test and make sure
+ * coprocessors are functioning fine when we do put
+ *
+ * @param table
+ * - Htable instance
+ * @param testOn
+ * - whether {@link TestCoprocessor} is enabled
+ * @param dummyOn
+ * - whether {@link TestCoprocessor2} is enabled
+ * @throws Exception
+ */
+ public void createPutAndVerifyObserverCorrectness(HTable table, byte[] row1, byte[] row2,
+ boolean testOn, boolean dummyOn) throws Exception {
+ List<Put> puts = new ArrayList<>();
+ Put p = new Put(row1);
+ p.add(TEST, DUMMY, DUMMY);
+ puts.add(p);
+
+ p = new Put(row2);
+ p.add(DUMMY, DUMMY, DUMMY);
+ puts.add(p);
+ table.put(puts);
+ for (Put put : puts) {
+ Result r = table.get(new Get(put.getRow()));
+ List<KeyValue> kvs = put.getFamilyMap().get(TEST);
+
+ // both puts will have one col family each. This is to check if the put
+ // has column family called "test"
+ boolean hasFamilyTest = (kvs!= null && !kvs.isEmpty());
+ if (testOn && hasFamilyTest) {
+ Assert.assertTrue(
+ "There should be zero results since the put contains \"test\" CF",
+ r.isEmpty());
+ } else if (!testOn && hasFamilyTest){
+ Assert.assertEquals("There should be one result", 1, r.getKvs().size());
+ }
+ if (dummyOn && !hasFamilyTest) {
+ Assert.assertTrue(
+ "There should be zero results since the put contains \"dummy\" CF",
+ r.isEmpty());
+ } else if (!dummyOn && !hasFamilyTest){
+ Assert.assertEquals("There should be one result", 1, r.getKvs().size());
+ }
+ }
+ }
+
+ /**
* Dummy coprocessor which skips put containing "test" as a column family
*
*/
@@ -110,11 +276,27 @@ public class TestHRegionObserverBypassCo
final Put put, final WALEdit edit, final boolean durability)
throws IOException {
Map<byte[], List<KeyValue>> familyMap = put.getFamilyMap();
- if (familyMap.containsKey(test)) {
+ if (familyMap.containsKey(TEST)) {
e.bypass();
System.out.println("bypassing put: " + put);
}
}
}
+ /**
+ * Dummy coprocessor which skips put containing "dummy" as a column family
+ *
+ */
+ public static class TestCoprocessor2 extends BaseRegionObserver {
+ @Override
+ public void prePut(ObserverContext<RegionCoprocessorEnvironment> c,
+ Put put, WALEdit edit, boolean writeToWAL) throws IOException {
+ Map<byte[], List<KeyValue>> familyMap = put.getFamilyMap();
+ if (familyMap.containsKey(DUMMY)) {
+ c.bypass();
+ System.out.println("bypassing put: " + put);
+ }
+ }
+ }
+
}