You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2010/12/15 11:12:00 UTC

svn commit: r1049471 - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/coprocessor/ src/main/java/org/apache/hadoop/hbase/regionserver/ src/test/java/org/apache/hadoop/hbase/coprocessor/

Author: apurtell
Date: Wed Dec 15 10:11:59 2010
New Revision: 1049471

URL: http://svn.apache.org/viewvc?rev=1049471&view=rev
Log:
HBASE-3348 Coprocessors: Allow Observers to completely override base function

Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserverCoprocessor.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorEnvironment.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CoprocessorHost.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1049471&r1=1049470&r2=1049471&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Wed Dec 15 10:11:59 2010
@@ -36,6 +36,8 @@ Release 0.91.0 - Unreleased
    HBASE-1861  Multi-Family support for bulk upload tools
    HBASE-3308  SplitTransaction.splitStoreFiles slows splits a lot
    HBASE-3328  Added Admin API to specify explicit split points
+   HBASE-3345  Coprocessors: Allow observers to completely override base
+               function
 
 
   NEW FEATURES

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserverCoprocessor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserverCoprocessor.java?rev=1049471&r1=1049470&r2=1049471&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserverCoprocessor.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserverCoprocessor.java Wed Dec 15 10:11:59 2010
@@ -27,6 +27,8 @@ import org.apache.hadoop.hbase.client.Pu
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+
 import java.io.IOException;
 
 /**
@@ -71,70 +73,65 @@ public abstract class BaseRegionObserver
 
   @Override
   public void preGetClosestRowBefore(final CoprocessorEnvironment e,
-      final byte [] row, final byte [] family)
+      final byte [] row, final byte [] family, final Result result)
     throws IOException {
   }
 
   @Override
-  public Result postGetClosestRowBefore(final CoprocessorEnvironment e,
-      byte[] row, byte[] family, final Result result)
-    throws IOException {
-    return result;
+  public void postGetClosestRowBefore(final CoprocessorEnvironment e,
+      final byte [] row, final byte [] family, final Result result)
+      throws IOException {
   }
 
   @Override
-  public Get preGet(final CoprocessorEnvironment e, final Get get)
-    throws IOException {
-    return get;
+  public void preGet(final CoprocessorEnvironment e, final Get get,
+      final List<KeyValue> results) throws IOException {
   }
 
   @Override
-  public List<KeyValue> postGet(final CoprocessorEnvironment e, final Get get,
-      List<KeyValue> results) throws IOException {
-    return results;
+  public void postGet(final CoprocessorEnvironment e, final Get get,
+      final List<KeyValue> results) throws IOException {
   }
 
   @Override
-  public Get preExists(final CoprocessorEnvironment e, final Get get)
-      throws IOException {
-    return get;
+  public boolean preExists(final CoprocessorEnvironment e, final Get get,
+      final boolean exists) throws IOException {
+    return exists;
   }
 
   @Override
-  public boolean postExists(final CoprocessorEnvironment e,
-      final Get get, boolean exists)
-      throws IOException {
+  public boolean postExists(final CoprocessorEnvironment e, final Get get,
+      boolean exists) throws IOException {
     return exists;
   }
 
   @Override
-  public Map<byte[], List<KeyValue>> prePut(final CoprocessorEnvironment e,
-      final Map<byte[], List<KeyValue>> familyMap) throws IOException {
-    return familyMap;
+  public void prePut(final CoprocessorEnvironment e, final Map<byte[],
+      List<KeyValue>> familyMap, final boolean writeToWAL) throws IOException {
   }
 
   @Override
-  public void postPut(final CoprocessorEnvironment e,
-      final Map<byte[], List<KeyValue>> familyMap)
-    throws IOException {
+  public void postPut(final CoprocessorEnvironment e, final Map<byte[],
+      List<KeyValue>> familyMap, final boolean writeToWAL) throws IOException {
   }
 
   @Override
-  public Map<byte[], List<KeyValue>> preDelete(final CoprocessorEnvironment e,
-      final Map<byte[], List<KeyValue>> familyMap) throws IOException {
-    return familyMap;
+  public void preDelete(final CoprocessorEnvironment e, final Map<byte[],
+      List<KeyValue>> familyMap, final boolean writeToWAL) throws IOException {
   }
 
   @Override
-  public void postDelete(CoprocessorEnvironment e,
-      Map<byte[], List<KeyValue>> familyMap) throws IOException {
+  public void postDelete(final CoprocessorEnvironment e,
+      final Map<byte[], List<KeyValue>> familyMap, final boolean writeToWAL)
+      throws IOException {
   }
 
   @Override
-  public Put preCheckAndPut(final CoprocessorEnvironment e,
+  public boolean preCheckAndPut(final CoprocessorEnvironment e,
       final byte [] row, final byte [] family, final byte [] qualifier,
-      final byte [] value, final Put put) throws IOException {
-    return put;
+      final byte [] value, final Put put, final boolean result)
+      throws IOException {
+    return result;
   }
 
   @Override
@@ -146,18 +143,18 @@ public abstract class BaseRegionObserver
   }
 
   @Override
-  public Delete preCheckAndDelete(final CoprocessorEnvironment e,
+  public boolean preCheckAndDelete(final CoprocessorEnvironment e,
       final byte [] row, final byte [] family, final byte [] qualifier,
-      final byte [] value, final Delete delete)
-    throws IOException {
-    return delete;
+      final byte [] value, final Delete delete, final boolean result)
+      throws IOException {
+    return result;
   }
 
   @Override
   public boolean postCheckAndDelete(final CoprocessorEnvironment e,
       final byte [] row, final byte [] family, final byte [] qualifier,
       final byte [] value, final Delete delete, final boolean result)
-    throws IOException {
+      throws IOException {
     return result;
   }
 
@@ -172,54 +169,53 @@ public abstract class BaseRegionObserver
   public long postIncrementColumnValue(final CoprocessorEnvironment e,
       final byte [] row, final byte [] family, final byte [] qualifier,
       final long amount, final boolean writeToWAL, long result)
-  throws IOException {
+      throws IOException {
     return result;
   }
 
   @Override
-  public Increment preIncrement(final CoprocessorEnvironment e,
-      final Increment increment)
-  throws IOException {
-    return increment;
+  public void preIncrement(final CoprocessorEnvironment e,
+      final Increment increment, final Result result) throws IOException {
   }
 
   @Override
-  public Result postIncrement(final CoprocessorEnvironment e,
-      final Increment increment,
-      final Result result) throws IOException {
-    return result;
+  public void postIncrement(final CoprocessorEnvironment e,
+      final Increment increment, final Result result) throws IOException {
   }
 
   @Override
-  public Scan preScannerOpen(final CoprocessorEnvironment e, final Scan scan)
-    throws IOException {
-    return scan;
+  public InternalScanner preScannerOpen(final CoprocessorEnvironment e,
+      final Scan scan, final InternalScanner s) throws IOException {
+    return s;
   }
 
   @Override
-  public void postScannerOpen(final CoprocessorEnvironment e,
-      final Scan scan,
-      final long scannerId) throws IOException { }
+  public InternalScanner postScannerOpen(final CoprocessorEnvironment e,
+      final Scan scan, final InternalScanner s) throws IOException {
+    return s;
+  }
 
   @Override
-  public void preScannerNext(final CoprocessorEnvironment e,
-      final long scannerId) throws IOException {
+  public boolean preScannerNext(final CoprocessorEnvironment e,
+      final InternalScanner s, final List<KeyValue> results,
+      final int limit, final boolean hasMore) throws IOException {
+    return hasMore;
   }
 
   @Override
-  public List<KeyValue> postScannerNext(final CoprocessorEnvironment e,
-      final long scannerId, final List<KeyValue> results)
-      throws IOException {
-    return results;
+  public boolean postScannerNext(final CoprocessorEnvironment e,
+      final InternalScanner s, final List<KeyValue> results, final int limit,
+      final boolean hasMore) throws IOException {
+    return hasMore;
   }
 
   @Override
   public void preScannerClose(final CoprocessorEnvironment e,
-      final long scannerId)
-    throws IOException { }
+      final InternalScanner s) throws IOException {
+  }
 
   @Override
   public void postScannerClose(final CoprocessorEnvironment e,
-      final long scannerId)
-    throws IOException { }
+      final InternalScanner s) throws IOException {
+  }
 }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorEnvironment.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorEnvironment.java?rev=1049471&r1=1049470&r2=1049471&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorEnvironment.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorEnvironment.java Wed Dec 15 10:11:59 2010
@@ -45,27 +45,17 @@ public interface CoprocessorEnvironment 
    */
   public HTableInterface getTable(byte[] tableName) throws IOException;
 
-  // environment variables
+  /* Control flow changes */
 
   /**
-   * Get an environment variable
-   * @param key the key
-   * @return the object corresponding to the environment variable, if set
+   * Causes framework to bypass default actions and return with the results
+   * from a preXXX chain.
    */
-  public Object get(Object key);
+  public void bypass();
 
   /**
-   * Set an environment variable
-   * @param key the key
-   * @param value the value
+   * Mark coprocessor chain processing as complete. Causes framework to return
+   * immediately without calling any additional chained coprocessors.
    */
-  public void put(Object key, Object value);
-
-  /**
-   * Remove an environment variable
-   * @param key the key
-   * @return the object corresponding to the environment variable, if set
-   */
-  public Object remove(Object key);
-
+  public void complete();
 }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java?rev=1049471&r1=1049470&r2=1049471&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java Wed Dec 15 10:11:59 2010
