You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2010/12/15 11:12:00 UTC
svn commit: r1049471 - in /hbase/trunk: ./
src/main/java/org/apache/hadoop/hbase/coprocessor/
src/main/java/org/apache/hadoop/hbase/regionserver/
src/test/java/org/apache/hadoop/hbase/coprocessor/
Author: apurtell
Date: Wed Dec 15 10:11:59 2010
New Revision: 1049471
URL: http://svn.apache.org/viewvc?rev=1049471&view=rev
Log:
HBASE-3348 Coprocessors: Allow Observers to completely override base function
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserverCoprocessor.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorEnvironment.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CoprocessorHost.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java
Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1049471&r1=1049470&r2=1049471&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Wed Dec 15 10:11:59 2010
@@ -36,6 +36,8 @@ Release 0.91.0 - Unreleased
HBASE-1861 Multi-Family support for bulk upload tools
HBASE-3308 SplitTransaction.splitStoreFiles slows splits a lot
HBASE-3328 Added Admin API to specify explicit split points
+ HBASE-3345 Coprocessors: Allow observers to completely override base
+ function
NEW FEATURES
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserverCoprocessor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserverCoprocessor.java?rev=1049471&r1=1049470&r2=1049471&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserverCoprocessor.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserverCoprocessor.java Wed Dec 15 10:11:59 2010
@@ -27,6 +27,8 @@ import org.apache.hadoop.hbase.client.Pu
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+
import java.io.IOException;
/**
@@ -71,70 +73,65 @@ public abstract class BaseRegionObserver
@Override
public void preGetClosestRowBefore(final CoprocessorEnvironment e,
- final byte [] row, final byte [] family)
+ final byte [] row, final byte [] family, final Result result)
throws IOException {
}
@Override
- public Result postGetClosestRowBefore(final CoprocessorEnvironment e,
- byte[] row, byte[] family, final Result result)
- throws IOException {
- return result;
+ public void postGetClosestRowBefore(final CoprocessorEnvironment e,
+ final byte [] row, final byte [] family, final Result result)
+ throws IOException {
}
@Override
- public Get preGet(final CoprocessorEnvironment e, final Get get)
- throws IOException {
- return get;
+ public void preGet(final CoprocessorEnvironment e, final Get get,
+ final List<KeyValue> results) throws IOException {
}
@Override
- public List<KeyValue> postGet(final CoprocessorEnvironment e, final Get get,
- List<KeyValue> results) throws IOException {
- return results;
+ public void postGet(final CoprocessorEnvironment e, final Get get,
+ final List<KeyValue> results) throws IOException {
}
@Override
- public Get preExists(final CoprocessorEnvironment e, final Get get)
- throws IOException {
- return get;
+ public boolean preExists(final CoprocessorEnvironment e, final Get get,
+ final boolean exists) throws IOException {
+ return exists;
}
@Override
- public boolean postExists(final CoprocessorEnvironment e,
- final Get get, boolean exists)
- throws IOException {
+ public boolean postExists(final CoprocessorEnvironment e, final Get get,
+ boolean exists) throws IOException {
return exists;
}
@Override
- public Map<byte[], List<KeyValue>> prePut(final CoprocessorEnvironment e,
- final Map<byte[], List<KeyValue>> familyMap) throws IOException {
- return familyMap;
+ public void prePut(final CoprocessorEnvironment e, final Map<byte[],
+ List<KeyValue>> familyMap, final boolean writeToWAL) throws IOException {
}
@Override
- public void postPut(final CoprocessorEnvironment e,
- final Map<byte[], List<KeyValue>> familyMap)
- throws IOException {
+ public void postPut(final CoprocessorEnvironment e, final Map<byte[],
+ List<KeyValue>> familyMap, final boolean writeToWAL) throws IOException {
}
@Override
- public Map<byte[], List<KeyValue>> preDelete(final CoprocessorEnvironment e,
- final Map<byte[], List<KeyValue>> familyMap) throws IOException {
- return familyMap;
+ public void preDelete(final CoprocessorEnvironment e, final Map<byte[],
+ List<KeyValue>> familyMap, final boolean writeToWAL) throws IOException {
}
@Override
- public void postDelete(CoprocessorEnvironment e,
- Map<byte[], List<KeyValue>> familyMap) throws IOException {
+ public void postDelete(final CoprocessorEnvironment e,
+ final Map<byte[], List<KeyValue>> familyMap, final boolean writeToWAL)
+ throws IOException {
}
@Override
- public Put preCheckAndPut(final CoprocessorEnvironment e,
+ public boolean preCheckAndPut(final CoprocessorEnvironment e,
final byte [] row, final byte [] family, final byte [] qualifier,
- final byte [] value, final Put put) throws IOException {
- return put;
+ final byte [] value, final Put put, final boolean result)
+ throws IOException {
+ return result;
}
@Override
@@ -146,18 +143,18 @@ public abstract class BaseRegionObserver
}
@Override
- public Delete preCheckAndDelete(final CoprocessorEnvironment e,
+ public boolean preCheckAndDelete(final CoprocessorEnvironment e,
final byte [] row, final byte [] family, final byte [] qualifier,
- final byte [] value, final Delete delete)
- throws IOException {
- return delete;
+ final byte [] value, final Delete delete, final boolean result)
+ throws IOException {
+ return result;
}
@Override
public boolean postCheckAndDelete(final CoprocessorEnvironment e,
final byte [] row, final byte [] family, final byte [] qualifier,
final byte [] value, final Delete delete, final boolean result)
- throws IOException {
+ throws IOException {
return result;
}
@@ -172,54 +169,53 @@ public abstract class BaseRegionObserver
public long postIncrementColumnValue(final CoprocessorEnvironment e,
final byte [] row, final byte [] family, final byte [] qualifier,
final long amount, final boolean writeToWAL, long result)
- throws IOException {
+ throws IOException {
return result;
}
@Override
- public Increment preIncrement(final CoprocessorEnvironment e,
- final Increment increment)
- throws IOException {
- return increment;
+ public void preIncrement(final CoprocessorEnvironment e,
+ final Increment increment, final Result result) throws IOException {
}
@Override
- public Result postIncrement(final CoprocessorEnvironment e,
- final Increment increment,
- final Result result) throws IOException {
- return result;
+ public void postIncrement(final CoprocessorEnvironment e,
+ final Increment increment, final Result result) throws IOException {
}
@Override
- public Scan preScannerOpen(final CoprocessorEnvironment e, final Scan scan)
- throws IOException {
- return scan;
+ public InternalScanner preScannerOpen(final CoprocessorEnvironment e,
+ final Scan scan, final InternalScanner s) throws IOException {
+ return s;
}
@Override
- public void postScannerOpen(final CoprocessorEnvironment e,
- final Scan scan,
- final long scannerId) throws IOException { }
+ public InternalScanner postScannerOpen(final CoprocessorEnvironment e,
+ final Scan scan, final InternalScanner s) throws IOException {
+ return s;
+ }
@Override
- public void preScannerNext(final CoprocessorEnvironment e,
- final long scannerId) throws IOException {
+ public boolean preScannerNext(final CoprocessorEnvironment e,
+ final InternalScanner s, final List<KeyValue> results,
+ final int limit, final boolean hasMore) throws IOException {
+ return hasMore;
}
@Override
- public List<KeyValue> postScannerNext(final CoprocessorEnvironment e,
- final long scannerId, final List<KeyValue> results)
- throws IOException {
- return results;
+ public boolean postScannerNext(final CoprocessorEnvironment e,
+ final InternalScanner s, final List<KeyValue> results, final int limit,
+ final boolean hasMore) throws IOException {
+ return hasMore;
}
@Override
public void preScannerClose(final CoprocessorEnvironment e,
- final long scannerId)
- throws IOException { }
+ final InternalScanner s) throws IOException {
+ }
@Override
public void postScannerClose(final CoprocessorEnvironment e,
- final long scannerId)
- throws IOException { }
+ final InternalScanner s) throws IOException {
+ }
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorEnvironment.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorEnvironment.java?rev=1049471&r1=1049470&r2=1049471&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorEnvironment.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorEnvironment.java Wed Dec 15 10:11:59 2010
@@ -45,27 +45,17 @@ public interface CoprocessorEnvironment
*/
public HTableInterface getTable(byte[] tableName) throws IOException;
- // environment variables
+ /* Control flow changes */
/**
- * Get an environment variable
- * @param key the key
- * @return the object corresponding to the environment variable, if set
+ * Causes framework to bypass default actions and return with the results
+ * from a preXXX chain.
*/
- public Object get(Object key);
+ public void bypass();
/**
- * Set an environment variable
- * @param key the key
- * @param value the value
+ * Mark coprocessor chain processing as complete. Causes framework to return
+ * immediately without calling any additional chained coprocessors.
*/
- public void put(Object key, Object value);
-
- /**
- * Remove an environment variable
- * @param key the key
- * @return the object corresponding to the environment variable, if set
- */
- public Object remove(Object key);
-
+ public void complete();
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java?rev=1049471&r1=1049470&r2=1049471&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java Wed Dec 15 10:11:59 2010
@@ -27,6 +27,8 @@ import org.apache.hadoop.hbase.client.Re
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.coprocessor.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+
import java.io.IOException;
/**
@@ -37,62 +39,92 @@ public interface RegionObserver {
/**
* Called before a client makes a GetClosestRowBefore request.
+ * <p>
+ * Call CoprocessorEnvironment#bypass to skip default actions
+ * <p>
+ * Call CoprocessorEnvironment#complete to skip any subsequent chained
+ * coprocessors
* @param e the environment provided by the region server
* @param row the row
* @param family the family
+ * @param result The result to return to the client if default processing
+ * is bypassed. Can be modified. Will not be used if default processing
+ * is not bypassed.
* @throws IOException if an error occurred on the coprocessor
*/
public void preGetClosestRowBefore(final CoprocessorEnvironment e,
- final byte [] row, final byte [] family)
+ final byte [] row, final byte [] family, final Result result)
throws IOException;
/**
* Called after a client makes a GetClosestRowBefore request.
+ * <p>
+ * Call CoprocessorEnvironment#complete to skip any subsequent chained
+ * coprocessors
* @param e the environment provided by the region server
* @param row the row
* @param family the desired family
- * @param result the result set
- * @return the possible tranformed result set to return to the client
+ * @param result the result to return to the client, modify as necessary
* @throws IOException if an error occurred on the coprocessor
*/
- public Result postGetClosestRowBefore(final CoprocessorEnvironment e,
+ public void postGetClosestRowBefore(final CoprocessorEnvironment e,
final byte [] row, final byte [] family, final Result result)
throws IOException;
/**
- * Called before the client perform a get()
+ * Called before the client performs a Get
+ * <p>
+ * Call CoprocessorEnvironment#bypass to skip default actions
+ * <p>
+ * Call CoprocessorEnvironment#complete to skip any subsequent chained
+ * coprocessors
* @param e the environment provided by the region server
* @param get the Get request
- * @return the possibly transformed Get object by coprocessor
+ * @param result The result to return to the client if default processing
+ * is bypassed. Can be modified. Will not be used if default processing
+ * is not bypassed.
* @throws IOException if an error occurred on the coprocessor
*/
- public Get preGet(final CoprocessorEnvironment e, final Get get)
+ public void preGet(final CoprocessorEnvironment e, final Get get,
+ final List<KeyValue> result)
throws IOException;
/**
- * Called after the client perform a get()
+ * Called after the client performs a Get
+ * <p>
+ * Call CoprocessorEnvironment#complete to skip any subsequent chained
+ * coprocessors
* @param e the environment provided by the region server
* @param get the Get request
- * @param results the result list
- * @return the possibly transformed result list to return to client
+ * @param result the result to return to the client, modify as necessary
* @throws IOException if an error occurred on the coprocessor
*/
- public List<KeyValue> postGet(final CoprocessorEnvironment e, final Get get,
- final List<KeyValue> results)
+ public void postGet(final CoprocessorEnvironment e, final Get get,
+ final List<KeyValue> result)
throws IOException;
/**
* Called before the client tests for existence using a Get.
+ * <p>
+ * Call CoprocessorEnvironment#bypass to skip default actions
+ * <p>
+ * Call CoprocessorEnvironment#complete to skip any subsequent chained
+ * coprocessors
* @param e the environment provided by the region server
* @param get the Get request
- * @return the possibly transformed Get object by coprocessor
+ * @param exists the existence value supplied to this hook; the base implementation returns it unchanged
+ * @return the value to return to the client if bypassing default processing
* @throws IOException if an error occurred on the coprocessor
*/
- public Get preExists(final CoprocessorEnvironment e, final Get get)
+ public boolean preExists(final CoprocessorEnvironment e, final Get get,
+ final boolean exists)
throws IOException;
/**
* Called after the client tests for existence using a Get.
+ * <p>
+ * Call CoprocessorEnvironment#complete to skip any subsequent chained
+ * coprocessors
* @param e the environment provided by the region server
* @param get the Get request
* @param exists the result returned by the region server
@@ -105,64 +137,92 @@ public interface RegionObserver {
/**
* Called before the client stores a value.
+ * <p>
+ * Call CoprocessorEnvironment#bypass to skip default actions
+ * <p>
+ * Call CoprocessorEnvironment#complete to skip any subsequent chained
+ * coprocessors
* @param e the environment provided by the region server
- * @param familyMap map of family to edits for the given family.
- * @return the possibly transformed map to actually use
+ * @param familyMap map of family to edits for the given family
+ * @param writeToWAL true if the change should be written to the WAL
* @throws IOException if an error occurred on the coprocessor
*/
- public Map<byte[], List<KeyValue>> prePut(final CoprocessorEnvironment e,
- final Map<byte[], List<KeyValue>> familyMap)
+ public void prePut(final CoprocessorEnvironment e, final Map<byte[],
+ List<KeyValue>> familyMap, final boolean writeToWAL)
throws IOException;
/**
* Called after the client stores a value.
+ * <p>
+ * Call CoprocessorEnvironment#complete to skip any subsequent chained
+ * coprocessors
* @param e the environment provided by the region server
- * @param familyMap map of family to edits for the given family.
+ * @param familyMap map of family to edits for the given family
+ * @param writeToWAL true if the change should be written to the WAL
* @throws IOException if an error occurred on the coprocessor
*/
public void postPut(final CoprocessorEnvironment e, final Map<byte[],
- List<KeyValue>> familyMap)
+ List<KeyValue>> familyMap, final boolean writeToWAL)
throws IOException;
/**
* Called before the client deletes a value.
+ * <p>
+ * Call CoprocessorEnvironment#bypass to skip default actions
+ * <p>
+ * Call CoprocessorEnvironment#complete to skip any subsequent chained
+ * coprocessors
* @param e the environment provided by the region server
- * @param familyMap map of family to edits for the given family.
- * @return the possibly transformed map to actually use
+ * @param familyMap map of family to edits for the given family
+ * @param writeToWAL true if the change should be written to the WAL
* @throws IOException if an error occurred on the coprocessor
*/
- public Map<byte[], List<KeyValue>> preDelete(final CoprocessorEnvironment e,
- final Map<byte[], List<KeyValue>> familyMap)
+ public void preDelete(final CoprocessorEnvironment e, final Map<byte[],
+ List<KeyValue>> familyMap, final boolean writeToWAL)
throws IOException;
/**
* Called after the client deletes a value.
+ * <p>
+ * Call CoprocessorEnvironment#complete to skip any subsequent chained
+ * coprocessors
* @param e the environment provided by the region server
- * @param familyMap map of family to edits for the given family.
+ * @param familyMap map of family to edits for the given family
+ * @param writeToWAL true if the change should be written to the WAL
* @throws IOException if an error occurred on the coprocessor
*/
public void postDelete(final CoprocessorEnvironment e,
- final Map<byte[], List<KeyValue>> familyMap)
+ final Map<byte[], List<KeyValue>> familyMap, final boolean writeToWAL)
throws IOException;
/**
* Called before checkAndPut
+ * <p>
+ * Call CoprocessorEnvironment#bypass to skip default actions
+ * <p>
+ * Call CoprocessorEnvironment#complete to skip any subsequent chained
+ * coprocessors
* @param e the environment provided by the region server
* @param row row to check
* @param family column family
* @param qualifier column qualifier
* @param value the expected value
* @param put data to put if check succeeds
- * @return the possibly transformed map to actually use
+ * @param result the result value supplied to this hook; the base implementation returns it unchanged
+ * @return the return value to return to client if bypassing default
+ * processing
* @throws IOException if an error occurred on the coprocessor
*/
- public Put preCheckAndPut(final CoprocessorEnvironment e,
+ public boolean preCheckAndPut(final CoprocessorEnvironment e,
final byte [] row, final byte [] family, final byte [] qualifier,
- final byte [] value, final Put put)
+ final byte [] value, final Put put, final boolean result)
throws IOException;
/**
* Called after checkAndPut
+ * <p>
+ * Call CoprocessorEnvironment#complete to skip any subsequent chained
+ * coprocessors
* @param e the environment provided by the region server
* @param row row to check
* @param family column family
@@ -170,7 +230,7 @@ public interface RegionObserver {
* @param value the expected value
* @param put data to put if check succeeds
* @param result from the checkAndPut
- * @return the possibly transformed value to return to client
+ * @return the possibly transformed return value to return to client
* @throws IOException if an error occurred on the coprocessor
*/
public boolean postCheckAndPut(final CoprocessorEnvironment e,
@@ -179,23 +239,32 @@ public interface RegionObserver {
throws IOException;
/**
- * Called before checkAndPut
+ * Called before checkAndDelete
+ * <p>
+ * Call CoprocessorEnvironment#bypass to skip default actions
+ * <p>
+ * Call CoprocessorEnvironment#complete to skip any subsequent chained
+ * coprocessors
* @param e the environment provided by the region server
* @param row row to check
* @param family column family
* @param qualifier column qualifier
* @param value the expected value
* @param delete delete to commit if check succeeds
- * @return the possibly transformed map to actually use
+ * @param result the result value supplied to this hook; the base implementation returns it unchanged
+ * @return the value to return to client if bypassing default processing
* @throws IOException if an error occurred on the coprocessor
*/
- public Delete preCheckAndDelete(final CoprocessorEnvironment e,
+ public boolean preCheckAndDelete(final CoprocessorEnvironment e,
final byte [] row, final byte [] family, final byte [] qualifier,
- final byte [] value, final Delete delete)
+ final byte [] value, final Delete delete, final boolean result)
throws IOException;
/**
* Called after checkAndDelete
+ * <p>
+ * Call CoprocessorEnvironment#complete to skip any subsequent chained
+ * coprocessors
* @param e the environment provided by the region server
* @param row row to check
* @param family column family
@@ -203,7 +272,7 @@ public interface RegionObserver {
* @param value the expected value
* @param delete delete to commit if check succeeds
* @param result from the CheckAndDelete
- * @return the possibly transformed value to return to client
+ * @return the possibly transformed return value to return to client
* @throws IOException if an error occurred on the coprocessor
*/
public boolean postCheckAndDelete(final CoprocessorEnvironment e,
@@ -213,13 +282,18 @@ public interface RegionObserver {
/**
* Called before incrementColumnValue
+ * <p>
+ * Call CoprocessorEnvironment#bypass to skip default actions
+ * <p>
+ * Call CoprocessorEnvironment#complete to skip any subsequent chained
+ * coprocessors
* @param e the environment provided by the region server
* @param row row to check
* @param family column family
* @param qualifier column qualifier
* @param amount long amount to increment
- * @param writeToWAL whether to write the increment to the WAL
- * @return new amount to increment
+ * @param writeToWAL true if the change should be written to the WAL
+ * @return value to return to the client if bypassing default processing
* @throws IOException if an error occurred on the coprocessor
*/
public long preIncrementColumnValue(final CoprocessorEnvironment e,
@@ -229,12 +303,15 @@ public interface RegionObserver {
/**
* Called after incrementColumnValue
+ * <p>
+ * Call CoprocessorEnvironment#complete to skip any subsequent chained
+ * coprocessors
* @param e the environment provided by the region server
* @param row row to check
* @param family column family
* @param qualifier column qualifier
* @param amount long amount to increment
- * @param writeToWAL whether to write the increment to the WAL
+ * @param writeToWAL true if the change should be written to the WAL
* @param result the result returned by incrementColumnValue
* @return the result to return to the client
* @throws IOException if an error occurred on the coprocessor
@@ -245,92 +322,137 @@ public interface RegionObserver {
throws IOException;
/**
- * Called before incrementColumnValue
+ * Called before Increment
+ * <p>
+ * Call CoprocessorEnvironment#bypass to skip default actions
+ * <p>
+ * Call CoprocessorEnvironment#complete to skip any subsequent chained
+ * coprocessors
* @param e the environment provided by the region server
* @param increment increment object
- * @param writeToWAL whether to write the increment to the WAL
- * @return new Increment instance
+ * @param result The result to return to the client if default processing
+ * is bypassed. Can be modified. Will not be used if default processing
+ * is not bypassed.
+ * @param writeToWAL true if the change should be written to the WAL
* @throws IOException if an error occurred on the coprocessor
*/
- public Increment preIncrement(final CoprocessorEnvironment e,
- final Increment increment)
+ public void preIncrement(final CoprocessorEnvironment e,
+ final Increment increment, final Result result)
throws IOException;
/**
* Called after increment
+ * <p>
+ * Call CoprocessorEnvironment#complete to skip any subsequent chained
+ * coprocessors
* @param e the environment provided by the region server
* @param increment increment object
- * @param writeToWAL whether to write the increment to the WAL
- * @param result the result returned by increment
- * @return the result to return to the client
+ * @param writeToWAL true if the change should be written to the WAL
+ * @param result the result returned by increment, can be modified
* @throws IOException if an error occurred on the coprocessor
*/
- public Result postIncrement(final CoprocessorEnvironment e,
+ public void postIncrement(final CoprocessorEnvironment e,
final Increment increment, final Result result)
throws IOException;
/**
* Called before the client opens a new scanner.
+ * <p>
+ * Call CoprocessorEnvironment#bypass to skip default actions
+ * <p>
+ * Call CoprocessorEnvironment#complete to skip any subsequent chained
+ * coprocessors
* @param e the environment provided by the region server
* @param scan the Scan specification
- * @return the possibly transformed Scan to actually use
+ * @param s if not null, the base scanner
+ * @return an InternalScanner instance to use instead of the base scanner if
+ * overriding default behavior, null otherwise
* @throws IOException if an error occurred on the coprocessor
*/
- public Scan preScannerOpen(final CoprocessorEnvironment e, final Scan scan)
+ public InternalScanner preScannerOpen(final CoprocessorEnvironment e,
+ final Scan scan, final InternalScanner s)
throws IOException;
/**
* Called after the client opens a new scanner.
+ * <p>
+ * Call CoprocessorEnvironment#complete to skip any subsequent chained
+ * coprocessors
* @param e the environment provided by the region server
* @param scan the Scan specification
- * @param scannerId the scanner id allocated by the region server
+ * @param s if not null, the base scanner
+ * @return the scanner instance to use
* @throws IOException if an error occurred on the coprocessor
*/
- public void postScannerOpen(final CoprocessorEnvironment e, final Scan scan,
- final long scannerId)
+ public InternalScanner postScannerOpen(final CoprocessorEnvironment e,
+ final Scan scan, final InternalScanner s)
throws IOException;
/**
* Called before the client asks for the next row on a scanner.
- * @param e the environment provided by the region server
- * @param scannerId the scanner id
- * @param results the result set returned by the region server
- * @return the possibly transformed result set to actually return
- * @throws IOException if an error occurred on the coprocessor
- */
- public void preScannerNext(final CoprocessorEnvironment e,
- final long scannerId)
+ * <p>
+ * Call CoprocessorEnvironment#bypass to skip default actions
+ * <p>
+ * Call CoprocessorEnvironment#complete to skip any subsequent chained
+ * coprocessors
+ * @param e the environment provided by the region server
+ * @param s the scanner
+ * @param result The result to return to the client if default processing
+ * is bypassed. Can be modified. Will not be returned if default processing
+ * is not bypassed.
+ * @param limit the maximum number of results to return
+ * @param hasNext the 'has more' indication
+ * @return 'has more' indication that should be sent to client
+ * @throws IOException if an error occurred on the coprocessor
+ */
+ public boolean preScannerNext(final CoprocessorEnvironment e,
+ final InternalScanner s, final List<KeyValue> result,
+ final int limit, final boolean hasNext)
throws IOException;
/**
* Called after the client asks for the next row on a scanner.
- * @param e the environment provided by the region server
- * @param scannerId the scanner id
- * @param results the result set returned by the region server
- * @return the possibly transformed result set to actually return
+ * <p>
+ * Call CoprocessorEnvironment#complete to skip any subsequent chained
+ * coprocessors
+ * @param e the environment provided by the region server
+ * @param s the scanner
+ * @param result the result to return to the client, can be modified
+ * @param limit the maximum number of results to return
+ * @param hasNext the 'has more' indication
+ * @return 'has more' indication that should be sent to client
* @throws IOException if an error occurred on the coprocessor
*/
- public List<KeyValue> postScannerNext(final CoprocessorEnvironment e,
- final long scannerId, final List<KeyValue> results)
+ public boolean postScannerNext(final CoprocessorEnvironment e,
+ final InternalScanner s, final List<KeyValue> result, final int limit,
+ final boolean hasNext)
throws IOException;
/**
* Called before the client closes a scanner.
+ * <p>
+ * Call CoprocessorEnvironment#bypass to skip default actions
+ * <p>
+ * Call CoprocessorEnvironment#complete to skip any subsequent chained
+ * coprocessors
* @param e the environment provided by the region server
- * @param scannerId the scanner id
+ * @param s the scanner
* @throws IOException if an error occurred on the coprocessor
*/
public void preScannerClose(final CoprocessorEnvironment e,
- final long scannerId)
+ final InternalScanner s)
throws IOException;
/**
* Called after the client closes a scanner.
+ * <p>
+ * Call CoprocessorEnvironment#complete to skip any subsequent chained
+ * coprocessors
* @param e the environment provided by the region server
- * @param scannerId the scanner id
+ * @param s the scanner
* @throws IOException if an error occurred on the coprocessor
*/
public void postScannerClose(final CoprocessorEnvironment e,
- final long scannerId)
+ final InternalScanner s)
throws IOException;
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CoprocessorHost.java?rev=1049471&r1=1049470&r2=1049471&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CoprocessorHost.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CoprocessorHost.java Wed Dec 15 10:11:59 2010
@@ -47,6 +47,7 @@ import java.net.URL;
import java.net.URLClassLoader;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -284,6 +285,14 @@ public class CoprocessorHost {
}
}
+ boolean shouldBypass() {
+ return bypass.getAndSet(false);
+ }
+
+ boolean shouldComplete() {
+ return complete.getAndSet(false);
+ }
+
/** @return the coprocessor environment version */
@Override
public int getVersion() {
@@ -319,30 +328,14 @@ public class CoprocessorHost {
return new HTableWrapper(tableName);
}
- /**
- * @param key the key
- * @return the value, or null if it does not exist
- */
- @Override
- public Object get(Object key) {
- return vars.get(key);
- }
-
- /**
- * @param key the key
- * @param value the value
- */
@Override
- public void put(Object key, Object value) {
- vars.put(key, value);
+ public void complete() {
+ complete.set(true);
}
- /**
- * @param key the key
- */
@Override
- public Object remove(Object key) {
- return vars.remove(key);
+ public void bypass() {
+ bypass.set(true);
}
}
@@ -355,8 +348,10 @@ public class CoprocessorHost {
HRegion region;
/** Ordered set of loaded coprocessors with lock */
final ReentrantReadWriteLock coprocessorLock = new ReentrantReadWriteLock();
- Set<Environment> coprocessors =
+ final Set<Environment> coprocessors =
new TreeSet<Environment>(new EnvironmentPriorityComparator());
+ final AtomicBoolean bypass = new AtomicBoolean(false);
+ final AtomicBoolean complete = new AtomicBoolean(false);
/**
* Constructor
@@ -482,6 +477,7 @@ public class CoprocessorHost {
* @param priority priority
* @throws IOException Exception
*/
+ @SuppressWarnings({ "unchecked", "rawtypes" })
public void load(Class<?> implClass, Coprocessor.Priority priority)
throws IOException {
// create the instance
@@ -581,6 +577,9 @@ public class CoprocessorHost {
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
env.impl.preOpen(env);
+ if (env.shouldComplete()) {
+ break;
+ }
}
} finally {
coprocessorLock.readLock().unlock();
@@ -595,6 +594,9 @@ public class CoprocessorHost {
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
env.impl.postOpen(env);
+ if (env.shouldComplete()) {
+ break;
+ }
}
} finally {
coprocessorLock.readLock().unlock();
@@ -610,7 +612,6 @@ public class CoprocessorHost {
coprocessorLock.writeLock().lock();
for (Environment env: coprocessors) {
env.impl.preClose(env, abortRequested);
- env.shutdown();
}
} finally {
coprocessorLock.writeLock().unlock();
@@ -642,6 +643,9 @@ public class CoprocessorHost {
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
env.impl.preCompact(env, willSplit);
+ if (env.shouldComplete()) {
+ break;
+ }
}
} finally {
coprocessorLock.readLock().unlock();
@@ -657,6 +661,9 @@ public class CoprocessorHost {
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
env.impl.postCompact(env, willSplit);
+ if (env.shouldComplete()) {
+ break;
+ }
}
} finally {
coprocessorLock.readLock().unlock();
@@ -671,6 +678,9 @@ public class CoprocessorHost {
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
env.impl.preFlush(env);
+ if (env.shouldComplete()) {
+ break;
+ }
}
} finally {
coprocessorLock.readLock().unlock();
@@ -685,6 +695,9 @@ public class CoprocessorHost {
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
env.impl.postFlush(env);
+ if (env.shouldComplete()) {
+ break;
+ }
}
} finally {
coprocessorLock.readLock().unlock();
@@ -699,6 +712,9 @@ public class CoprocessorHost {
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
env.impl.preSplit(env);
+ if (env.shouldComplete()) {
+ break;
+ }
}
} finally {
coprocessorLock.readLock().unlock();
@@ -715,6 +731,9 @@ public class CoprocessorHost {
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
env.impl.postSplit(env, l, r);
+ if (env.shouldComplete()) {
+ break;
+ }
}
} finally {
coprocessorLock.readLock().unlock();
@@ -727,17 +746,25 @@ public class CoprocessorHost {
* @param row the row key
* @param family the family
* @param result the result set from the region
+ * @return true if default processing should be bypassed
* @exception IOException Exception
*/
- public void preGetClosestRowBefore(final byte[] row, final byte[] family)
- throws IOException {
+ public boolean preGetClosestRowBefore(final byte[] row, final byte[] family,
+ final Result result) throws IOException {
try {
+ boolean bypass = false;
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
if (env.impl instanceof RegionObserver) {
- ((RegionObserver)env.impl).preGetClosestRowBefore(env, row, family);
+ ((RegionObserver)env.impl).preGetClosestRowBefore(env, row, family,
+ result);
+ bypass |= env.shouldBypass();
+ if (env.shouldComplete()) {
+ break;
+ }
}
}
+ return bypass;
} finally {
coprocessorLock.readLock().unlock();
}
@@ -747,20 +774,21 @@ public class CoprocessorHost {
* @param row the row key
* @param family the family
* @param result the result set from the region
- * @return the result set to return to the client
* @exception IOException Exception
*/
- public Result postGetClosestRowBefore(final byte[] row, final byte[] family,
- Result result) throws IOException {
+ public void postGetClosestRowBefore(final byte[] row, final byte[] family,
+ final Result result) throws IOException {
try {
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
if (env.impl instanceof RegionObserver) {
- result = ((RegionObserver)env.impl)
- .postGetClosestRowBefore(env, row, family, result);
+ ((RegionObserver)env.impl).postGetClosestRowBefore(env, row, family,
+ result);
+ if (env.shouldComplete()) {
+ break;
+ }
}
}
- return result;
} finally {
coprocessorLock.readLock().unlock();
}
@@ -768,18 +796,24 @@ public class CoprocessorHost {
/**
* @param get the Get request
- * @return the possibly transformed Get object by coprocessor
+ * @return true if default processing should be bypassed
* @exception IOException Exception
*/
- public Get preGet(Get get) throws IOException {
+ public boolean preGet(final Get get, final List<KeyValue> results)
+ throws IOException {
try {
+ boolean bypass = false;
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
if (env.impl instanceof RegionObserver) {
- get = ((RegionObserver)env.impl).preGet(env, get);
+ ((RegionObserver)env.impl).preGet(env, get, results);
+ bypass |= env.shouldBypass();
+ if (env.shouldComplete()) {
+ break;
+ }
}
}
- return get;
+ return bypass;
} finally {
coprocessorLock.readLock().unlock();
}
@@ -791,16 +825,18 @@ public class CoprocessorHost {
* @return the possibly transformed result set to use
* @exception IOException Exception
*/
- public List<KeyValue> postGet(final Get get, List<KeyValue> results)
- throws IOException {
+ public void postGet(final Get get, final List<KeyValue> results)
+ throws IOException {
try {
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
if (env.impl instanceof RegionObserver) {
- results = ((RegionObserver)env.impl).postGet(env, get, results);
+ ((RegionObserver)env.impl).postGet(env, get, results);
+ if (env.shouldComplete()) {
+ break;
+ }
}
}
- return results;
} finally {
coprocessorLock.readLock().unlock();
}
@@ -808,18 +844,25 @@ public class CoprocessorHost {
/**
* @param get the Get request
- * @param exists the result returned by the region server
+ * @return true or false to return to client if bypassing normal operation,
+ * or null otherwise
* @exception IOException Exception
*/
- public Get preExists(Get get) throws IOException {
+ public Boolean preExists(final Get get) throws IOException {
try {
+ boolean bypass = false;
+ boolean exists = false;
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
if (env.impl instanceof RegionObserver) {
- get = ((RegionObserver)env.impl).preExists(env, get);
+ exists = ((RegionObserver)env.impl).preExists(env, get, exists);
+ bypass |= env.shouldBypass();
+ if (env.shouldComplete()) {
+ break;
+ }
}
}
- return get;
+ return bypass ? exists : null;
} finally {
coprocessorLock.readLock().unlock();
}
@@ -837,7 +880,10 @@ public class CoprocessorHost {
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
if (env.impl instanceof RegionObserver) {
- exists &= ((RegionObserver)env.impl).postExists(env, get, exists);
+ exists = ((RegionObserver)env.impl).postExists(env, get, exists);
+ if (env.shouldComplete()) {
+ break;
+ }
}
}
return exists;
@@ -848,19 +894,25 @@ public class CoprocessorHost {
/**
* @param familyMap map of family to edits for the given family.
- * @return the possibly transformed map to actually use
+ * @param writeToWAL true if the change should be written to the WAL
+ * @return true if default processing should be bypassed
* @exception IOException Exception
*/
- public Map<byte[], List<KeyValue>> prePut(Map<byte[], List<KeyValue>> familyMap)
- throws IOException {
+ public boolean prePut(final Map<byte[], List<KeyValue>> familyMap,
+ final boolean writeToWAL) throws IOException {
try {
+ boolean bypass = false;
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
if (env.impl instanceof RegionObserver) {
- familyMap = ((RegionObserver)env.impl).prePut(env, familyMap);
+ ((RegionObserver)env.impl).prePut(env, familyMap, writeToWAL);
+ bypass |= env.shouldBypass();
+ if (env.shouldComplete()) {
+ break;
+ }
}
}
- return familyMap;
+ return bypass;
} finally {
coprocessorLock.readLock().unlock();
}
@@ -868,15 +920,19 @@ public class CoprocessorHost {
/**
* @param familyMap map of family to edits for the given family.
+ * @param writeToWAL true if the change should be written to the WAL
* @exception IOException Exception
*/
- public void postPut(Map<byte[], List<KeyValue>> familyMap)
- throws IOException {
+ public void postPut(final Map<byte[], List<KeyValue>> familyMap,
+ final boolean writeToWAL) throws IOException {
try {
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
if (env.impl instanceof RegionObserver) {
- ((RegionObserver)env.impl).postPut(env, familyMap);
+ ((RegionObserver)env.impl).postPut(env, familyMap, writeToWAL);
+ if (env.shouldComplete()) {
+ break;
+ }
}
}
} finally {
@@ -886,19 +942,25 @@ public class CoprocessorHost {
/**
* @param familyMap map of family to edits for the given family.
- * @return the possibly transformed map to actually use
+ * @param writeToWAL true if the change should be written to the WAL
+ * @return true if default processing should be bypassed
* @exception IOException Exception
*/
- public Map<byte[], List<KeyValue>> preDelete(Map<byte[], List<KeyValue>> familyMap)
- throws IOException {
+ public boolean preDelete(final Map<byte[], List<KeyValue>> familyMap,
+ final boolean writeToWAL) throws IOException {
try {
+ boolean bypass = false;
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
if (env.impl instanceof RegionObserver) {
- familyMap = ((RegionObserver)env.impl).preDelete(env, familyMap);
+ ((RegionObserver)env.impl).preDelete(env, familyMap, writeToWAL);
+ bypass |= env.shouldBypass();
+ if (env.shouldComplete()) {
+ break;
+ }
}
}
- return familyMap;
+ return bypass;
} finally {
coprocessorLock.readLock().unlock();
}
@@ -906,15 +968,19 @@ public class CoprocessorHost {
/**
* @param familyMap map of family to edits for the given family.
+ * @param writeToWAL true if the change should be written to the WAL
* @exception IOException Exception
*/
- public void postDelete(Map<byte[], List<KeyValue>> familyMap)
- throws IOException {
+ public void postDelete(final Map<byte[], List<KeyValue>> familyMap,
+ final boolean writeToWAL) throws IOException {
try {
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
if (env.impl instanceof RegionObserver) {
- ((RegionObserver)env.impl).postDelete(env, familyMap);
+ ((RegionObserver)env.impl).postDelete(env, familyMap, writeToWAL);
+ if (env.shouldComplete()) {
+ break;
+ }
}
}
} finally {
@@ -928,21 +994,29 @@ public class CoprocessorHost {
* @param qualifier column qualifier
* @param value the expected value
* @param put data to put if check succeeds
+ * @return true or false to return to client if default processing should
+ * be bypassed, or null otherwise
* @throws IOException e
*/
- public Put preCheckAndPut(final byte [] row, final byte [] family,
+ public Boolean preCheckAndPut(final byte [] row, final byte [] family,
final byte [] qualifier, final byte [] value, Put put)
throws IOException
{
try {
+ boolean bypass = false;
+ boolean result = false;
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
if (env.impl instanceof RegionObserver) {
- put = ((RegionObserver)env.impl).preCheckAndPut(env, row, family,
- qualifier, value, put);
+ result = ((RegionObserver)env.impl).preCheckAndPut(env, row, family,
+ qualifier, value, put, result);
+ bypass |= env.shouldBypass();
+ if (env.shouldComplete()) {
+ break;
+ }
}
}
- return put;
+ return bypass ? result : null;
} finally {
coprocessorLock.readLock().unlock();
}
@@ -967,6 +1041,9 @@ public class CoprocessorHost {
if (env.impl instanceof RegionObserver) {
result = ((RegionObserver)env.impl).postCheckAndPut(env, row,
family, qualifier, value, put, result);
+ if (env.shouldComplete()) {
+ break;
+ }
}
}
return result;
@@ -981,21 +1058,29 @@ public class CoprocessorHost {
* @param qualifier column qualifier
* @param value the expected value
* @param delete delete to commit if check succeeds
+ * @return true or false to return to client if default processing should
+ * be bypassed, or null otherwise
* @throws IOException e
*/
- public Delete preCheckAndDelete(final byte [] row, final byte [] family,
+ public Boolean preCheckAndDelete(final byte [] row, final byte [] family,
final byte [] qualifier, final byte [] value, Delete delete)
throws IOException
{
try {
+ boolean bypass = false;
+ boolean result = false;
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
if (env.impl instanceof RegionObserver) {
- delete = ((RegionObserver)env.impl).preCheckAndDelete(env, row,
- family, qualifier, value, delete);
+ result = ((RegionObserver)env.impl).preCheckAndDelete(env, row,
+ family, qualifier, value, delete, result);
+ bypass |= env.shouldBypass();
+ if (env.shouldComplete()) {
+ break;
+ }
}
}
- return delete;
+ return bypass ? result : null;
} finally {
coprocessorLock.readLock().unlock();
}
@@ -1020,6 +1105,9 @@ public class CoprocessorHost {
if (env.impl instanceof RegionObserver) {
result = ((RegionObserver)env.impl).postCheckAndDelete(env, row,
family, qualifier, value, delete, result);
+ if (env.shouldComplete()) {
+ break;
+ }
}
}
return result;
@@ -1033,25 +1121,31 @@ public class CoprocessorHost {
* @param family column family
* @param qualifier column qualifier
* @param amount long amount to increment
- * @param writeToWAL whether to write the increment to the WAL
- * @return new amount to increment
+ * @param writeToWAL true if the change should be written to the WAL
+ * @return return value for client if default operation should be bypassed,
+ * or null otherwise
* @throws IOException if an error occurred on the coprocessor
*/
- public long preIncrementColumnValue(final byte [] row, final byte [] family,
+ public Long preIncrementColumnValue(final byte [] row, final byte [] family,
final byte [] qualifier, long amount, final boolean writeToWAL)
throws IOException {
try {
+ boolean bypass = false;
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
if (env.impl instanceof RegionObserver) {
amount = ((RegionObserver)env.impl).preIncrementColumnValue(env,
- row, family, qualifier, amount, writeToWAL);
+ row, family, qualifier, amount, writeToWAL);
+ bypass |= env.shouldBypass();
+ if (env.shouldComplete()) {
+ break;
+ }
}
}
+ return bypass ? amount : null;
} finally {
coprocessorLock.readLock().unlock();
}
- return amount;
}
/**
@@ -1059,7 +1153,7 @@ public class CoprocessorHost {
* @param family column family
* @param qualifier column qualifier
* @param amount long amount to increment
- * @param writeToWAL whether to write the increment to the WAL
+ * @param writeToWAL true if the change should be written to the WAL
* @param result the result returned by incrementColumnValue
* @return the result to return to the client
* @throws IOException if an error occurred on the coprocessor
@@ -1072,7 +1166,10 @@ public class CoprocessorHost {
for (Environment env: coprocessors) {
if (env.impl instanceof RegionObserver) {
result = ((RegionObserver)env.impl).postIncrementColumnValue(env,
- row, family, qualifier, amount, writeToWAL, result);
+ row, family, qualifier, amount, writeToWAL, result);
+ if (env.shouldComplete()) {
+ break;
+ }
}
}
} finally {
@@ -1083,155 +1180,202 @@ public class CoprocessorHost {
/**
* @param increment increment object
- * @param writeToWAL whether to write the increment to the WAL
- * @return new amount to increment
+ * @param writeToWAL true if the change should be written to the WAL
+ * @return result to return to client if default operation should be
+ * bypassed, null otherwise
* @throws IOException if an error occurred on the coprocessor
*/
- public Increment preIncrement(Increment increment)
+ public Result preIncrement(Increment increment)
throws IOException {
try {
+ boolean bypass = false;
+ Result result = new Result();
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
if (env.impl instanceof RegionObserver) {
- increment = ((RegionObserver)env.impl).preIncrement(env, increment);
+ ((RegionObserver)env.impl).preIncrement(env, increment, result);
+ bypass |= env.shouldBypass();
+ if (env.shouldComplete()) {
+ break;
+ }
}
}
+ return bypass ? result : null;
} finally {
coprocessorLock.readLock().unlock();
}
- return increment;
}
/**
* @param increment increment object
- * @param writeToWAL whether to write the increment to the WAL
+ * @param writeToWAL true if the change should be written to the WAL
* @param result the result returned by incrementColumnValue
- * @return the result to return to the client
* @throws IOException if an error occurred on the coprocessor
*/
- public Result postIncrement(final Increment increment, Result result)
+ public void postIncrement(final Increment increment, Result result)
throws IOException {
try {
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
if (env.impl instanceof RegionObserver) {
- result = ((RegionObserver)env.impl).postIncrement(env, increment,
- result);
+ ((RegionObserver)env.impl).postIncrement(env, increment, result);
+ if (env.shouldComplete()) {
+ break;
+ }
}
}
} finally {
coprocessorLock.readLock().unlock();
}
- return result;
}
/**
* @param scan the Scan specification
+ * @return scanner to return to client if default operation should be
+ * bypassed, null otherwise
* @exception IOException Exception
*/
- public Scan preScannerOpen(Scan scan) throws IOException {
+ public InternalScanner preScannerOpen(Scan scan) throws IOException {
try {
+ boolean bypass = false;
+ InternalScanner s = null;
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
if (env.impl instanceof RegionObserver) {
- scan = ((RegionObserver)env.impl).preScannerOpen(env, scan);
+ s = ((RegionObserver)env.impl).preScannerOpen(env, scan, s);
+ bypass |= env.shouldBypass();
+ if (env.shouldComplete()) {
+ break;
+ }
}
}
+ return bypass ? s : null;
} finally {
coprocessorLock.readLock().unlock();
}
- return scan;
}
/**
* @param scan the Scan specification
* @param scannerId the scanner id allocated by the region server
+ * @return the scanner instance to use
* @exception IOException Exception
*/
- public void postScannerOpen(final Scan scan, long scannerId)
+ public InternalScanner postScannerOpen(final Scan scan, InternalScanner s)
throws IOException {
try {
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
if (env.impl instanceof RegionObserver) {
- ((RegionObserver)env.impl).postScannerOpen(env, scan, scannerId);
+ s = ((RegionObserver)env.impl).postScannerOpen(env, scan, s);
+ if (env.shouldComplete()) {
+ break;
+ }
}
}
+ return s;
} finally {
coprocessorLock.readLock().unlock();
}
}
/**
- * @param scannerId the scanner id
+ * @param s the scanner
* @param results the result set returned by the region server
- * @return the possibly transformed result set to actually return
+ * @param limit the maximum number of results to return
+ * @return 'has next' indication to client if bypassing default behavior, or
+ * null otherwise
* @exception IOException Exception
*/
- public void preScannerNext(final long scannerId) throws IOException {
+ public Boolean preScannerNext(final InternalScanner s,
+ final List<KeyValue> results, int limit) throws IOException {
try {
+ boolean bypass = false;
+ boolean hasNext = false;
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
if (env.impl instanceof RegionObserver) {
- ((RegionObserver)env.impl).preScannerNext(env, scannerId);
+ hasNext = ((RegionObserver)env.impl).preScannerNext(env, s, results,
+ limit, hasNext);
+ bypass |= env.shouldBypass();
+ if (env.shouldComplete()) {
+ break;
+ }
}
}
+ return bypass ? hasNext : null;
} finally {
coprocessorLock.readLock().unlock();
}
}
/**
- * @param scannerId the scanner id
+ * @param s the scanner
* @param results the result set returned by the region server
- * @return the possibly transformed result set to actually return
+ * @param limit the maximum number of results to return
+ * @param hasMore the current 'has more' indication, passed to each observer in turn
+ * @return 'has more' indication to give to client
* @exception IOException Exception
*/
- public List<KeyValue> postScannerNext(final long scannerId,
- List<KeyValue> results) throws IOException {
+ public boolean postScannerNext(final InternalScanner s,
+ final List<KeyValue> results, final int limit, boolean hasMore)
+ throws IOException {
try {
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
if (env.impl instanceof RegionObserver) {
- results = ((RegionObserver)env.impl).postScannerNext(env, scannerId,
- results);
+ hasMore = ((RegionObserver)env.impl).postScannerNext(env, s,
+ results, limit, hasMore);
+ if (env.shouldComplete()) {
+ break;
+ }
}
}
- return results;
+ return hasMore;
} finally {
coprocessorLock.readLock().unlock();
}
}
/**
- * @param scannerId the scanner id
+ * @param s the scanner
+ * @return true if default behavior should be bypassed, false otherwise
* @exception IOException Exception
*/
- public void preScannerClose(final long scannerId)
+ public boolean preScannerClose(final InternalScanner s)
throws IOException {
try {
+ boolean bypass = false;
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
if (env.impl instanceof RegionObserver) {
- ((RegionObserver)env.impl).preScannerClose(env, scannerId);
+ ((RegionObserver)env.impl).preScannerClose(env, s);
+ bypass |= env.shouldBypass();
+ if (env.shouldComplete()) {
+ break;
+ }
}
}
+ return bypass;
} finally {
coprocessorLock.readLock().unlock();
}
}
/**
- * @param scannerId the scanner id
+ * @param s the scanner
* @exception IOException Exception
*/
- public void postScannerClose(final long scannerId)
+ public void postScannerClose(final InternalScanner s)
throws IOException {
try {
coprocessorLock.readLock().lock();
for (Environment env: coprocessors) {
if (env.impl instanceof RegionObserver) {
- ((RegionObserver)env.impl).postScannerClose(env, scannerId);
+ ((RegionObserver)env.impl).postScannerClose(env, s);
+ if (env.shouldComplete()) {
+ break;
+ }
}
}
} finally {
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1049471&r1=1049470&r2=1049471&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed Dec 15 10:11:59 2010
@@ -1112,27 +1112,29 @@ public class HRegion implements HeapSize
*/
public Result getClosestRowBefore(final byte [] row, final byte [] family)
throws IOException {
- Result result = null;
+ if (coprocessorHost != null) {
+ Result result = new Result();
+ if (coprocessorHost.preGetClosestRowBefore(row, family, result)) {
+ return result;
+ }
+ }
// look across all the HStores for this region and determine what the
// closest key is across all column families, since the data may be sparse
- KeyValue key = null;
checkRow(row);
startRegionOperation();
- if (coprocessorHost != null) {
- coprocessorHost.preGetClosestRowBefore(row, family);
- }
try {
Store store = getStore(family);
KeyValue kv = new KeyValue(row, HConstants.LATEST_TIMESTAMP);
// get the closest key. (HStore.getRowKeyAtOrBefore can return null)
- key = store.getRowKeyAtOrBefore(kv);
+ KeyValue key = store.getRowKeyAtOrBefore(kv);
+ Result result = null;
if (key != null) {
Get get = new Get(key.getRow());
get.addFamily(family);
result = get(get, null);
}
if (coprocessorHost != null) {
- result = coprocessorHost.postGetClosestRowBefore(row, family, result);
+ coprocessorHost.postGetClosestRowBefore(row, family, result);
}
return result;
} finally {
@@ -1150,8 +1152,7 @@ public class HRegion implements HeapSize
* @return InternalScanner
* @throws IOException read exceptions
*/
- public InternalScanner getScanner(Scan scan)
- throws IOException {
+ public InternalScanner getScanner(Scan scan) throws IOException {
return getScanner(scan, null);
}
@@ -1175,13 +1176,17 @@ public class HRegion implements HeapSize
}
}
- protected InternalScanner instantiateInternalScanner(Scan scan, List<KeyValueScanner> additionalScanners) throws IOException {
+ protected InternalScanner instantiateInternalScanner(Scan scan,
+ List<KeyValueScanner> additionalScanners) throws IOException {
+ InternalScanner s = null;
if (coprocessorHost != null) {
- coprocessorHost.preScannerOpen(scan);
+ s = coprocessorHost.preScannerOpen(scan);
+ }
+ if (s == null) {
+ s = new RegionScanner(scan, additionalScanners);
}
- InternalScanner s = new RegionScanner(scan, additionalScanners);
if (coprocessorHost != null) {
- coprocessorHost.postScannerOpen(scan, s.hashCode());
+ s = coprocessorHost.postScannerOpen(scan, s);
}
return s;
}
@@ -1243,17 +1248,20 @@ public class HRegion implements HeapSize
* @throws IOException
*/
public void delete(Map<byte[], List<KeyValue>> familyMap, boolean writeToWAL)
- throws IOException {
+ throws IOException {
+ /* Run coprocessor pre hook outside of locks to avoid deadlock */
+ if (coprocessorHost != null) {
+ if (coprocessorHost.preDelete(familyMap, writeToWAL)) {
+ return;
+ }
+ }
+
long now = EnvironmentEdgeManager.currentTimeMillis();
byte [] byteNow = Bytes.toBytes(now);
boolean flush = false;
updatesLock.readLock().lock();
try {
- if (coprocessorHost != null) {
- familyMap = coprocessorHost.preDelete(familyMap);
- }
-
for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
byte[] family = e.getKey();
@@ -1318,7 +1326,7 @@ public class HRegion implements HeapSize
flush = isFlushSize(memstoreSize.addAndGet(addedSize));
if (coprocessorHost != null) {
- coprocessorHost.postDelete(familyMap);
+ coprocessorHost.postDelete(familyMap, writeToWAL);
}
} finally {
this.updatesLock.readLock().unlock();
@@ -1456,15 +1464,36 @@ public class HRegion implements HeapSize
return batchOp.retCodes;
}
+ @SuppressWarnings("unchecked")
private long doMiniBatchPut(BatchOperationInProgress<Pair<Put, Integer>> batchOp) throws IOException {
+ /* Run coprocessor pre hook outside of locks to avoid deadlock */
+ if (coprocessorHost != null) {
+ List<Pair<Put, Integer>> ops =
+ new ArrayList<Pair<Put, Integer>>(batchOp.operations.length);
+ for (int i = 0; i < batchOp.operations.length; i++) {
+ Pair<Put, Integer> nextPair = batchOp.operations[i];
+ Put put = nextPair.getFirst();
+ Map<byte[], List<KeyValue>> familyMap = put.getFamilyMap();
+ if (coprocessorHost.prePut(familyMap, put.getWriteToWAL())) {
+ // pre hook says skip this Put
+ // adjust nextIndexToProcess if we skipped before it
+ if (batchOp.nextIndexToProcess > i) {
+ batchOp.nextIndexToProcess--;
+ }
+ continue;
+ }
+ ops.add(nextPair);
+ }
+ batchOp.operations = ops.toArray(new Pair[ops.size()]);
+ }
+
long now = EnvironmentEdgeManager.currentTimeMillis();
byte[] byteNow = Bytes.toBytes(now);
/** Keep track of the locks we hold so we can release them in finally clause */
List<Integer> acquiredLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
// reference family maps directly so coprocessors can mutate them if desired
- Map<byte[],List<KeyValue>>[] familyMaps =
- new Map[batchOp.operations.length];
+ Map<byte[],List<KeyValue>>[] familyMaps = new Map[batchOp.operations.length];
// We try to set up a batch in the range [firstIndex,lastIndexExclusive)
int firstIndex = batchOp.nextIndexToProcess;
int lastIndexExclusive = firstIndex;
@@ -1481,12 +1510,6 @@ public class HRegion implements HeapSize
Integer providedLockId = nextPair.getSecond();
Map<byte[], List<KeyValue>> familyMap = put.getFamilyMap();
- // Check any loaded coprocessors
- /* TODO: we should catch any throws coprocessor exceptions here to allow the
- rest of the batch to continue. This means fixing HBASE-2898 */
- if (coprocessorHost != null) {
- familyMap = coprocessorHost.prePut(familyMap);
- }
// store the family map reference to allow for mutations
familyMaps[lastIndexExclusive] = familyMap;
@@ -1555,15 +1578,22 @@ public class HRegion implements HeapSize
long addedSize = 0;
for (int i = firstIndex; i < lastIndexExclusive; i++) {
if (batchOp.retCodes[i] != OperationStatusCode.NOT_RUN) continue;
-
addedSize += applyFamilyMapToMemstore(familyMaps[i]);
batchOp.retCodes[i] = OperationStatusCode.SUCCESS;
+ }
- // execute any coprocessor post-hooks
- if (coprocessorHost != null) {
- coprocessorHost.postDelete(familyMaps[i]);
+ // ------------------------------------
+ // STEP 5. Run coprocessor post hooks
+ // ------------------------------------
+ if (coprocessorHost != null) {
+ for (int i = firstIndex; i < lastIndexExclusive; i++) {
+ // only for successful puts
+ if (batchOp.retCodes[i] != OperationStatusCode.SUCCESS) continue;
+ Put p = batchOp.operations[i].getFirst();
+ coprocessorHost.postPut(familyMaps[i], p.getWriteToWAL());
}
}
+
success = true;
return addedSize;
} finally {
@@ -1738,8 +1768,14 @@ public class HRegion implements HeapSize
* @param writeToWAL if true, then we should write to the log
* @throws IOException
*/
- private void put(Map<byte [], List<KeyValue>> familyMap,
- boolean writeToWAL) throws IOException {
+ private void put(Map<byte [], List<KeyValue>> familyMap, boolean writeToWAL)
+ throws IOException {
+ /* run pre put hook outside of lock to avoid deadlock */
+ if (coprocessorHost != null) {
+ if (coprocessorHost.prePut(familyMap, writeToWAL)) {
+ return;
+ }
+ }
long now = EnvironmentEdgeManager.currentTimeMillis();
byte[] byteNow = Bytes.toBytes(now);
@@ -1747,9 +1783,6 @@ public class HRegion implements HeapSize
this.updatesLock.readLock().lock();
try {
- if (coprocessorHost != null) {
- familyMap = coprocessorHost.prePut(familyMap);
- }
checkFamilies(familyMap.keySet());
updateKVTimestamps(familyMap.values(), byteNow);
// write/sync to WAL should happen before we touch memstore.
@@ -1766,13 +1799,14 @@ public class HRegion implements HeapSize
long addedSize = applyFamilyMapToMemstore(familyMap);
flush = isFlushSize(memstoreSize.addAndGet(addedSize));
-
- if (coprocessorHost != null) {
- coprocessorHost.postPut(familyMap);
- }
} finally {
this.updatesLock.readLock().unlock();
}
+
+ if (coprocessorHost != null) {
+ coprocessorHost.postPut(familyMap, writeToWAL);
+ }
+
if (flush) {
// Request a cache flush. Do it outside update lock.
requestFlush();
@@ -2350,6 +2384,14 @@ public class HRegion implements HeapSize
public synchronized boolean next(List<KeyValue> outResults, int limit)
throws IOException {
+ if (coprocessorHost != null) {
+ Boolean result = coprocessorHost.preScannerNext((InternalScanner)this,
+ outResults, limit);
+ if (result != null) {
+ return result.booleanValue();
+ }
+ }
+
if (this.filterClosed) {
throw new UnknownScannerException("Scanner was closed (timed out?) " +
"after we renewed it. Could be caused by a very slow scanner " +
@@ -2363,14 +2405,11 @@ public class HRegion implements HeapSize
results.clear();
- if (coprocessorHost != null) {
- coprocessorHost.preScannerNext(hashCode());
- }
-
boolean returnResult = nextInternal(limit);
if (coprocessorHost != null) {
- results = coprocessorHost.postScannerNext(hashCode(), results);
+ returnResult = coprocessorHost.postScannerNext((InternalScanner)this,
+ results, limit, returnResult);
}
outResults.addAll(results);
@@ -2416,8 +2455,10 @@ public class HRegion implements HeapSize
do {
this.storeHeap.next(results, limit - results.size());
if (limit > 0 && results.size() == limit) {
- if (this.filter != null && filter.hasFilterRow()) throw new IncompatibleFilterException(
+ if (this.filter != null && filter.hasFilterRow()) {
+ throw new IncompatibleFilterException(
"Filter with filterRow(List<KeyValue>) incompatible with scan with limit!");
+ }
return true; // we are expecting more yes, but also limited to how many we can return.
}
} while (Bytes.equals(currentRow, nextRow = peekRow()));
@@ -2480,7 +2521,9 @@ public class HRegion implements HeapSize
public synchronized void close() throws IOException {
if (coprocessorHost != null) {
- coprocessorHost.preScannerClose(hashCode());
+ if (coprocessorHost.preScannerClose((InternalScanner)this)) {
+ return;
+ }
}
if (storeHeap != null) {
storeHeap.close();
@@ -2488,7 +2531,7 @@ public class HRegion implements HeapSize
}
this.filterClosed = true;
if (coprocessorHost != null) {
- coprocessorHost.postScannerClose(hashCode());
+ coprocessorHost.postScannerClose((InternalScanner)this);
}
}
}
@@ -3064,32 +3107,27 @@ public class HRegion implements HeapSize
throws IOException {
Scan scan = new Scan(get);
- List<KeyValue> results = null;
- List<KeyValue> getResults = new ArrayList<KeyValue>();
+ List<KeyValue> results = new ArrayList<KeyValue>();
// pre-get CP hook
if (withCoprocessor && (coprocessorHost != null)) {
- get = coprocessorHost.preGet(get);
+ if (coprocessorHost.preGet(get, results)) {
+ return results;
+ }
}
InternalScanner scanner = null;
try {
scanner = getScanner(scan);
- scanner.next(getResults);
+ scanner.next(results);
} finally {
if (scanner != null)
scanner.close();
}
- // append get results to pre-get results
- if (results != null){
- results.addAll(getResults);
- }
- else {
- results = getResults;
- }
+
// post-get CP hook
if (withCoprocessor && (coprocessorHost != null)) {
- results = coprocessorHost.postGet(get, results);
+ coprocessorHost.postGet(get, results);
}
return results;
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1049471&r1=1049470&r2=1049471&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed Dec 15 10:11:59 2010
@@ -1599,7 +1599,10 @@ public class HRegionServer implements HR
try {
HRegion region = getRegion(regionName);
if (region.getCoprocessorHost() != null) {
- region.getCoprocessorHost().preExists(get);
+ Boolean result = region.getCoprocessorHost().preExists(get);
+ if (result != null) {
+ return result.booleanValue();
+ }
}
Result r = region.get(get, getLockFromId(get.getLockId()));
boolean result = r != null && !r.isEmpty();
@@ -1702,8 +1705,11 @@ public class HRegionServer implements HR
}
HRegion region = getRegion(regionName);
if (region.getCoprocessorHost() != null) {
- region.getCoprocessorHost().preCheckAndPut(row, family, qualifier,
- value, put);
+ Boolean result = region.getCoprocessorHost()
+ .preCheckAndPut(row, family, qualifier, value, put);
+ if (result != null) {
+ return result.booleanValue();
+ }
}
boolean result = checkAndMutate(regionName, row, family, qualifier,
value, put, getLockFromId(put.getLockId()));
@@ -1737,8 +1743,11 @@ public class HRegionServer implements HR
}
HRegion region = getRegion(regionName);
if (region.getCoprocessorHost() != null) {
- region.getCoprocessorHost().preCheckAndDelete(row, family, qualifier,
- value, delete);
+ Boolean result = region.getCoprocessorHost().preCheckAndDelete(row,
+ family, qualifier, value, delete);
+ if (result != null) {
+ return result.booleanValue();
+ }
}
boolean result = checkAndMutate(regionName, row, family, qualifier, value,
delete, getLockFromId(delete.getLockId()));
@@ -2462,12 +2471,15 @@ public class HRegionServer implements HR
Increment incVal = increment;
Result resVal;
if (region.getCoprocessorHost() != null) {
- incVal = region.getCoprocessorHost().preIncrement(incVal);
+ resVal = region.getCoprocessorHost().preIncrement(incVal);
+ if (resVal != null) {
+ return resVal;
+ }
}
resVal = region.increment(incVal, getLockFromId(increment.getLockId()),
increment.getWriteToWAL());
if (region.getCoprocessorHost() != null) {
- resVal = region.getCoprocessorHost().postIncrement(incVal, resVal);
+ region.getCoprocessorHost().postIncrement(incVal, resVal);
}
return resVal;
} catch (IOException e) {
@@ -2489,16 +2501,18 @@ public class HRegionServer implements HR
requestCount.incrementAndGet();
try {
HRegion region = getRegion(regionName);
- long amountVal = amount;
if (region.getCoprocessorHost() != null) {
- amountVal = region.getCoprocessorHost().preIncrementColumnValue(row,
- family, qualifier, amountVal, writeToWAL);
+ Long amountVal = region.getCoprocessorHost().preIncrementColumnValue(row,
+ family, qualifier, amount, writeToWAL);
+ if (amountVal != null) {
+ return amountVal.longValue();
+ }
}
long retval = region.incrementColumnValue(row, family, qualifier, amount,
- writeToWAL);
+ writeToWAL);
if (region.getCoprocessorHost() != null) {
retval = region.getCoprocessorHost().postIncrementColumnValue(row,
- family, qualifier, amountVal, writeToWAL, retval);
+ family, qualifier, amount, writeToWAL, retval);
}
return retval;
} catch (IOException e) {
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java?rev=1049471&r1=1049470&r2=1049471&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java Wed Dec 15 10:11:59 2010
@@ -34,11 +34,8 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
-import static org.junit.Assert.*;
-
/**
* A sample region observer that tests the RegionObserver interface.
* It works with TestRegionObserverInterface to provide the test case.
@@ -59,22 +56,26 @@ public class SimpleRegionObserver extend
boolean hadPreIncrement = false;
boolean hadPostIncrement = false;
-
- // Overriden RegionObserver methods
@Override
- public Get preGet(CoprocessorEnvironment e, Get get) {
+ public void preGet(final CoprocessorEnvironment e, final Get get,
+ final List<KeyValue> results) throws IOException {
+ assertNotNull(e);
+ assertNotNull(e.getRegion());
+ assertNotNull(get);
+ assertNotNull(results);
if (Arrays.equals(e.getRegion().getTableDesc().getName(),
TestRegionObserverInterface.TEST_TABLE)) {
hadPreGet = true;
- assertNotNull(e);
- assertNotNull(e.getRegion());
}
- return get;
}
@Override
- public List<KeyValue> postGet(CoprocessorEnvironment e, Get get,
- List<KeyValue> results) {
+ public void postGet(final CoprocessorEnvironment e, final Get get,
+ final List<KeyValue> results) {
+ assertNotNull(e);
+ assertNotNull(e.getRegion());
+ assertNotNull(get);
+ assertNotNull(results);
if (Arrays.equals(e.getRegion().getTableDesc().getName(),
TestRegionObserverInterface.TEST_TABLE)) {
boolean foundA = false;
@@ -96,12 +97,14 @@ public class SimpleRegionObserver extend
assertTrue(foundC);
hadPostGet = true;
}
- return results;
}
@Override
- public Map<byte[], List<KeyValue>> prePut(CoprocessorEnvironment e,
- Map<byte[], List<KeyValue>> familyMap) {
+ public void prePut(final CoprocessorEnvironment e, final Map<byte[],
+ List<KeyValue>> familyMap, final boolean writeToWAL) throws IOException {
+ assertNotNull(e);
+ assertNotNull(e.getRegion());
+ assertNotNull(familyMap);
if (Arrays.equals(e.getRegion().getTableDesc().getName(),
TestRegionObserverInterface.TEST_TABLE)) {
List<KeyValue> kvs = familyMap.get(TestRegionObserverInterface.A);
@@ -121,12 +124,14 @@ public class SimpleRegionObserver extend
TestRegionObserverInterface.C));
hadPrePut = true;
}
- return familyMap;
}
@Override
- public void postPut(CoprocessorEnvironment e,
- Map<byte[], List<KeyValue>> familyMap) {
+ public void postPut(final CoprocessorEnvironment e, final Map<byte[],
+ List<KeyValue>> familyMap, final boolean writeToWAL) throws IOException {
+ assertNotNull(e);
+ assertNotNull(e.getRegion());
+ assertNotNull(familyMap);
List<KeyValue> kvs = familyMap.get(TestRegionObserverInterface.A);
if (Arrays.equals(e.getRegion().getTableDesc().getName(),
TestRegionObserverInterface.TEST_TABLE)) {
@@ -149,18 +154,23 @@ public class SimpleRegionObserver extend
}
@Override
- public Map<byte[], List<KeyValue>> preDelete(CoprocessorEnvironment e,
- Map<byte[], List<KeyValue>> familyMap) {
+ public void preDelete(final CoprocessorEnvironment e, final Map<byte[],
+ List<KeyValue>> familyMap, final boolean writeToWAL) throws IOException {
+ assertNotNull(e);
+ assertNotNull(e.getRegion());
+ assertNotNull(familyMap);
if (beforeDelete && e.getRegion().getTableDesc().getName().equals(
TestRegionObserverInterface.TEST_TABLE)) {
hadPreDeleted = true;
}
- return familyMap;
}
@Override
- public void postDelete(CoprocessorEnvironment e,
- Map<byte[], List<KeyValue>> familyMap) {
+ public void postDelete(final CoprocessorEnvironment e, final Map<byte[],
+ List<KeyValue>> familyMap, final boolean writeToWAL) throws IOException {
+ assertNotNull(e);
+ assertNotNull(e.getRegion());
+ assertNotNull(familyMap);
if (Arrays.equals(e.getRegion().getTableDesc().getName(),
TestRegionObserverInterface.TEST_TABLE)) {
beforeDelete = false;
@@ -170,7 +180,12 @@ public class SimpleRegionObserver extend
@Override
public void preGetClosestRowBefore(final CoprocessorEnvironment e,
- final byte[] row, final byte[] family) {
+ final byte[] row, final byte[] family, final Result result)
+ throws IOException {
+ assertNotNull(e);
+ assertNotNull(e.getRegion());
+ assertNotNull(row);
+ assertNotNull(result);
if (beforeDelete && e.getRegion().getTableDesc().getName().equals(
TestRegionObserverInterface.TEST_TABLE)) {
hadPreGetClosestRowBefore = true;
@@ -178,70 +193,35 @@ public class SimpleRegionObserver extend
}
@Override
- public Result postGetClosestRowBefore(final CoprocessorEnvironment e,
- final byte[] row, final byte[] family, Result result) {
+ public void postGetClosestRowBefore(final CoprocessorEnvironment e,
+ final byte[] row, final byte[] family, final Result result)
+ throws IOException {
+ assertNotNull(e);
+ assertNotNull(e.getRegion());
+ assertNotNull(row);
+ assertNotNull(result);
if (Arrays.equals(e.getRegion().getTableDesc().getName(),
TestRegionObserverInterface.TEST_TABLE)) {
hadPostGetClosestRowBefore = true;
}
- return result;
- }
-
- @Override
- public Scan preScannerOpen(CoprocessorEnvironment e, Scan scan) {
- // not tested -- need to go through the RS to get here
- return scan;
- }
-
- @Override
- public void postScannerOpen(CoprocessorEnvironment e, Scan scan,
- long scannerId) {
- // not tested -- need to go through the RS to get here
}
@Override
- public void preScannerNext(final CoprocessorEnvironment e,
- final long scannerId) {
- // not tested -- need to go through the RS to get here
- }
-
- @Override
- public List<KeyValue> postScannerNext(final CoprocessorEnvironment e,
- final long scannerId, List<KeyValue> results) {
- // not tested -- need to go through the RS to get here
- return results;
- }
-
- @Override
- public void preScannerClose(final CoprocessorEnvironment e,
- final long scannerId) {
- // not tested -- need to go through the RS to get here
- }
-
- @Override
- public void postScannerClose(final CoprocessorEnvironment e,
- final long scannerId) {
- // not tested -- need to go through the RS to get here
- }
-
- @Override
- public Increment preIncrement(CoprocessorEnvironment e, Increment increment)
- throws IOException {
+ public void preIncrement(final CoprocessorEnvironment e,
+ final Increment increment, final Result result) throws IOException {
if (Arrays.equals(e.getRegion().getTableDesc().getName(),
TestRegionObserverInterface.TEST_TABLE_2)) {
hadPreIncrement = true;
}
- return increment;
}
@Override
- public Result postIncrement(CoprocessorEnvironment e, Increment increment,
- Result result) throws IOException {
+ public void postIncrement(final CoprocessorEnvironment e,
+ final Increment increment, final Result result) throws IOException {
if (Arrays.equals(e.getRegion().getTableDesc().getName(),
TestRegionObserverInterface.TEST_TABLE_2)) {
hadPostIncrement = true;
}
- return result;
}
boolean hadPreGet() {
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java?rev=1049471&r1=1049470&r2=1049471&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java Wed Dec 15 10:11:59 2010
@@ -46,8 +46,9 @@ public class TestRegionObserverStacking
public static class ObserverA extends BaseRegionObserverCoprocessor {
long id;
@Override
- public void postPut(final CoprocessorEnvironment e,
- Map<byte[], List<KeyValue>> familyMap) {
+ public void postPut(final CoprocessorEnvironment e, final Map<byte[],
+ List<KeyValue>> familyMap, final boolean writeToWAL)
+ throws IOException {
id = System.currentTimeMillis();
try {
Thread.sleep(10);
@@ -59,8 +60,9 @@ public class TestRegionObserverStacking
public static class ObserverB extends BaseRegionObserverCoprocessor {
long id;
@Override
- public void postPut(final CoprocessorEnvironment e,
- Map<byte[], List<KeyValue>> familyMap) {
+ public void postPut(final CoprocessorEnvironment e, final Map<byte[],
+ List<KeyValue>> familyMap, final boolean writeToWAL)
+ throws IOException {
id = System.currentTimeMillis();
try {
Thread.sleep(10);
@@ -73,8 +75,9 @@ public class TestRegionObserverStacking
long id;
@Override
- public void postPut(final CoprocessorEnvironment e,
- Map<byte[], List<KeyValue>> familyMap) {
+ public void postPut(final CoprocessorEnvironment e, final Map<byte[],
+ List<KeyValue>> familyMap, final boolean writeToWAL)
+ throws IOException {
id = System.currentTimeMillis();
try {
Thread.sleep(10);