You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by la...@apache.org on 2012/08/05 04:37:35 UTC
svn commit: r1369516 - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/coprocessor/
main/java/org/apache/hadoop/hbase/regionserver/
test/java/org/apache/hadoop/hbase/coprocessor/
Author: larsh
Date: Sun Aug 5 02:37:35 2012
New Revision: 1369516
URL: http://svn.apache.org/viewvc?rev=1369516&view=rev
Log:
HBASE-6505 Allow shared RegionObserver state
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java?rev=1369516&r1=1369515&r2=1369516&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java Sun Aug 5 02:37:35 2012
@@ -213,6 +213,7 @@ public abstract class CoprocessorHost<E
paths.add(file.toURL());
}
}
+ jarFile.close();
StringTokenizer st = new StringTokenizer(cp, File.pathSeparator);
while (st.hasMoreTokens()) {
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java?rev=1369516&r1=1369515&r2=1369516&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java Sun Aug 5 02:37:35 2012
@@ -20,6 +20,8 @@
package org.apache.hadoop.hbase.coprocessor;
+import java.util.concurrent.ConcurrentMap;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
@@ -35,4 +37,7 @@ public interface RegionCoprocessorEnviro
/** @return reference to the region server services */
public RegionServerServices getRegionServerServices();
+ /** @return shared data between all instances of this coprocessor */
+ public ConcurrentMap<String, Object> getSharedData();
+
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java?rev=1369516&r1=1369515&r2=1369516&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java Sun Aug 5 02:37:35 2012
@@ -20,21 +20,39 @@
package org.apache.hadoop.hbase.regionserver;
-import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.regex.Matcher;
+
+import org.apache.commons.collections.map.AbstractReferenceMap;
+import org.apache.commons.collections.map.ReferenceMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.Coprocessor;
-import org.apache.hadoop.hbase.CoprocessorEnvironment;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hbase.coprocessor.*;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -45,9 +63,7 @@ import org.apache.hadoop.hbase.util.Byte
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.StringUtils;
-import java.io.IOException;
-import java.util.*;
-import java.util.regex.Matcher;
+import com.google.common.collect.ImmutableList;
/**
* Implements the coprocessor environment and runtime support for coprocessors
@@ -57,6 +73,9 @@ public class RegionCoprocessorHost
extends CoprocessorHost<RegionCoprocessorHost.RegionEnvironment> {
private static final Log LOG = LogFactory.getLog(RegionCoprocessorHost.class);
+ // The shared data map
+ private static ReferenceMap sharedDataMap =
+ new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK);
/**
* Encapsulation of the environment of each coprocessor
@@ -66,6 +85,7 @@ public class RegionCoprocessorHost
private HRegion region;
private RegionServerServices rsServices;
+ ConcurrentMap<String, Object> sharedData;
/**
* Constructor
@@ -74,10 +94,11 @@ public class RegionCoprocessorHost
*/
public RegionEnvironment(final Coprocessor impl, final int priority,
final int seq, final Configuration conf, final HRegion region,
- final RegionServerServices services) {
+ final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
super(impl, priority, seq, conf);
this.region = region;
this.rsServices = services;
+ this.sharedData = sharedData;
}
/** @return the region */
@@ -95,6 +116,11 @@ public class RegionCoprocessorHost
public void shutdown() {
super.shutdown();
}
+
+ @Override
+ public ConcurrentMap<String, Object> getSharedData() {
+ return sharedData;
+ }
}
/** The region server services */
@@ -194,8 +220,19 @@ public class RegionCoprocessorHost
break;
}
}
+ ConcurrentMap<String, Object> classData;
+ // make sure only one thread can add maps
+ synchronized (sharedDataMap) {
+ // as long as at least one RegionEnvironment holds on to its classData it will
+ // remain in this map
+ classData = (ConcurrentMap<String, Object>)sharedDataMap.get(implClass.getName());
+ if (classData == null) {
+ classData = new ConcurrentHashMap<String, Object>();
+ sharedDataMap.put(implClass.getName(), classData);
+ }
+ }
return new RegionEnvironment(instance, priority, seq, conf, region,
- rsServices);
+ rsServices, classData);
}
@Override
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java?rev=1369516&r1=1369515&r2=1369516&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java Sun Aug 5 02:37:35 2012
@@ -25,13 +25,18 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.commons.collections.map.AbstractReferenceMap;
+import org.apache.commons.collections.map.ReferenceMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -41,6 +46,8 @@ import org.apache.hadoop.hbase.HTableDes
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
@@ -49,6 +56,7 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.regionserver.SplitTransaction;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.PairOfSameType;
import org.junit.experimental.categories.Category;
@@ -132,14 +140,19 @@ public class TestCoprocessorInterface ex
private boolean postFlushCalled;
private boolean preSplitCalled;
private boolean postSplitCalled;
+ private ConcurrentMap<String, Object> sharedData;
@Override
public void start(CoprocessorEnvironment e) {
+ sharedData = ((RegionCoprocessorEnvironment)e).getSharedData();
+ // using new Object here, so that there will be a new object on each invocation
+ sharedData.putIfAbsent("test1", new Object());
startCalled = true;
}
@Override
public void stop(CoprocessorEnvironment e) {
+ sharedData = null;
stopCalled = true;
}
@@ -214,6 +227,104 @@ public class TestCoprocessorInterface ex
boolean wasSplit() {
return (preSplitCalled && postSplitCalled);
}
+ Map<String, Object> getSharedData() {
+ return sharedData;
+ }
+ }
+
+ public static class CoprocessorII extends BaseRegionObserver {
+ private ConcurrentMap<String, Object> sharedData;
+ @Override
+ public void start(CoprocessorEnvironment e) {
+ sharedData = ((RegionCoprocessorEnvironment)e).getSharedData();
+ sharedData.putIfAbsent("test2", new Object());
+ }
+ @Override
+ public void stop(CoprocessorEnvironment e) {
+ sharedData = null;
+ }
+ @Override
+ public void preGet(final ObserverContext<RegionCoprocessorEnvironment> e,
+ final Get get, final List<KeyValue> results) throws IOException {
+ if (1/0 == 1) {
+ e.complete();
+ }
+ }
+
+ Map<String, Object> getSharedData() {
+ return sharedData;
+ }
+ }
+
+ public void testSharedData() throws IOException {
+ byte [] tableName = Bytes.toBytes("testtable");
+ byte [][] families = { fam1, fam2, fam3 };
+
+ Configuration hc = initSplit();
+ HRegion region = initHRegion(tableName, getName(), hc,
+ new Class<?>[]{}, families);
+
+ for (int i = 0; i < 3; i++) {
+ addContent(region, fam3);
+ region.flushcache();
+ }
+
+ region.compactStores();
+
+ byte [] splitRow = region.checkSplit();
+
+ assertNotNull(splitRow);
+ HRegion [] regions = split(region, splitRow);
+ for (int i = 0; i < regions.length; i++) {
+ regions[i] = reopenRegion(regions[i], CoprocessorImpl.class, CoprocessorII.class);
+ }
+ Coprocessor c = regions[0].getCoprocessorHost().
+ findCoprocessor(CoprocessorImpl.class.getName());
+ Coprocessor c2 = regions[0].getCoprocessorHost().
+ findCoprocessor(CoprocessorII.class.getName());
+ Object o = ((CoprocessorImpl)c).getSharedData().get("test1");
+ Object o2 = ((CoprocessorII)c2).getSharedData().get("test2");
+ assertNotNull(o);
+ assertNotNull(o2);
+ // two coprocessors get different sharedDatas
+ assertFalse(((CoprocessorImpl)c).getSharedData() == ((CoprocessorII)c2).getSharedData());
+ for (int i = 1; i < regions.length; i++) {
+ c = regions[i].getCoprocessorHost().
+ findCoprocessor(CoprocessorImpl.class.getName());
+ c2 = regions[i].getCoprocessorHost().
+ findCoprocessor(CoprocessorII.class.getName());
+ // make sure that all coprocessor of a class have identical sharedDatas
+ assertTrue(((CoprocessorImpl)c).getSharedData().get("test1") == o);
+ assertTrue(((CoprocessorII)c2).getSharedData().get("test2") == o2);
+ }
+ // now have all Environments fail
+ for (int i = 0; i < regions.length; i++) {
+ try {
+ Get g = new Get(regions[i].getStartKey());
+ regions[i].get(g, null);
+ fail();
+ } catch (DoNotRetryIOException xc) {
+ }
+ assertNull(regions[i].getCoprocessorHost().
+ findCoprocessor(CoprocessorII.class.getName()));
+ }
+ c = regions[0].getCoprocessorHost().
+ findCoprocessor(CoprocessorImpl.class.getName());
+ assertTrue(((CoprocessorImpl)c).getSharedData().get("test1") == o);
+ c = c2 = null;
+ // perform a GC
+ System.gc();
+ // reopen the region
+ region = reopenRegion(regions[0], CoprocessorImpl.class, CoprocessorII.class);
+ c = region.getCoprocessorHost().
+ findCoprocessor(CoprocessorImpl.class.getName());
+ // CPimpl is unaffected, still the same reference
+ assertTrue(((CoprocessorImpl)c).getSharedData().get("test1") == o);
+ c2 = region.getCoprocessorHost().
+ findCoprocessor(CoprocessorII.class.getName());
+ // a new map and object were created, so the reference is different;
+ // i.e. the old entry was indeed removed by the GC and a new one has been created
+ assertFalse(((CoprocessorII)c2).getSharedData().get("test2") == o2);
}
public void testCoprocessorInterface() throws IOException {
@@ -222,7 +333,7 @@ public class TestCoprocessorInterface ex
Configuration hc = initSplit();
HRegion region = initHRegion(tableName, getName(), hc,
- CoprocessorImpl.class, families);
+ new Class<?>[]{CoprocessorImpl.class}, families);
for (int i = 0; i < 3; i++) {
addContent(region, fam3);
region.flushcache();
@@ -268,7 +379,7 @@ public class TestCoprocessorInterface ex
}
}
- HRegion reopenRegion(final HRegion closedRegion, Class<?> implClass)
+ HRegion reopenRegion(final HRegion closedRegion, Class<?> ... implClasses)
throws IOException {
//HRegionInfo info = new HRegionInfo(tableName, null, null, false);
HRegion r = new HRegion(closedRegion);
@@ -281,7 +392,9 @@ public class TestCoprocessorInterface ex
RegionCoprocessorHost host = new RegionCoprocessorHost(r, null, conf);
r.setCoprocessorHost(host);
- host.load(implClass, Coprocessor.PRIORITY_USER, conf);
+ for (Class<?> implClass : implClasses) {
+ host.load(implClass, Coprocessor.PRIORITY_USER, conf);
+ }
// we need to manually call pre- and postOpen here since the
// above load() is not the real case for CP loading. A CP is
// expected to be loaded by default from 1) configuration; or 2)
@@ -294,7 +407,7 @@ public class TestCoprocessorInterface ex
}
HRegion initHRegion (byte [] tableName, String callingMethod,
- Configuration conf, Class<?> implClass, byte [] ... families)
+ Configuration conf, Class<?> [] implClasses, byte [][] families)
throws IOException {
HTableDescriptor htd = new HTableDescriptor(tableName);
for(byte [] family : families) {
@@ -308,10 +421,11 @@ public class TestCoprocessorInterface ex
RegionCoprocessorHost host = new RegionCoprocessorHost(r, null, conf);
r.setCoprocessorHost(host);
- host.load(implClass, Coprocessor.PRIORITY_USER, conf);
-
- Coprocessor c = host.findCoprocessor(implClass.getName());
- assertNotNull(c);
+ for (Class<?> implClass : implClasses) {
+ host.load(implClass, Coprocessor.PRIORITY_USER, conf);
+ Coprocessor c = host.findCoprocessor(implClass.getName());
+ assertNotNull(c);
+ }
// Here we have to call pre and postOpen explicitly.
host.preOpen();