@@ -27,6 +27,8 @@ import org.apache.hadoop.hbase.client.Re
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+
 import java.io.IOException;
 
 /**
@@ -37,62 +39,92 @@ public interface RegionObserver {
 
   /**
    * Called before a client makes a GetClosestRowBefore request.
+   * <p>
+   * Call CoprocessorEnvironment#bypass to skip default actions
+   * <p>
+   * Call CoprocessorEnvironment#complete to skip any subsequent chained
+   * coprocessors
    * @param e the environment provided by the region server
    * @param row the row
    * @param family the family
+   * @param result The result to return to the client if default processing
+   * is bypassed. Can be modified. Will not be used if default processing
+   * is not bypassed.
    * @throws IOException if an error occurred on the coprocessor
    */
   public void preGetClosestRowBefore(final CoprocessorEnvironment e,
-      final byte [] row, final byte [] family)
+      final byte [] row, final byte [] family, final Result result)
     throws IOException;
 
   /**
    * Called after a client makes a GetClosestRowBefore request.
+   * <p>
+   * Call CoprocessorEnvironment#complete to skip any subsequent chained
+   * coprocessors
    * @param e the environment provided by the region server
    * @param row the row
    * @param family the desired family
-   * @param result the result set
-   * @return the possible tranformed result set to return to the client
+   * @param result the result to return to the client, modify as necessary
    * @throws IOException if an error occurred on the coprocessor
    */
-  public Result postGetClosestRowBefore(final CoprocessorEnvironment e,
+  public void postGetClosestRowBefore(final CoprocessorEnvironment e,
       final byte [] row, final byte [] family, final Result result)
     throws IOException;
 
   /**
-   * Called before the client perform a get()
+   * Called before the client performs a Get
+   * <p>
+   * Call CoprocessorEnvironment#bypass to skip default actions
+   * <p>
+   * Call CoprocessorEnvironment#complete to skip any subsequent chained
+   * coprocessors
    * @param e the environment provided by the region server
    * @param get the Get request
-   * @return the possibly transformed Get object by coprocessor
+   * @param result The result to return to the client if default processing
+   * is bypassed. Can be modified. Will not be used if default processing
+   * is not bypassed.
    * @throws IOException if an error occurred on the coprocessor
    */
-  public Get preGet(final CoprocessorEnvironment e, final Get get)
+  public void preGet(final CoprocessorEnvironment e, final Get get,
+      final List<KeyValue> result)
     throws IOException;
 
   /**
-   * Called after the client perform a get()
+   * Called after the client performs a Get
+   * <p>
+   * Call CoprocessorEnvironment#complete to skip any subsequent chained
+   * coprocessors
    * @param e the environment provided by the region server
    * @param get the Get request
-   * @param results the result list
-   * @return the possibly transformed result list to return to client
+   * @param result the result to return to the client, modify as necessary
    * @throws IOException if an error occurred on the coprocessor
    */
-  public List<KeyValue> postGet(final CoprocessorEnvironment e, final Get get,
-      final List<KeyValue> results)
+  public void postGet(final CoprocessorEnvironment e, final Get get,
+      final List<KeyValue> result)
     throws IOException;
 
   /**
    * Called before the client tests for existence using a Get.
+   * <p>
+   * Call CoprocessorEnvironment#bypass to skip default actions
+   * <p>
+   * Call CoprocessorEnvironment#complete to skip any subsequent chained
+   * coprocessors
    * @param e the environment provided by the region server
    * @param get the Get request
-   * @return the possibly transformed Get object by coprocessor
+   * @param exists
+   * @return the value to return to the client if bypassing default processing
    * @throws IOException if an error occurred on the coprocessor
    */
-  public Get preExists(final CoprocessorEnvironment e, final Get get)
+  public boolean preExists(final CoprocessorEnvironment e, final Get get,
+      final boolean exists)
     throws IOException;
 
   /**
    * Called after the client tests for existence using a Get.
+   * <p>
+   * Call CoprocessorEnvironment#complete to skip any subsequent chained
+   * coprocessors
    * @param e the environment provided by the region server
    * @param get the Get request
    * @param exists the result returned by the region server
@@ -105,64 +137,92 @@ public interface RegionObserver {
 
   /**
    * Called before the client stores a value.
+   * <p>
+   * Call CoprocessorEnvironment#bypass to skip default actions
+   * <p>
+   * Call CoprocessorEnvironment#complete to skip any subsequent chained
+   * coprocessors
    * @param e the environment provided by the region server
-   * @param familyMap map of family to edits for the given family.
-   * @return the possibly transformed map to actually use
+   * @param familyMap map of family to edits for the given family
+   * @param writeToWAL true if the change should be written to the WAL
    * @throws IOException if an error occurred on the coprocessor
    */
-  public Map<byte[], List<KeyValue>> prePut(final CoprocessorEnvironment e,
-      final Map<byte[], List<KeyValue>> familyMap)
+  public void prePut(final CoprocessorEnvironment e, final Map<byte[],
+      List<KeyValue>> familyMap, final boolean writeToWAL)
     throws IOException;
 
   /**
    * Called after the client stores a value.
+   * <p>
+   * Call CoprocessorEnvironment#complete to skip any subsequent chained
+   * coprocessors
    * @param e the environment provided by the region server
-   * @param familyMap map of family to edits for the given family.
+   * @param familyMap map of family to edits for the given family
+   * @param writeToWAL true if the change should be written to the WAL
    * @throws IOException if an error occurred on the coprocessor
    */
   public void postPut(final CoprocessorEnvironment e, final Map<byte[],
-      List<KeyValue>> familyMap)
+      List<KeyValue>> familyMap, final boolean writeToWAL)
     throws IOException;
 
   /**
    * Called before the client deletes a value.
+   * <p>
+   * Call CoprocessorEnvironment#bypass to skip default actions
+   * <p>
+   * Call CoprocessorEnvironment#complete to skip any subsequent chained
+   * coprocessors
    * @param e the environment provided by the region server
-   * @param familyMap map of family to edits for the given family.
-   * @return the possibly transformed map to actually use
+   * @param familyMap map of family to edits for the given family
+   * @param writeToWAL true if the change should be written to the WAL
    * @throws IOException if an error occurred on the coprocessor
    */
-  public Map<byte[], List<KeyValue>> preDelete(final CoprocessorEnvironment e,
-      final Map<byte[], List<KeyValue>> familyMap)
+  public void preDelete(final CoprocessorEnvironment e, final Map<byte[],
+      List<KeyValue>> familyMap, final boolean writeToWAL)
     throws IOException;
 
   /**
    * Called after the client deletes a value.
+   * <p>
+   * Call CoprocessorEnvironment#complete to skip any subsequent chained
+   * coprocessors
    * @param e the environment provided by the region server
-   * @param familyMap map of family to edits for the given family.
+   * @param familyMap map of family to edits for the given family
+   * @param writeToWAL true if the change should be written to the WAL
    * @throws IOException if an error occurred on the coprocessor
    */
   public void postDelete(final CoprocessorEnvironment e,
-      final Map<byte[], List<KeyValue>> familyMap)
+      final Map<byte[], List<KeyValue>> familyMap, final boolean writeToWAL)
     throws IOException;
 
   /**
    * Called before checkAndPut
+   * <p>
+   * Call CoprocessorEnvironment#bypass to skip default actions
+   * <p>
+   * Call CoprocessorEnvironment#complete to skip any subsequent chained
+   * coprocessors
    * @param e the environment provided by the region server
    * @param row row to check
    * @param family column family
    * @param qualifier column qualifier
    * @param value the expected value
    * @param put data to put if check succeeds
-   * @return the possibly transformed map to actually use
+   * @param result 
+   * @return the return value to return to client if bypassing default
+   * processing
    * @throws IOException if an error occurred on the coprocessor
    */
