You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by la...@apache.org on 2012/08/05 04:37:28 UTC

svn commit: r1369515 - in /hbase/branches/0.94/src: main/java/org/apache/hadoop/hbase/coprocessor/ main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/coprocessor/

Author: larsh
Date: Sun Aug  5 02:37:27 2012
New Revision: 1369515

URL: http://svn.apache.org/viewvc?rev=1369515&view=rev
Log:
HBASE-6505 Allow shared RegionObserver state

Modified:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java?rev=1369515&r1=1369514&r2=1369515&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java Sun Aug  5 02:37:27 2012
@@ -209,6 +209,7 @@ public abstract class CoprocessorHost<E 
           paths.add(file.toURL());
         }
       }
+      jarFile.close();
 
       StringTokenizer st = new StringTokenizer(cp, File.pathSeparator);
       while (st.hasMoreTokens()) {

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java?rev=1369515&r1=1369514&r2=1369515&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java Sun Aug  5 02:37:27 2012
@@ -20,6 +20,8 @@
 
 package org.apache.hadoop.hbase.coprocessor;
 
+import java.util.concurrent.ConcurrentMap;
+
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
@@ -31,4 +33,7 @@ public interface RegionCoprocessorEnviro
   /** @return reference to the region server services */
   public RegionServerServices getRegionServerServices();
 
+  /** @return shared data between all instances of this coprocessor */
+  public ConcurrentMap<String, Object> getSharedData();
+
 }

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java?rev=1369515&r1=1369514&r2=1369515&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java Sun Aug  5 02:37:27 2012
@@ -20,21 +20,39 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
-import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.regex.Matcher;
+
+import org.apache.commons.collections.map.AbstractReferenceMap;
+import org.apache.commons.collections.map.ReferenceMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.Coprocessor;
-import org.apache.hadoop.hbase.CoprocessorEnvironment;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hbase.coprocessor.*;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -45,9 +63,7 @@ import org.apache.hadoop.hbase.util.Byte
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.util.StringUtils;
 
-import java.io.IOException;
-import java.util.*;
-import java.util.regex.Matcher;
+import com.google.common.collect.ImmutableList;
 
 /**
  * Implements the coprocessor environment and runtime support for coprocessors
@@ -57,6 +73,9 @@ public class RegionCoprocessorHost
     extends CoprocessorHost<RegionCoprocessorHost.RegionEnvironment> {
 
   private static final Log LOG = LogFactory.getLog(RegionCoprocessorHost.class);
+  // The shared data map
+  private static ReferenceMap sharedDataMap =
+      new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK);
 
   /**
    * Encapsulation of the environment of each coprocessor
@@ -66,6 +85,7 @@ public class RegionCoprocessorHost
 
     private HRegion region;
     private RegionServerServices rsServices;
+    ConcurrentMap<String, Object> sharedData;
 
     /**
      * Constructor
@@ -74,10 +94,11 @@ public class RegionCoprocessorHost
      */
     public RegionEnvironment(final Coprocessor impl, final int priority,
         final int seq, final Configuration conf, final HRegion region,
-        final RegionServerServices services) {
+        final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
       super(impl, priority, seq, conf);
       this.region = region;
       this.rsServices = services;
+      this.sharedData = sharedData;
     }
 
     /** @return the region */
@@ -95,6 +116,11 @@ public class RegionCoprocessorHost
     public void shutdown() {
       super.shutdown();
     }
+
+    @Override
+    public ConcurrentMap<String, Object> getSharedData() {
+      return sharedData;
+    }
   }
 
   /** The region server services */
@@ -194,8 +220,19 @@ public class RegionCoprocessorHost
         break;
       }
     }
+    ConcurrentMap<String, Object> classData;
+    // make sure only one thread can add maps
+    synchronized (sharedDataMap) {
+      // as long as at least one RegionEnvironment holds on to its classData it will
+      // remain in this map
+      classData = (ConcurrentMap<String, Object>)sharedDataMap.get(implClass.getName());
+      if (classData == null) {
+        classData = new ConcurrentHashMap<String, Object>();
+        sharedDataMap.put(implClass.getName(), classData);
+      }
+    }
     return new RegionEnvironment(instance, priority, seq, conf, region,
-        rsServices);
+        rsServices, classData);
   }
 
   @Override

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java?rev=1369515&r1=1369514&r2=1369515&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java Sun Aug  5 02:37:27 2012
@@ -23,12 +23,15 @@ package org.apache.hadoop.hbase.coproces
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
@@ -105,14 +108,19 @@ public class TestCoprocessorInterface ex
     private boolean postFlushCalled;
     private boolean preSplitCalled;
     private boolean postSplitCalled;
+    private ConcurrentMap<String, Object> sharedData;
 
     @Override
     public void start(CoprocessorEnvironment e) {
+      sharedData = ((RegionCoprocessorEnvironment)e).getSharedData();
+      // using new String here, so that there will be new object on each invocation
+      sharedData.putIfAbsent("test1", new Object());
       startCalled = true;
     }
 
     @Override
     public void stop(CoprocessorEnvironment e) {
+      sharedData = null;
       stopCalled = true;
     }
 
@@ -187,6 +195,104 @@ public class TestCoprocessorInterface ex
     boolean wasSplit() {
       return (preSplitCalled && postSplitCalled);
     }
+    Map<String, Object> getSharedData() {
+      return sharedData;
+    }
+  }
+
+  public static class CoprocessorII extends BaseRegionObserver {
+    private ConcurrentMap<String, Object> sharedData;
+    @Override
+    public void start(CoprocessorEnvironment e) {
+      sharedData = ((RegionCoprocessorEnvironment)e).getSharedData();
+      sharedData.putIfAbsent("test2", new Object());
+    }
+    @Override
+    public void stop(CoprocessorEnvironment e) {
+      sharedData = null;
+    }
+    @Override
+    public void preGet(final ObserverContext<RegionCoprocessorEnvironment> e,
+        final Get get, final List<KeyValue> results) throws IOException {
+      if (1/0 == 1) {
+        e.complete();
+      }
+    }
+
+    Map<String, Object> getSharedData() {
+      return sharedData;
+    }
+  }
+
+  public void testSharedData() throws IOException {
+    byte [] tableName = Bytes.toBytes("testtable");
+    byte [][] families = { fam1, fam2, fam3 };
+
+    Configuration hc = initSplit();
+    HRegion region = initHRegion(tableName, getName(), hc,
+      new Class<?>[]{}, families);
+
+    for (int i = 0; i < 3; i++) {
+      addContent(region, fam3);
+      region.flushcache();
+    }
+
+    region.compactStores();
+
+    byte [] splitRow = region.checkSplit();
+
+    assertNotNull(splitRow);
+    HRegion [] regions = split(region, splitRow);
+    for (int i = 0; i < regions.length; i++) {
+      regions[i] = reopenRegion(regions[i], CoprocessorImpl.class, CoprocessorII.class);
+    }
+    Coprocessor c = regions[0].getCoprocessorHost().
+        findCoprocessor(CoprocessorImpl.class.getName());
+    Coprocessor c2 = regions[0].getCoprocessorHost().
+        findCoprocessor(CoprocessorII.class.getName());
+    Object o = ((CoprocessorImpl)c).getSharedData().get("test1");
+    Object o2 = ((CoprocessorII)c2).getSharedData().get("test2");
+    assertNotNull(o);
+    assertNotNull(o2);
+    // to coprocessors get different sharedDatas
+    assertFalse(((CoprocessorImpl)c).getSharedData() == ((CoprocessorII)c2).getSharedData());
+    for (int i = 1; i < regions.length; i++) {
+      c = regions[i].getCoprocessorHost().
+          findCoprocessor(CoprocessorImpl.class.getName());
+      c2 = regions[i].getCoprocessorHost().
+          findCoprocessor(CoprocessorII.class.getName());
+      // make sure that all coprocessor of a class have identical sharedDatas
+      assertTrue(((CoprocessorImpl)c).getSharedData().get("test1") == o);
+      assertTrue(((CoprocessorII)c2).getSharedData().get("test2") == o2);
+    }
+    // now have all Environments fail
+    for (int i = 0; i < regions.length; i++) {
+      try {
+        Get g = new Get(regions[i].getStartKey());
+        regions[i].get(g, null);
+        fail();
+      } catch (DoNotRetryIOException xc) {
+      }
+      assertNull(regions[i].getCoprocessorHost().
+          findCoprocessor(CoprocessorII.class.getName()));
+    }
+    c = regions[0].getCoprocessorHost().
+        findCoprocessor(CoprocessorImpl.class.getName());
+    assertTrue(((CoprocessorImpl)c).getSharedData().get("test1") == o);
+    c = c2 = null;
+    // perform a GC
+    System.gc();
+    // reopen the region
+    region = reopenRegion(regions[0], CoprocessorImpl.class, CoprocessorII.class);
+    c = region.getCoprocessorHost().
+        findCoprocessor(CoprocessorImpl.class.getName());
+    // CPimpl is unaffected, still the same reference
+    assertTrue(((CoprocessorImpl)c).getSharedData().get("test1") == o);
+    c2 = region.getCoprocessorHost().
+        findCoprocessor(CoprocessorII.class.getName());
+    // new map and object created, hence the reference is different
+    // hence the old entry was indeed removed by the GC and new one has been created
+    assertFalse(((CoprocessorII)c2).getSharedData().get("test2") == o2);
   }
 
   public void testCoprocessorInterface() throws IOException {
@@ -195,7 +301,7 @@ public class TestCoprocessorInterface ex
 
     Configuration hc = initSplit();
     HRegion region = initHRegion(tableName, getName(), hc,
-      CoprocessorImpl.class, families);
+      new Class<?>[]{CoprocessorImpl.class}, families);
     for (int i = 0; i < 3; i++) {
       addContent(region, fam3);
       region.flushcache();
@@ -243,7 +349,7 @@ public class TestCoprocessorInterface ex
     }
   }
 