-  public Put preCheckAndPut(final CoprocessorEnvironment e,
+  public boolean preCheckAndPut(final CoprocessorEnvironment e,
       final byte [] row, final byte [] family, final byte [] qualifier,
-      final byte [] value, final Put put)
+      final byte [] value, final Put put, final boolean result)
     throws IOException;
 
   /**
    * Called after checkAndPut
+   * <p>
+   * Call CoprocessorEnvironment#complete to skip any subsequent chained
+   * coprocessors
    * @param e the environment provided by the region server
    * @param row row to check
    * @param family column family
@@ -170,7 +230,7 @@ public interface RegionObserver {
    * @param value the expected value
    * @param put data to put if check succeeds
    * @param result from the checkAndPut
-   * @return the possibly transformed value to return to client
+   * @return the possibly transformed return value to return to client
    * @throws IOException if an error occurred on the coprocessor
    */
   public boolean postCheckAndPut(final CoprocessorEnvironment e,
@@ -179,23 +239,32 @@ public interface RegionObserver {
     throws IOException;
 
   /**
-   * Called before checkAndPut
+   * Called before checkAndDelete
+   * <p>
+   * Call CoprocessorEnvironment#bypass to skip default actions
+   * <p>
+   * Call CoprocessorEnvironment#complete to skip any subsequent chained
+   * coprocessors
    * @param e the environment provided by the region server
    * @param row row to check
    * @param family column family
    * @param qualifier column qualifier
    * @param value the expected value
    * @param delete delete to commit if check succeeds
-   * @return the possibly transformed map to actually use
+   * @param result 
+   * @return the value to return to client if bypassing default processing
    * @throws IOException if an error occurred on the coprocessor
    */
-  public Delete preCheckAndDelete(final CoprocessorEnvironment e,
+  public boolean preCheckAndDelete(final CoprocessorEnvironment e,
       final byte [] row, final byte [] family, final byte [] qualifier,
-      final byte [] value, final Delete delete)
+      final byte [] value, final Delete delete, final boolean result)
     throws IOException;
 
   /**
    * Called after checkAndDelete
+   * <p>
+   * Call CoprocessorEnvironment#complete to skip any subsequent chained
+   * coprocessors
    * @param e the environment provided by the region server
    * @param row row to check
    * @param family column family
@@ -203,7 +272,7 @@ public interface RegionObserver {
    * @param value the expected value
    * @param delete delete to commit if check succeeds
    * @param result from the CheckAndDelete
-   * @return the possibly transformed value to return to client
+   * @return the possibly transformed returned value to return to client
    * @throws IOException if an error occurred on the coprocessor
    */
   public boolean postCheckAndDelete(final CoprocessorEnvironment e,
@@ -213,13 +282,18 @@ public interface RegionObserver {
 
   /**
    * Called before incrementColumnValue
+   * <p>
+   * Call CoprocessorEnvironment#bypass to skip default actions
+   * <p>
+   * Call CoprocessorEnvironment#complete to skip any subsequent chained
+   * coprocessors
    * @param e the environment provided by the region server
    * @param row row to check
    * @param family column family
    * @param qualifier column qualifier
    * @param amount long amount to increment
-   * @param writeToWAL whether to write the increment to the WAL
-   * @return new amount to increment
+   * @param writeToWAL true if the change should be written to the WAL
+   * @return value to return to the client if bypassing default processing
    * @throws IOException if an error occurred on the coprocessor
    */
   public long preIncrementColumnValue(final CoprocessorEnvironment e,
@@ -229,12 +303,15 @@ public interface RegionObserver {
 
   /**
    * Called after incrementColumnValue
+   * <p>
+   * Call CoprocessorEnvironment#complete to skip any subsequent chained
+   * coprocessors
    * @param e the environment provided by the region server
    * @param row row to check
    * @param family column family
    * @param qualifier column qualifier
    * @param amount long amount to increment
-   * @param writeToWAL whether to write the increment to the WAL
+   * @param writeToWAL true if the change should be written to the WAL
    * @param result the result returned by incrementColumnValue
    * @return the result to return to the client
    * @throws IOException if an error occurred on the coprocessor
@@ -245,92 +322,137 @@ public interface RegionObserver {
     throws IOException;
 
   /**
-   * Called before incrementColumnValue
+   * Called before Increment
+   * <p>
+   * Call CoprocessorEnvironment#bypass to skip default actions
+   * <p>
+   * Call CoprocessorEnvironment#complete to skip any subsequent chained
+   * coprocessors
    * @param e the environment provided by the region server
    * @param increment increment object
-   * @param writeToWAL whether to write the increment to the WAL
-   * @return new Increment instance
+   * @param result The result to return to the client if default processing
+   * is bypassed. Can be modified. Will not be used if default processing
+   * is not bypassed.
+   * @param writeToWAL true if the change should be written to the WAL
    * @throws IOException if an error occurred on the coprocessor
    */
-  public Increment preIncrement(final CoprocessorEnvironment e,
-      final Increment increment)
+  public void preIncrement(final CoprocessorEnvironment e,
+      final Increment increment, final Result result)
     throws IOException;
 
   /**
    * Called after increment
+   * <p>
+   * Call CoprocessorEnvironment#complete to skip any subsequent chained
+   * coprocessors
    * @param e the environment provided by the region server
    * @param increment increment object
-   * @param writeToWAL whether to write the increment to the WAL
-   * @param result the result returned by increment
-   * @return the result to return to the client
+   * @param writeToWAL true if the change should be written to the WAL
+   * @param result the result returned by increment, can be modified
    * @throws IOException if an error occurred on the coprocessor
    */
-  public Result postIncrement(final CoprocessorEnvironment e,
+  public void postIncrement(final CoprocessorEnvironment e,
       final Increment increment, final Result result)
     throws IOException;
 
   /**
    * Called before the client opens a new scanner.
+   * <p>
+   * Call CoprocessorEnvironment#bypass to skip default actions
+   * <p>
+   * Call CoprocessorEnvironment#complete to skip any subsequent chained
+   * coprocessors
    * @param e the environment provided by the region server
    * @param scan the Scan specification
-   * @return the possibly transformed Scan to actually use
+   * @param s if not null, the base scanner
+   * @return an InternalScanner instance to use instead of the base scanner if
+   * overriding default behavior, null otherwise
    * @throws IOException if an error occurred on the coprocessor
    */
-  public Scan preScannerOpen(final CoprocessorEnvironment e, final Scan scan)
+  public InternalScanner preScannerOpen(final CoprocessorEnvironment e,
+      final Scan scan, final InternalScanner s)
     throws IOException;
 
   /**
    * Called after the client opens a new scanner.
+   * <p>
+   * Call CoprocessorEnvironment#complete to skip any subsequent chained
+   * coprocessors
    * @param e the environment provided by the region server
    * @param scan the Scan specification
-   * @param scannerId the scanner id allocated by the region server
+   * @param s if not null, the base scanner
+   * @return the scanner instance to use
    * @throws IOException if an error occurred on the coprocessor
    */
-  public void postScannerOpen(final CoprocessorEnvironment e, final Scan scan,
-      final long scannerId)
+  public InternalScanner postScannerOpen(final CoprocessorEnvironment e,
+      final Scan scan, final InternalScanner s)
     throws IOException;
 
   /**
    * Called before the client asks for the next row on a scanner.
-   * @param e the environment provided by the region server
-   * @param scannerId the scanner id
-   * @param results the result set returned by the region server
-   * @return the possibly transformed result set to actually return
-   * @throws IOException if an error occurred on the coprocessor
-   */
-  public void preScannerNext(final CoprocessorEnvironment e,
-      final long scannerId)
+   * <p>
+   * Call CoprocessorEnvironment#bypass to skip default actions
+   * <p>
+   * Call CoprocessorEnvironment#complete to skip any subsequent chained
+   * coprocessors
+   * @param e the environment provided by the region server
+   * @param s the scanner
+   * @param result The result to return to the client if default processing
+   * is bypassed. Can be modified. Will not be returned if default processing
+   * is not bypassed.
+   * @param limit the maximum number of results to return
+   * @param hasNext the 'has more' indication
+   * @return 'has more' indication that should be sent to client
+   * @throws IOException if an error occurred on the coprocessor
+   */
+  public boolean preScannerNext(final CoprocessorEnvironment e,
+      final InternalScanner s, final List<KeyValue> result,
+      final int limit, final boolean hasNext)
     throws IOException;
 
   /**
    * Called after the client asks for the next row on a scanner.
-   * @param e the environment provided by the region server
-   * @param scannerId the scanner id
-   * @param results the result set returned by the region server
-   * @return the possibly transformed result set to actually return
+   * <p>
+   * Call CoprocessorEnvironment#complete to skip any subsequent chained
+   * coprocessors
+   * @param e the environment provided by the region server
+   * @param s the scanner
+   * @param result the result to return to the client, can be modified
+   * @param limit the maximum number of results to return
+   * @param hasNext the 'has more' indication
+   * @return 'has more' indication that should be sent to client
    * @throws IOException if an error occurred on the coprocessor
    */
-  public List<KeyValue> postScannerNext(final CoprocessorEnvironment e,
-      final long scannerId, final List<KeyValue> results)
+  public boolean postScannerNext(final CoprocessorEnvironment e,
+      final InternalScanner s, final List<KeyValue> result, final int limit,
+      final boolean hasNext)
     throws IOException;
 
   /**
    * Called before the client closes a scanner.
+   * <p>
+   * Call CoprocessorEnvironment#bypass to skip default actions
+   * <p>
+   * Call CoprocessorEnvironment#complete to skip any subsequent chained
+   * coprocessors
    * @param e the environment provided by the region server
-   * @param scannerId the scanner id
+   * @param s the scanner
    * @throws IOException if an error occurred on the coprocessor
    */
   public void preScannerClose(final CoprocessorEnvironment e,
-      final long scannerId)
+      final InternalScanner s)
     throws IOException;
 
   /**
    * Called after the client closes a scanner.
+   * <p>
+   * Call CoprocessorEnvironment#complete to skip any subsequent chained
+   * coprocessors
    * @param e the environment provided by the region server
-   * @param scannerId the scanner id
+   * @param s the scanner
    * @throws IOException if an error occurred on the coprocessor
    */
   public void postScannerClose(final CoprocessorEnvironment e,
-      final long scannerId)
+      final InternalScanner s)
     throws IOException;
 }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CoprocessorHost.java?rev=1049471&r1=1049470&r2=1049471&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CoprocessorHost.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CoprocessorHost.java Wed Dec 15 10:11:59 2010
@@ -47,6 +47,7 @@ import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -284,6 +285,14 @@ public class CoprocessorHost {
       }
     }
 
+    boolean shouldBypass() {
+      return bypass.getAndSet(false);
+    }
+
+    boolean shouldComplete() {
+      return complete.getAndSet(false);
+    }
+
     /** @return the coprocessor environment version */
     @Override
     public int getVersion() {
@@ -319,30 +328,14 @@ public class CoprocessorHost {
       return new HTableWrapper(tableName);
     }
 
-    /**
-     * @param key the key
-     * @return the value, or null if it does not exist
-     */
-    @Override
-    public Object get(Object key) {
-      return vars.get(key);
-    }
-
-    /**
-     * @param key the key
-     * @param value the value
-     */
     @Override
-    public void put(Object key, Object value) {
-      vars.put(key, value);
+    public void complete() {
+      complete.set(true);
     }
 
-    /**
-     * @param key the key
-     */
     @Override
-    public Object remove(Object key) {
-      return vars.remove(key);
+    public void bypass() {
+      bypass.set(true);
     }
   }
 
@@ -355,8 +348,10 @@ public class CoprocessorHost {
   HRegion region;
   /** Ordered set of loaded coprocessors with lock */
   final ReentrantReadWriteLock coprocessorLock = new ReentrantReadWriteLock();
-  Set<Environment> coprocessors =
+  final Set<Environment> coprocessors =
     new TreeSet<Environment>(new EnvironmentPriorityComparator());
+  final AtomicBoolean bypass = new AtomicBoolean(false);
+  final AtomicBoolean complete = new AtomicBoolean(false);
 
   /**
    * Constructor
@@ -482,6 +477,7 @@ public class CoprocessorHost {
    * @param priority priority
    * @throws IOException Exception
    */
+  @SuppressWarnings({ "unchecked", "rawtypes" })
   public void load(Class<?> implClass, Coprocessor.Priority priority)
       throws IOException {
     // create the instance
@@ -581,6 +577,9 @@ public class CoprocessorHost {
       coprocessorLock.readLock().lock();
       for (Environment env: coprocessors) {
         env.impl.preOpen(env);
+        if (env.shouldComplete()) {
+          break;
+        }
       }
     } finally {
       coprocessorLock.readLock().unlock();
@@ -595,6 +594,9 @@ public class CoprocessorHost {
       coprocessorLock.readLock().lock();
       for (Environment env: coprocessors) {
         env.impl.postOpen(env);
+        if (env.shouldComplete()) {
+          break;
+        }
       }
     } finally {
       coprocessorLock.readLock().unlock();
@@ -610,7 +612,6 @@ public class CoprocessorHost {
       coprocessorLock.writeLock().lock();
       for (Environment env: coprocessors) {
         env.impl.preClose(env, abortRequested);
-        env.shutdown();
       }
     } finally {
       coprocessorLock.writeLock().unlock();
@@ -642,6 +643,9 @@ public class CoprocessorHost {
       coprocessorLock.readLock().lock();
       for (Environment env: coprocessors) {
         env.impl.preCompact(env, willSplit);
+        if (env.shouldComplete()) {
+          break;
+        }
       }
     } finally {
       coprocessorLock.readLock().unlock();
@@ -657,6 +661,9 @@ public class CoprocessorHost {
       coprocessorLock.readLock().lock();
       for (Environment env: coprocessors) {
         env.impl.postCompact(env, willSplit);
+        if (env.shouldComplete()) {
+          break;
+        }
       }
     } finally {
       coprocessorLock.readLock().unlock();
@@ -671,6 +678,9 @@ public class CoprocessorHost {
       coprocessorLock.readLock().lock();
       for (Environment env: coprocessors) {
         env.impl.preFlush(env);
+        if (env.shouldComplete()) {
+          break;
+        }
       }
     } finally {
       coprocessorLock.readLock().unlock();
@@ -685,6 +695,9 @@ public class CoprocessorHost {
       coprocessorLock.readLock().lock();
       for (Environment env: coprocessors) {
         env.impl.postFlush(env);
+        if (env.shouldComplete()) {
+          break;
+        }
       }
     } finally {
       coprocessorLock.readLock().unlock();
@@ -699,6 +712,9 @@ public class CoprocessorHost {
       coprocessorLock.readLock().lock();
       for (Environment env: coprocessors) {
         env.impl.preSplit(env);
+        if (env.shouldComplete()) {
+          break;
+        }
       }
     } finally {
       coprocessorLock.readLock().unlock();
@@ -715,6 +731,9 @@ public class CoprocessorHost {
       coprocessorLock.readLock().lock();
       for (Environment env: coprocessors) {
         env.impl.postSplit(env, l, r);
+        if (env.shouldComplete()) {
+          break;
+        }
       }
     } finally {
       coprocessorLock.readLock().unlock();
@@ -727,17 +746,25 @@ public class CoprocessorHost {
    * @param row the row key
    * @param family the family
    * @param result the result set from the region
+   * @return true if default processing should be bypassed
    * @exception IOException Exception
    */
-  public void preGetClosestRowBefore(final byte[] row, final byte[] family)
-  throws IOException {
+  public boolean preGetClosestRowBefore(final byte[] row, final byte[] family,
+      final Result result) throws IOException {
     try {
+      boolean bypass = false;
       coprocessorLock.readLock().lock();
       for (Environment env: coprocessors) {
         if (env.impl instanceof RegionObserver) {
-          ((RegionObserver)env.impl).preGetClosestRowBefore(env, row, family);
+          ((RegionObserver)env.impl).preGetClosestRowBefore(env, row, family,
+            result);
+          bypass |= env.shouldBypass();
+          if (env.shouldComplete()) {
+            break;
+          }
         }
       }
+      return bypass;
     } finally {
       coprocessorLock.readLock().unlock();
     }
@@ -747,20 +774,21 @@ public class CoprocessorHost {
    * @param row the row key
    * @param family the family
    * @param result the result set from the region
-   * @return the result set to return to the client
    * @exception IOException Exception
    */
-  public Result postGetClosestRowBefore(final byte[] row, final byte[] family,
-      Result result) throws IOException {
+  public void postGetClosestRowBefore(final byte[] row, final byte[] family,
+      final Result result) throws IOException {
     try {
       coprocessorLock.readLock().lock();
       for (Environment env: coprocessors) {
         if (env.impl instanceof RegionObserver) {
-          result = ((RegionObserver)env.impl)
-            .postGetClosestRowBefore(env, row, family, result);
+          ((RegionObserver)env.impl).postGetClosestRowBefore(env, row, family,
+            result);
+          if (env.shouldComplete()) {
+            break;
+          }
         }
       }
-      return result;
     } finally {
       coprocessorLock.readLock().unlock();
     }
@@ -768,18 +796,24 @@ public class CoprocessorHost {
 
   /**
    * @param get the Get request
-   * @return the possibly transformed Get object by coprocessor
+   * @return true if default processing should be bypassed
    * @exception IOException Exception
    */
-  public Get preGet(Get get) throws IOException {
+  public boolean preGet(final Get get, final List<KeyValue> results)
+      throws IOException {
     try {
+      boolean bypass = false;
       coprocessorLock.readLock().lock();
       for (Environment env: coprocessors) {
         if (env.impl instanceof RegionObserver) {
-          get = ((RegionObserver)env.impl).preGet(env, get);
+          ((RegionObserver)env.impl).preGet(env, get, results);
+          bypass |= env.shouldBypass();
+          if (env.shouldComplete()) {
+            break;
+          }
         }
       }
-      return get;
+      return bypass;
     } finally {
       coprocessorLock.readLock().unlock();
     }
@@ -791,16 +825,18 @@ public class CoprocessorHost {
    * @return the possibly transformed result set to use
    * @exception IOException Exception
    */
-  public List<KeyValue> postGet(final Get get, List<KeyValue> results)
-  throws IOException {
+  public void postGet(final Get get, final List<KeyValue> results)
+      throws IOException {
     try {
       coprocessorLock.readLock().lock();
       for (Environment env: coprocessors) {
         if (env.impl instanceof RegionObserver) {
-          results = ((RegionObserver)env.impl).postGet(env, get, results);
+          ((RegionObserver)env.impl).postGet(env, get, results);
+          if (env.shouldComplete()) {
+            break;
+          }
         }
       }
-      return results;
     } finally {
       coprocessorLock.readLock().unlock();
     }
@@ -808,18 +844,25 @@ public class CoprocessorHost {
 
   /**
    * @param get the Get request
-   * @param exists the result returned by the region server
+   * @return true or false to return to client if bypassing normal operation,
+   * or null otherwise
    * @exception IOException Exception
    */
-  public Get preExists(Get get) throws IOException {
+  public Boolean preExists(final Get get) throws IOException {
     try {
+      boolean bypass = false;
+      boolean exists = false;
       coprocessorLock.readLock().lock();
       for (Environment env: coprocessors) {
         if (env.impl instanceof RegionObserver) {
-          get = ((RegionObserver)env.impl).preExists(env, get);
+          exists = ((RegionObserver)env.impl).preExists(env, get, exists);
+          bypass |= env.shouldBypass();
+          if (env.shouldComplete()) {
+            break;
+          }
         }
       }
-      return get;
+      return bypass ? exists : null;
     } finally {
       coprocessorLock.readLock().unlock();
     }
@@ -837,7 +880,10 @@ public class CoprocessorHost {
       coprocessorLock.readLock().lock();
       for (Environment env: coprocessors) {
         if (env.impl instanceof RegionObserver) {
-          exists &= ((RegionObserver)env.impl).postExists(env, get, exists);
+          exists = ((RegionObserver)env.impl).postExists(env, get, exists);
+          if (env.shouldComplete()) {
+            break;
+          }
         }
       }
       return exists;
@@ -848,19 +894,25 @@ public class CoprocessorHost {
 
   /**
    * @param familyMap map of family to edits for the given family.
-   * @return the possibly transformed map to actually use
+   * @param writeToWAL true if the change should be written to the WAL
+   * @return true if default processing should be bypassed
    * @exception IOException Exception
    */
-  public Map<byte[], List<KeyValue>> prePut(Map<byte[], List<KeyValue>> familyMap)
-  throws IOException {
+  public boolean prePut(final Map<byte[], List<KeyValue>> familyMap,
+      final boolean writeToWAL) throws IOException {
     try {
+      boolean bypass = false;
       coprocessorLock.readLock().lock();
       for (Environment env: coprocessors) {
         if (env.impl instanceof RegionObserver) {
-          familyMap = ((RegionObserver)env.impl).prePut(env, familyMap);
+          ((RegionObserver)env.impl).prePut(env, familyMap, writeToWAL);
+          bypass |= env.shouldBypass();
+          if (env.shouldComplete()) {
+            break;
+          }
         }
       }
-      return familyMap;
+      return bypass;
     } finally {
       coprocessorLock.readLock().unlock();
     }
@@ -868,15 +920,19 @@ public class CoprocessorHost {
 
   /**
    * @param familyMap map of family to edits for the given family.
+   * @param writeToWAL true if the change should be written to the WAL
    * @exception IOException Exception
    */
-  public void postPut(Map<byte[], List<KeyValue>> familyMap)
-  throws IOException {
+  public void postPut(final Map<byte[], List<KeyValue>> familyMap,
+      final boolean writeToWAL) throws IOException {
     try {
       coprocessorLock.readLock().lock();
       for (Environment env: coprocessors) {
         if (env.impl instanceof RegionObserver) {
-          ((RegionObserver)env.impl).postPut(env, familyMap);
+          ((RegionObserver)env.impl).postPut(env, familyMap, writeToWAL);
+          if (env.shouldComplete()) {
+            break;
+          }
         }
       }
     } finally {
@@ -886,19 +942,25 @@ public class CoprocessorHost {
 
   /**
    * @param familyMap map of family to edits for the given family.
-   * @return the possibly transformed map to actually use
+   * @param writeToWAL true if the change should be written to the WAL
+   * @return true if default processing should be bypassed
    * @exception IOException Exception
    */
-  public Map<byte[], List<KeyValue>> preDelete(Map<byte[], List<KeyValue>> familyMap)
-  throws IOException {
+  public boolean preDelete(final Map<byte[], List<KeyValue>> familyMap,
+      final boolean writeToWAL) throws IOException {
     try {
+      boolean bypass = false;
       coprocessorLock.readLock().lock();
       for (Environment env: coprocessors) {
         if (env.impl instanceof RegionObserver) {
-          familyMap = ((RegionObserver)env.impl).preDelete(env, familyMap);
+          ((RegionObserver)env.impl).preDelete(env, familyMap, writeToWAL);
+          bypass |= env.shouldBypass();
+          if (env.shouldComplete()) {
+            break;
+          }
         }
       }
-      return familyMap;
+      return bypass;
     } finally {
       coprocessorLock.readLock().unlock();
     }
@@ -906,15 +968,19 @@ public class CoprocessorHost {
 
   /**
    * @param familyMap map of family to edits for the given family.
+   * @param writeToWAL true if the change should be written to the WAL
    * @exception IOException Exception
    */
-  public void postDelete(Map<byte[], List<KeyValue>> familyMap)
-  throws IOException {
+  public void postDelete(final Map<byte[], List<KeyValue>> familyMap,
+      final boolean writeToWAL) throws IOException {
     try {
       coprocessorLock.readLock().lock();
       for (Environment env: coprocessors) {
         if (env.impl instanceof RegionObserver) {
-          ((RegionObserver)env.impl).postDelete(env, familyMap);
+          ((RegionObserver)env.impl).postDelete(env, familyMap, writeToWAL);
+          if (env.shouldComplete()) {
+            break;
+          }
         }
       }
     } finally {
@@ -928,21 +994,29 @@ public class CoprocessorHost {
    * @param qualifier column qualifier
    * @param value the expected value
    * @param put data to put if check succeeds
+   * @return true or false to return to client if default processing should
+   * be bypassed, or null otherwise
    * @throws IOException e
    */
-  public Put preCheckAndPut(final byte [] row, final byte [] family,
+  public Boolean preCheckAndPut(final byte [] row, final byte [] family,
       final byte [] qualifier, final byte [] value, Put put)
     throws IOException
   {
     try {
+      boolean bypass = false;
+      boolean result = false;
       coprocessorLock.readLock().lock();
       for (Environment env: coprocessors) {
         if (env.impl instanceof RegionObserver) {
-          put = ((RegionObserver)env.impl).preCheckAndPut(env, row, family,
-            qualifier, value, put);
+          result = ((RegionObserver)env.impl).preCheckAndPut(env, row, family,
+            qualifier, value, put, result);
+          bypass |= env.shouldBypass();
+          if (env.shouldComplete()) {
+            break;
+          }
         }
       }
-      return put;
+      return bypass ? result : null;
     } finally {
       coprocessorLock.readLock().unlock();
     }
@@ -967,6 +1041,9 @@ public class CoprocessorHost {
         if (env.impl instanceof RegionObserver) {
           result = ((RegionObserver)env.impl).postCheckAndPut(env, row,
             family, qualifier, value, put, result);
+          if (env.shouldComplete()) {
+            break;
+          }
         }
       }
       return result;
@@ -981,21 +1058,29 @@ public class CoprocessorHost {
    * @param qualifier column qualifier
    * @param value the expected value
    * @param delete delete to commit if check succeeds
+   * @return true or false to return to client if default processing should
+   * be bypassed, or null otherwise
    * @throws IOException e
    */
-  public Delete preCheckAndDelete(final byte [] row, final byte [] family,
+  public Boolean preCheckAndDelete(final byte [] row, final byte [] family,
       final byte [] qualifier, final byte [] value, Delete delete)
     throws IOException
   {
     try {
+      boolean bypass = false;
+      boolean result = false;
       coprocessorLock.readLock().lock();
       for (Environment env: coprocessors) {
         if (env.impl instanceof RegionObserver) {
-          delete = ((RegionObserver)env.impl).preCheckAndDelete(env, row,
-              family, qualifier, value, delete);
+          result = ((RegionObserver)env.impl).preCheckAndDelete(env, row,
+            family, qualifier, value, delete, result);
+          bypass |= env.shouldBypass();
+          if (env.shouldComplete()) {
+            break;
+          }
         }
       }
-      return delete;
+      return bypass ? result : null;
     } finally {
       coprocessorLock.readLock().unlock();
     }
@@ -1020,6 +1105,9 @@ public class CoprocessorHost {
         if (env.impl instanceof RegionObserver) {
           result = ((RegionObserver)env.impl).postCheckAndDelete(env, row,
             family, qualifier, value, delete, result);
+          if (env.shouldComplete()) {
+            break;
+          }
         }
       }
       return result;
@@ -1033,25 +1121,31 @@ public class CoprocessorHost {
    * @param family column family
    * @param qualifier column qualifier
    * @param amount long amount to increment
-   * @param writeToWAL whether to write the increment to the WAL
-   * @return new amount to increment
+   * @param writeToWAL true if the change should be written to the WAL
+   * @return return value for client if default operation should be bypassed,
+   * or null otherwise
    * @throws IOException if an error occurred on the coprocessor
    */
-  public long preIncrementColumnValue(final byte [] row, final byte [] family,
+  public Long preIncrementColumnValue(final byte [] row, final byte [] family,
       final byte [] qualifier, long amount, final boolean writeToWAL)
       throws IOException {
     try {
+      boolean bypass = false;
       coprocessorLock.readLock().lock();
       for (Environment env: coprocessors) {
         if (env.impl instanceof RegionObserver) {
           amount = ((RegionObserver)env.impl).preIncrementColumnValue(env,
-              row, family, qualifier, amount, writeToWAL);
+            row, family, qualifier, amount, writeToWAL);
+          bypass |= env.shouldBypass();
+          if (env.shouldComplete()) {
+            break;
+          }
         }
       }
+      return bypass ? amount : null;
     } finally {
       coprocessorLock.readLock().unlock();
     }
-    return amount;
   }
 
   /**
@@ -1059,7 +1153,7 @@ public class CoprocessorHost {
    * @param family column family
    * @param qualifier column qualifier
    * @param amount long amount to increment
-   * @param writeToWAL whether to write the increment to the WAL
+   * @param writeToWAL true if the change should be written to the WAL
    * @param result the result returned by incrementColumnValue
    * @return the result to return to the client
    * @throws IOException if an error occurred on the coprocessor
@@ -1072,7 +1166,10 @@ public class CoprocessorHost {
       for (Environment env: coprocessors) {
         if (env.impl instanceof RegionObserver) {
           result = ((RegionObserver)env.impl).postIncrementColumnValue(env,
-              row, family, qualifier, amount, writeToWAL, result);
+            row, family, qualifier, amount, writeToWAL, result);
+          if (env.shouldComplete()) {
+            break;
+          }
         }
       }
     } finally {
@@ -1083,155 +1180,202 @@ public class CoprocessorHost {
 
   /**
    * @param increment increment object
-   * @param writeToWAL whether to write the increment to the WAL
-   * @return new amount to increment
+   * @param writeToWAL true if the change should be written to the WAL
+   * @return result to return to client if default operation should be
+   * bypassed, null otherwise
    * @throws IOException if an error occurred on the coprocessor
    */
-  public Increment preIncrement(Increment increment)
+  public Result preIncrement(Increment increment)
       throws IOException {
     try {
+      boolean bypass = false;
+      Result result = new Result();
       coprocessorLock.readLock().lock();
       for (Environment env: coprocessors) {
         if (env.impl instanceof RegionObserver) {
-          increment = ((RegionObserver)env.impl).preIncrement(env, increment);
+          ((RegionObserver)env.impl).preIncrement(env, increment, result);
+          bypass |= env.shouldBypass();
+          if (env.shouldComplete()) {
+            break;
+          }
         }
       }
+      return bypass ? result : null;
     } finally {
       coprocessorLock.readLock().unlock();
     }
-    return increment;
   }
 
   /**
    * @param increment increment object
-   * @param writeToWAL whether to write the increment to the WAL
+   * @param writeToWAL true if the change should be written to the WAL
    * @param result the result returned by incrementColumnValue
-   * @return the result to return to the client
    * @throws IOException if an error occurred on the coprocessor
    */
-  public Result postIncrement(final Increment increment, Result result)
+  public void postIncrement(final Increment increment, Result result)
       throws IOException {
     try {
       coprocessorLock.readLock().lock();
       for (Environment env: coprocessors) {
         if (env.impl instanceof RegionObserver) {
-          result = ((RegionObserver)env.impl).postIncrement(env, increment,
-              result);
+          ((RegionObserver)env.impl).postIncrement(env, increment, result);
+          if (env.shouldComplete()) {
+            break;
+          }
         }
       }
     } finally {
       coprocessorLock.readLock().unlock();
     }
-    return result;
   }
 
   /**
    * @param scan the Scan specification
+   * @return scanner id to return to client if default operation should be
+   * bypassed, false otherwise
    * @exception IOException Exception
    */
-  public Scan preScannerOpen(Scan scan) throws IOException {
+  public InternalScanner preScannerOpen(Scan scan) throws IOException {
     try {
+      boolean bypass = false;
+      InternalScanner s = null;
       coprocessorLock.readLock().lock();
       for (Environment env: coprocessors) {
         if (env.impl instanceof RegionObserver) {
-          scan = ((RegionObserver)env.impl).preScannerOpen(env, scan);
+          s = ((RegionObserver)env.impl).preScannerOpen(env, scan, s);
+          bypass |= env.shouldBypass();
+          if (env.shouldComplete()) {
+            break;
+          }
         }
       }
+      return bypass ? s : null;
     } finally {
       coprocessorLock.readLock().unlock();
     }
-    return scan;
   }
 
   /**
    * @param scan the Scan specification
    * @param scannerId the scanner id allocated by the region server
+   * @return the scanner instance to use
    * @exception IOException Exception
    */
-  public void postScannerOpen(final Scan scan, long scannerId)
+  public InternalScanner postScannerOpen(final Scan scan, InternalScanner s)
       throws IOException {
     try {
       coprocessorLock.readLock().lock();
       for (Environment env: coprocessors) {
         if (env.impl instanceof RegionObserver) {
-          ((RegionObserver)env.impl).postScannerOpen(env, scan, scannerId);
+          s = ((RegionObserver)env.impl).postScannerOpen(env, scan, s);
+          if (env.shouldComplete()) {
+            break;
+          }
         }
       }
+      return s;
     } finally {
       coprocessorLock.readLock().unlock();
     }
   }
 
   /**
-   * @param scannerId the scanner id
+   * @param s the scanner
    * @param results the result set returned by the region server
-   * @return the possibly transformed result set to actually return
+   * @param limit the maximum number of results to return
+   * @return 'has next' indication to client if bypassing default behavior, or
+   * null otherwise
    * @exception IOException Exception
    */
-  public void preScannerNext(final long scannerId) throws IOException {
+  public Boolean preScannerNext(final InternalScanner s,
+      final List<KeyValue> results, int limit) throws IOException {
     try {
+      boolean bypass = false;
+      boolean hasNext = false;
       coprocessorLock.readLock().lock();
       for (Environment env: coprocessors) {
         if (env.impl instanceof RegionObserver) {
-          ((RegionObserver)env.impl).preScannerNext(env, scannerId);
+          hasNext = ((RegionObserver)env.impl).preScannerNext(env, s, results,
+            limit, hasNext);
+          bypass |= env.shouldBypass();
+          if (env.shouldComplete()) {
+            break;
+          }
         }
       }
+      return bypass ? hasNext : null;
     } finally {
       coprocessorLock.readLock().unlock();
     }
   }
 
   /**
-   * @param scannerId the scanner id
+   * @param s the scanner
    * @param results the result set returned by the region server
-   * @return the possibly transformed result set to actually return
+   * @param limit the maximum number of results to return
+   * @param hasMore
+   * @return 'has more' indication to give to client
    * @exception IOException Exception
    */
-  public List<KeyValue> postScannerNext(final long scannerId,
-      List<KeyValue> results) throws IOException {
+  public boolean postScannerNext(final InternalScanner s,
+      final List<KeyValue> results, final int limit, boolean hasMore)
+      throws IOException {
     try {
       coprocessorLock.readLock().lock();
       for (Environment env: coprocessors) {
         if (env.impl instanceof RegionObserver) {
-          results = ((RegionObserver)env.impl).postScannerNext(env, scannerId,
-            results);
+          hasMore = ((RegionObserver)env.impl).postScannerNext(env, s,
+            results, limit, hasMore);
+          if (env.shouldComplete()) {
+            break;
+          }
         }
       }
-      return results;
+      return hasMore;
     } finally {
       coprocessorLock.readLock().unlock();
     }
   }
 
   /**
-   * @param scannerId the scanner id
+   * @param s the scanner
+   * @return true if default behavior should be bypassed, false otherwise
    * @exception IOException Exception
    */
-  public void preScannerClose(final long scannerId)
+  public boolean preScannerClose(final InternalScanner s)
       throws IOException {
     try {
+      boolean bypass = false;
       coprocessorLock.readLock().lock();
       for (Environment env: coprocessors) {
         if (env.impl instanceof RegionObserver) {
-          ((RegionObserver)env.impl).preScannerClose(env, scannerId);
+          ((RegionObserver)env.impl).preScannerClose(env, s);
+          bypass |= env.shouldBypass();
+          if (env.shouldComplete()) {
+            break;
+          }
         }
       }
+      return bypass;
     } finally {
       coprocessorLock.readLock().unlock();
     }
   }
 
   /**
-   * @param scannerId the scanner id
+   * @param s the scanner
    * @exception IOException Exception
    */
-  public void postScannerClose(final long scannerId)
+  public void postScannerClose(final InternalScanner s)
       throws IOException {
     try {
       coprocessorLock.readLock().lock();
       for (Environment env: coprocessors) {
         if (env.impl instanceof RegionObserver) {
-          ((RegionObserver)env.impl).postScannerClose(env, scannerId);
+          ((RegionObserver)env.impl).postScannerClose(env, s);
+          if (env.shouldComplete()) {
+            break;
+          }
         }
       }
     } finally {

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1049471&r1=1049470&r2=1049471&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed Dec 15 10:11:59 2010
@@ -1112,27 +1112,29 @@ public class HRegion implements HeapSize
    */
   public Result getClosestRowBefore(final byte [] row, final byte [] family)
   throws IOException {
-    Result result = null;
+    if (coprocessorHost != null) {
+      Result result = new Result();
+      if (coprocessorHost.preGetClosestRowBefore(row, family, result)) {
+        return result;
+      }
+    }
     // look across all the HStores for this region and determine what the
     // closest key is across all column families, since the data may be sparse
-    KeyValue key = null;
     checkRow(row);
     startRegionOperation();
-    if (coprocessorHost != null) {
-      coprocessorHost.preGetClosestRowBefore(row, family);
-    }
     try {
       Store store = getStore(family);
       KeyValue kv = new KeyValue(row, HConstants.LATEST_TIMESTAMP);
       // get the closest key. (HStore.getRowKeyAtOrBefore can return null)
-      key = store.getRowKeyAtOrBefore(kv);
+      KeyValue key = store.getRowKeyAtOrBefore(kv);
+      Result result = null;
       if (key != null) {
         Get get = new Get(key.getRow());
         get.addFamily(family);
         result = get(get, null);
       }
       if (coprocessorHost != null) {
-        result = coprocessorHost.postGetClosestRowBefore(row, family, result);
+        coprocessorHost.postGetClosestRowBefore(row, family, result);
       }
       return result;
     } finally {
@@ -1150,8 +1152,7 @@ public class HRegion implements HeapSize
    * @return InternalScanner
    * @throws IOException read exceptions
    */
-  public InternalScanner getScanner(Scan scan)
-  throws IOException {
+  public InternalScanner getScanner(Scan scan) throws IOException {
    return getScanner(scan, null);
   }
 
@@ -1175,13 +1176,17 @@ public class HRegion implements HeapSize
     }
   }
 
-  protected InternalScanner instantiateInternalScanner(Scan scan, List<KeyValueScanner> additionalScanners) throws IOException {
+  protected InternalScanner instantiateInternalScanner(Scan scan,
+      List<KeyValueScanner> additionalScanners) throws IOException {
+    InternalScanner s = null;
     if (coprocessorHost != null) {
-      coprocessorHost.preScannerOpen(scan);
+      s = coprocessorHost.preScannerOpen(scan);
+    }
+    if (s == null) {
+      s = new RegionScanner(scan, additionalScanners);
     }
-    InternalScanner s = new RegionScanner(scan, additionalScanners);
     if (coprocessorHost != null) {
-      coprocessorHost.postScannerOpen(scan, s.hashCode());
+      s = coprocessorHost.postScannerOpen(scan, s);
     }
     return s;
   }
@@ -1243,17 +1248,20 @@ public class HRegion implements HeapSize
    * @throws IOException
    */
   public void delete(Map<byte[], List<KeyValue>> familyMap, boolean writeToWAL)
-  throws IOException {
+      throws IOException {
+    /* Run coprocessor pre hook outside of locks to avoid deadlock */
+    if (coprocessorHost != null) {
+      if (coprocessorHost.preDelete(familyMap, writeToWAL)) {
+        return;
+      }
+    }
+
     long now = EnvironmentEdgeManager.currentTimeMillis();
     byte [] byteNow = Bytes.toBytes(now);
     boolean flush = false;
 
     updatesLock.readLock().lock();
     try {
-      if (coprocessorHost != null) {
-        familyMap = coprocessorHost.preDelete(familyMap);
-      }
-
       for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
 
         byte[] family = e.getKey();
@@ -1318,7 +1326,7 @@ public class HRegion implements HeapSize
       flush = isFlushSize(memstoreSize.addAndGet(addedSize));
 
       if (coprocessorHost != null) {
-        coprocessorHost.postDelete(familyMap);
+        coprocessorHost.postDelete(familyMap, writeToWAL);
       }
     } finally {
       this.updatesLock.readLock().unlock();
@@ -1456,15 +1464,36 @@ public class HRegion implements HeapSize
     return batchOp.retCodes;
   }
 
+  @SuppressWarnings("unchecked")
   private long doMiniBatchPut(BatchOperationInProgress<Pair<Put, Integer>> batchOp) throws IOException {
+    /* Run coprocessor pre hook outside of locks to avoid deadlock */
+    if (coprocessorHost != null) {
+      List<Pair<Put, Integer>> ops =
+        new ArrayList<Pair<Put, Integer>>(batchOp.operations.length);
+      for (int i = 0; i < batchOp.operations.length; i++) {
+        Pair<Put, Integer> nextPair = batchOp.operations[i];
+        Put put = nextPair.getFirst();
+        Map<byte[], List<KeyValue>> familyMap = put.getFamilyMap();
+        if (coprocessorHost.prePut(familyMap, put.getWriteToWAL())) {
+          // pre hook says skip this Put
+          // adjust nextIndexToProcess if we skipped before it
+          if (batchOp.nextIndexToProcess > i) {
+            batchOp.nextIndexToProcess--;
+          }
+          continue;
+        }
+        ops.add(nextPair);
+      }
+      batchOp.operations = ops.toArray(new Pair[ops.size()]);
+    }
+
     long now = EnvironmentEdgeManager.currentTimeMillis();
     byte[] byteNow = Bytes.toBytes(now);
 
     /** Keep track of the locks we hold so we can release them in finally clause */
     List<Integer> acquiredLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
     // reference family maps directly so coprocessors can mutate them if desired
-    Map<byte[],List<KeyValue>>[] familyMaps =
-        new Map[batchOp.operations.length];
+    Map<byte[],List<KeyValue>>[] familyMaps = new Map[batchOp.operations.length];
     // We try to set up a batch in the range [firstIndex,lastIndexExclusive)
     int firstIndex = batchOp.nextIndexToProcess;
     int lastIndexExclusive = firstIndex;
@@ -1481,12 +1510,6 @@ public class HRegion implements HeapSize
         Integer providedLockId = nextPair.getSecond();
 
         Map<byte[], List<KeyValue>> familyMap = put.getFamilyMap();
-        // Check any loaded coprocessors
-        /* TODO: we should catch any throws coprocessor exceptions here to allow the
-           rest of the batch to continue.  This means fixing HBASE-2898 */
-        if (coprocessorHost != null) {
-          familyMap = coprocessorHost.prePut(familyMap);
-        }
         // store the family map reference to allow for mutations
         familyMaps[lastIndexExclusive] = familyMap;
 
@@ -1555,15 +1578,22 @@ public class HRegion implements HeapSize
       long addedSize = 0;
       for (int i = firstIndex; i < lastIndexExclusive; i++) {
         if (batchOp.retCodes[i] != OperationStatusCode.NOT_RUN) continue;
-
         addedSize += applyFamilyMapToMemstore(familyMaps[i]);
         batchOp.retCodes[i] = OperationStatusCode.SUCCESS;
+      }
 
-        // execute any coprocessor post-hooks
-        if (coprocessorHost != null) {
-          coprocessorHost.postDelete(familyMaps[i]);
+      // ------------------------------------
+      // STEP 5. Run coprocessor post hooks
+      // ------------------------------------
+      if (coprocessorHost != null) {
+        for (int i = firstIndex; i < lastIndexExclusive; i++) {
+          // only for successful puts
+          if (batchOp.retCodes[i] != OperationStatusCode.SUCCESS) continue;
+          Put p = batchOp.operations[i].getFirst();
+          coprocessorHost.postPut(familyMaps[i], p.getWriteToWAL());
         }
       }
+
       success = true;
       return addedSize;
     } finally {
@@ -1738,8 +1768,14 @@ public class HRegion implements HeapSize
    * @param writeToWAL if true, then we should write to the log
    * @throws IOException
    */
-  private void put(Map<byte [], List<KeyValue>> familyMap,
-    boolean writeToWAL) throws IOException {
+  private void put(Map<byte [], List<KeyValue>> familyMap, boolean writeToWAL)
+      throws IOException {
+    /* run pre put hook outside of lock to avoid deadlock */
+    if (coprocessorHost != null) {
+      if (coprocessorHost.prePut(familyMap, writeToWAL)) {
+        return;
+      }
+    }
 
     long now = EnvironmentEdgeManager.currentTimeMillis();
     byte[] byteNow = Bytes.toBytes(now);
@@ -1747,9 +1783,6 @@ public class HRegion implements HeapSize
 
     this.updatesLock.readLock().lock();
     try {
-      if (coprocessorHost != null) {
-        familyMap = coprocessorHost.prePut(familyMap);
-      }
       checkFamilies(familyMap.keySet());
       updateKVTimestamps(familyMap.values(), byteNow);
       // write/sync to WAL should happen before we touch memstore.
@@ -1766,13 +1799,14 @@ public class HRegion implements HeapSize
 
       long addedSize = applyFamilyMapToMemstore(familyMap);
       flush = isFlushSize(memstoreSize.addAndGet(addedSize));
-
-      if (coprocessorHost != null) {
-        coprocessorHost.postPut(familyMap);
-      }
     } finally {
       this.updatesLock.readLock().unlock();
     }
+
+    if (coprocessorHost != null) {
+      coprocessorHost.postPut(familyMap, writeToWAL);
+    }
+
     if (flush) {
       // Request a cache flush.  Do it outside update lock.
       requestFlush();
@@ -2350,6 +2384,14 @@ public class HRegion implements HeapSize
 
     public synchronized boolean next(List<KeyValue> outResults, int limit)
         throws IOException {
+      if (coprocessorHost != null) {
+        Boolean result = coprocessorHost.preScannerNext((InternalScanner)this,
+          outResults, limit);
+        if (result != null) {
+          return result.booleanValue();
+        }
+      }
+
       if (this.filterClosed) {
         throw new UnknownScannerException("Scanner was closed (timed out?) " +
             "after we renewed it. Could be caused by a very slow scanner " +
@@ -2363,14 +2405,11 @@ public class HRegion implements HeapSize
 
         results.clear();
 
-        if (coprocessorHost != null) {
-          coprocessorHost.preScannerNext(hashCode());
-        }
-
         boolean returnResult = nextInternal(limit);
 
         if (coprocessorHost != null) {
-          results = coprocessorHost.postScannerNext(hashCode(), results);
+          returnResult = coprocessorHost.postScannerNext((InternalScanner)this,
+            results, limit, returnResult);
         }
 
         outResults.addAll(results);
@@ -2416,8 +2455,10 @@ public class HRegion implements HeapSize
           do {
             this.storeHeap.next(results, limit - results.size());
             if (limit > 0 && results.size() == limit) {
-              if (this.filter != null && filter.hasFilterRow()) throw new IncompatibleFilterException(
+              if (this.filter != null && filter.hasFilterRow()) {
+                throw new IncompatibleFilterException(
                   "Filter with filterRow(List<KeyValue>) incompatible with scan with limit!");
+              }
               return true; // we are expecting more yes, but also limited to how many we can return.
             }
           } while (Bytes.equals(currentRow, nextRow = peekRow()));
@@ -2480,7 +2521,9 @@ public class HRegion implements HeapSize
 
     public synchronized void close() throws IOException {
       if (coprocessorHost != null) {
-        coprocessorHost.preScannerClose(hashCode());
+        if (coprocessorHost.preScannerClose((InternalScanner)this)) {
+          return;
+        }
       }
       if (storeHeap != null) {
         storeHeap.close();
@@ -2488,7 +2531,7 @@ public class HRegion implements HeapSize
       }
       this.filterClosed = true;
       if (coprocessorHost != null) {
-        coprocessorHost.postScannerClose(hashCode());
+        coprocessorHost.postScannerClose((InternalScanner)this);
       }
     }
   }
@@ -3064,32 +3107,27 @@ public class HRegion implements HeapSize
   throws IOException {
     Scan scan = new Scan(get);
 
-    List<KeyValue> results = null;
-    List<KeyValue> getResults = new ArrayList<KeyValue>();
+    List<KeyValue> results = new ArrayList<KeyValue>();
 
     // pre-get CP hook
     if (withCoprocessor && (coprocessorHost != null)) {
-      get = coprocessorHost.preGet(get);
+       if (coprocessorHost.preGet(get, results)) {
+         return results;
+       }
     }
 
     InternalScanner scanner = null;
     try {
       scanner = getScanner(scan);
-      scanner.next(getResults);
+      scanner.next(results);
     } finally {
       if (scanner != null)
         scanner.close();
     }
-    // append get results to pre-get results
-    if (results != null){
-      results.addAll(getResults);
-    }
-    else {
-      results = getResults;
-    }
+
     // post-get CP hook
     if (withCoprocessor && (coprocessorHost != null)) {
-      results = coprocessorHost.postGet(get, results);
+      coprocessorHost.postGet(get, results);
     }
 
     return results;

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1049471&r1=1049470&r2=1049471&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed Dec 15 10:11:59 2010
@@ -1599,7 +1599,10 @@ public class HRegionServer implements HR
     try {
       HRegion region = getRegion(regionName);
       if (region.getCoprocessorHost() != null) {
-        region.getCoprocessorHost().preExists(get);
+        Boolean result = region.getCoprocessorHost().preExists(get);
+        if (result != null) {
+          return result.booleanValue();
+        }
       }
       Result r = region.get(get, getLockFromId(get.getLockId()));
       boolean result = r != null && !r.isEmpty();
@@ -1702,8 +1705,11 @@ public class HRegionServer implements HR
     }
     HRegion region = getRegion(regionName);
     if (region.getCoprocessorHost() != null) {
-      region.getCoprocessorHost().preCheckAndPut(row, family, qualifier,
-        value, put);
+      Boolean result = region.getCoprocessorHost()
+        .preCheckAndPut(row, family, qualifier, value, put);
+      if (result != null) {
+        return result.booleanValue();
+      }
     }
     boolean result = checkAndMutate(regionName, row, family, qualifier,
       value, put, getLockFromId(put.getLockId()));
@@ -1737,8 +1743,11 @@ public class HRegionServer implements HR
     }
     HRegion region = getRegion(regionName);
     if (region.getCoprocessorHost() != null) {
-      region.getCoprocessorHost().preCheckAndDelete(row, family, qualifier,
-        value, delete);
+      Boolean result = region.getCoprocessorHost().preCheckAndDelete(row,
+        family, qualifier, value, delete);
+      if (result != null) {
+        return result.booleanValue();
+      }
     }
     boolean result = checkAndMutate(regionName, row, family, qualifier, value,
       delete, getLockFromId(delete.getLockId()));
@@ -2462,12 +2471,15 @@ public class HRegionServer implements HR
       Increment incVal = increment;
       Result resVal;
       if (region.getCoprocessorHost() != null) {
-        incVal = region.getCoprocessorHost().preIncrement(incVal);
+        resVal = region.getCoprocessorHost().preIncrement(incVal);
+        if (resVal != null) {
+          return resVal;
+        }
       }
       resVal = region.increment(incVal, getLockFromId(increment.getLockId()),
           increment.getWriteToWAL());
       if (region.getCoprocessorHost() != null) {
-        resVal = region.getCoprocessorHost().postIncrement(incVal, resVal);
+        region.getCoprocessorHost().postIncrement(incVal, resVal);
       }
       return resVal;
     } catch (IOException e) {
@@ -2489,16 +2501,18 @@ public class HRegionServer implements HR
     requestCount.incrementAndGet();
     try {
       HRegion region = getRegion(regionName);
-      long amountVal = amount;
       if (region.getCoprocessorHost() != null) {
-        amountVal = region.getCoprocessorHost().preIncrementColumnValue(row,
-          family, qualifier, amountVal, writeToWAL);
+        Long amountVal = region.getCoprocessorHost().preIncrementColumnValue(row,
+          family, qualifier, amount, writeToWAL);
+        if (amountVal != null) {
+          return amountVal.longValue();
+        }
       }
       long retval = region.incrementColumnValue(row, family, qualifier, amount,
-          writeToWAL);
+        writeToWAL);
       if (region.getCoprocessorHost() != null) {
         retval = region.getCoprocessorHost().postIncrementColumnValue(row,
-          family, qualifier, amountVal, writeToWAL, retval);
+          family, qualifier, amount, writeToWAL, retval);
       }
       return retval;
     } catch (IOException e) {

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java?rev=1049471&r1=1049470&r2=1049471&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java Wed Dec 15 10:11:59 2010
@@ -34,11 +34,8 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 
-import static org.junit.Assert.*;
-
 /**
  * A sample region observer that tests the RegionObserver interface.
  * It works with TestRegionObserverInterface to provide the test case.
@@ -59,22 +56,26 @@ public class SimpleRegionObserver extend
   boolean hadPreIncrement = false;
   boolean hadPostIncrement = false;
 
-
-  // Overriden RegionObserver methods
   @Override
-  public Get preGet(CoprocessorEnvironment e, Get get) {
+  public void preGet(final CoprocessorEnvironment e, final Get get,
+      final List<KeyValue> results) throws IOException {
+    assertNotNull(e);
+    assertNotNull(e.getRegion());
+    assertNotNull(get);
+    assertNotNull(results);
     if (Arrays.equals(e.getRegion().getTableDesc().getName(),
         TestRegionObserverInterface.TEST_TABLE)) {
       hadPreGet = true;
-      assertNotNull(e);
-      assertNotNull(e.getRegion());
     }
-    return get;
   }
 
   @Override
-  public List<KeyValue> postGet(CoprocessorEnvironment e, Get get,
-      List<KeyValue> results) {
+  public void postGet(final CoprocessorEnvironment e, final Get get,
+      final List<KeyValue> results) {
+    assertNotNull(e);
+    assertNotNull(e.getRegion());
+    assertNotNull(get);
+    assertNotNull(results);
     if (Arrays.equals(e.getRegion().getTableDesc().getName(),
         TestRegionObserverInterface.TEST_TABLE)) {
       boolean foundA = false;
@@ -96,12 +97,14 @@ public class SimpleRegionObserver extend
       assertTrue(foundC);
       hadPostGet = true;
     }
-    return results;
   }
 
   @Override
-  public Map<byte[], List<KeyValue>> prePut(CoprocessorEnvironment e,
-      Map<byte[], List<KeyValue>> familyMap) {
+  public void prePut(final CoprocessorEnvironment e, final Map<byte[],
+      List<KeyValue>> familyMap, final boolean writeToWAL) throws IOException {
+    assertNotNull(e);
+    assertNotNull(e.getRegion());
+    assertNotNull(familyMap);
     if (Arrays.equals(e.getRegion().getTableDesc().getName(),
         TestRegionObserverInterface.TEST_TABLE)) {
       List<KeyValue> kvs = familyMap.get(TestRegionObserverInterface.A);
@@ -121,12 +124,14 @@ public class SimpleRegionObserver extend
           TestRegionObserverInterface.C));
       hadPrePut = true;
     }
-    return familyMap;
   }
 
   @Override
-  public void postPut(CoprocessorEnvironment e,
-      Map<byte[], List<KeyValue>> familyMap) {
+  public void postPut(final CoprocessorEnvironment e, final Map<byte[],
+      List<KeyValue>> familyMap, final boolean writeToWAL) throws IOException {
+    assertNotNull(e);
+    assertNotNull(e.getRegion());
+    assertNotNull(familyMap);
     List<KeyValue> kvs = familyMap.get(TestRegionObserverInterface.A);
     if (Arrays.equals(e.getRegion().getTableDesc().getName(),
         TestRegionObserverInterface.TEST_TABLE)) {
@@ -149,18 +154,23 @@ public class SimpleRegionObserver extend
   }
 
   @Override
-  public Map<byte[], List<KeyValue>> preDelete(CoprocessorEnvironment e,
-      Map<byte[], List<KeyValue>> familyMap) {
+  public void preDelete(final CoprocessorEnvironment e, final Map<byte[],
+      List<KeyValue>> familyMap, final boolean writeToWAL) throws IOException {
+    assertNotNull(e);
+    assertNotNull(e.getRegion());
+    assertNotNull(familyMap);
     if (beforeDelete && e.getRegion().getTableDesc().getName().equals(
         TestRegionObserverInterface.TEST_TABLE)) {
       hadPreDeleted = true;
     }
-    return familyMap;
   }
 
   @Override
-  public void postDelete(CoprocessorEnvironment e,
-      Map<byte[], List<KeyValue>> familyMap) {
+  public void postDelete(final CoprocessorEnvironment e, final Map<byte[],
+      List<KeyValue>> familyMap, final boolean writeToWAL) throws IOException {
+    assertNotNull(e);
+    assertNotNull(e.getRegion());
+    assertNotNull(familyMap);
     if (Arrays.equals(e.getRegion().getTableDesc().getName(),
         TestRegionObserverInterface.TEST_TABLE)) {
       beforeDelete = false;
@@ -170,7 +180,12 @@ public class SimpleRegionObserver extend
 
   @Override
   public void preGetClosestRowBefore(final CoprocessorEnvironment e,
-      final byte[] row, final byte[] family) {
+      final byte[] row, final byte[] family, final Result result)
+      throws IOException {
+    assertNotNull(e);
+    assertNotNull(e.getRegion());
+    assertNotNull(row);
+    assertNotNull(result);
     if (beforeDelete && e.getRegion().getTableDesc().getName().equals(
         TestRegionObserverInterface.TEST_TABLE)) {
       hadPreGetClosestRowBefore = true;
@@ -178,70 +193,35 @@ public class SimpleRegionObserver extend
   }
 
   @Override
-  public Result postGetClosestRowBefore(final CoprocessorEnvironment e,
-      final byte[] row, final byte[] family, Result result) {
+  public void postGetClosestRowBefore(final CoprocessorEnvironment e,
+      final byte[] row, final byte[] family, final Result result)
+      throws IOException {
+    assertNotNull(e);
+    assertNotNull(e.getRegion());
+    assertNotNull(row);
+    assertNotNull(result);
     if (Arrays.equals(e.getRegion().getTableDesc().getName(),
         TestRegionObserverInterface.TEST_TABLE)) {
       hadPostGetClosestRowBefore = true;
     }
-    return result;
-  }
-
-  @Override
-  public Scan preScannerOpen(CoprocessorEnvironment e, Scan scan) {
-    // not tested -- need to go through the RS to get here
-    return scan;
-  }
-
-  @Override
-  public void postScannerOpen(CoprocessorEnvironment e, Scan scan,
-      long scannerId) {
-    // not tested -- need to go through the RS to get here
   }
 
   @Override
-  public void preScannerNext(final CoprocessorEnvironment e,
-      final long scannerId) {
-    // not tested -- need to go through the RS to get here
-  }
-
-  @Override
-  public List<KeyValue> postScannerNext(final CoprocessorEnvironment e,
-      final long scannerId, List<KeyValue> results) {
-    // not tested -- need to go through the RS to get here
-    return results;
-  }
-
-  @Override
-  public void preScannerClose(final CoprocessorEnvironment e,
-      final long scannerId) {
-    // not tested -- need to go through the RS to get here
-  }
-
-  @Override
-  public void postScannerClose(final CoprocessorEnvironment e,
-      final long scannerId) {
-    // not tested -- need to go through the RS to get here
-  }
-
-  @Override
-  public Increment preIncrement(CoprocessorEnvironment e, Increment increment)
-      throws IOException {
+  public void preIncrement(final CoprocessorEnvironment e,
+      final Increment increment, final Result result) throws IOException {
     if (Arrays.equals(e.getRegion().getTableDesc().getName(),
         TestRegionObserverInterface.TEST_TABLE_2)) {
       hadPreIncrement = true;
     }
-    return increment;
   }
 
   @Override
-  public Result postIncrement(CoprocessorEnvironment e, Increment increment,
-      Result result) throws IOException {
+  public void postIncrement(final CoprocessorEnvironment e,
+      final Increment increment, final Result result) throws IOException {
     if (Arrays.equals(e.getRegion().getTableDesc().getName(),
         TestRegionObserverInterface.TEST_TABLE_2)) {
       hadPostIncrement = true;
     }
-    return result;
   }
 
   boolean hadPreGet() {

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java?rev=1049471&r1=1049470&r2=1049471&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java Wed Dec 15 10:11:59 2010
@@ -46,8 +46,9 @@ public class TestRegionObserverStacking 
   public static class ObserverA extends BaseRegionObserverCoprocessor {
     long id;
     @Override
-    public void postPut(final CoprocessorEnvironment e,
-        Map<byte[], List<KeyValue>> familyMap) {
+    public void postPut(final CoprocessorEnvironment e, final Map<byte[],
+        List<KeyValue>> familyMap, final boolean writeToWAL)
+        throws IOException {
       id = System.currentTimeMillis();
       try {
         Thread.sleep(10);
@@ -59,8 +60,9 @@ public class TestRegionObserverStacking 
   public static class ObserverB extends BaseRegionObserverCoprocessor {
     long id;
     @Override
-    public void postPut(final CoprocessorEnvironment e,
-        Map<byte[], List<KeyValue>> familyMap) {
+    public void postPut(final CoprocessorEnvironment e, final Map<byte[],
+        List<KeyValue>> familyMap, final boolean writeToWAL)
+        throws IOException {
       id = System.currentTimeMillis();
       try {
         Thread.sleep(10);
@@ -73,8 +75,9 @@ public class TestRegionObserverStacking 
     long id;
 
     @Override
-    public void postPut(final CoprocessorEnvironment e,
-        Map<byte[], List<KeyValue>> familyMap) {
+    public void postPut(final CoprocessorEnvironment e, final Map<byte[],
+        List<KeyValue>> familyMap, final boolean writeToWAL)
+        throws IOException {
       id = System.currentTimeMillis();
       try {
         Thread.sleep(10);