-  HRegion reopenRegion(final HRegion closedRegion, Class<?> implClass)
+  HRegion reopenRegion(final HRegion closedRegion, Class<?> ... implClasses)
       throws IOException {
     //HRegionInfo info = new HRegionInfo(tableName, null, null, false);
     HRegion r = new HRegion(closedRegion.getTableDir(), closedRegion.getLog(),
@@ -258,7 +364,9 @@ public class TestCoprocessorInterface ex
     RegionCoprocessorHost host = new RegionCoprocessorHost(r, null, conf);
     r.setCoprocessorHost(host);
 
-    host.load(implClass, Coprocessor.PRIORITY_USER, conf);
+    for (Class<?> implClass : implClasses) {
+      host.load(implClass, Coprocessor.PRIORITY_USER, conf);
+    }
     // we need to manually call pre- and postOpen here since the
     // above load() is not the real case for CP loading. A CP is
     // expected to be loaded by default from 1) configuration; or 2)
@@ -271,7 +379,7 @@ public class TestCoprocessorInterface ex
   }
 
   HRegion initHRegion (byte [] tableName, String callingMethod,
-      Configuration conf, Class<?> implClass, byte [] ... families)
+      Configuration conf, Class<?> [] implClasses, byte [][] families)
       throws IOException {
     HTableDescriptor htd = new HTableDescriptor(tableName);
     for(byte [] family : families) {
@@ -285,10 +393,11 @@ public class TestCoprocessorInterface ex
     RegionCoprocessorHost host = new RegionCoprocessorHost(r, null, conf);
     r.setCoprocessorHost(host);
 
-    host.load(implClass, Coprocessor.PRIORITY_USER, conf);
-
-    Coprocessor c = host.findCoprocessor(implClass.getName());
-    assertNotNull(c);
+    for (Class<?> implClass : implClasses) {
+      host.load(implClass, Coprocessor.PRIORITY_USER, conf);
+      Coprocessor c = host.findCoprocessor(implClass.getName());
+      assertNotNull(c);
+    }
 
     // Here we have to call pre and postOpen explicitly.
     host.preOpen();