You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ra...@apache.org on 2014/06/03 14:25:35 UTC
git commit: HBASE-11126-Add RegionObserver pre hooks that operate
under row lock (Ram)
Repository: hbase
Updated Branches:
refs/heads/master 369141b79 -> 6a2467bbf
HBASE-11126-Add RegionObserver pre hooks that operate under row lock (Ram)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6a2467bb
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6a2467bb
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6a2467bb
Branch: refs/heads/master
Commit: 6a2467bbf23c97e8499ff269365a3a1eb391b23f
Parents: 369141b
Author: Ramkrishna <ra...@intel.com>
Authored: Tue Jun 3 17:54:59 2014 +0530
Committer: Ramkrishna <ra...@intel.com>
Committed: Tue Jun 3 17:54:59 2014 +0530
----------------------------------------------------------------------
.../hbase/coprocessor/BaseRegionObserver.java | 36 ++++
.../hbase/coprocessor/RegionObserver.java | 122 ++++++++++++-
.../hadoop/hbase/regionserver/HRegion.java | 96 +++++++---
.../regionserver/MultiRowMutationProcessor.java | 4 +-
.../regionserver/RegionCoprocessorHost.java | 182 +++++++++++++++++++
.../hbase/coprocessor/SimpleRegionObserver.java | 140 ++++++++++++++
.../TestRegionObserverInterface.java | 106 ++++++++++-
7 files changed, 644 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/6a2467bb/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
index c861192..1a1b2ac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
@@ -311,6 +311,12 @@ public abstract class BaseRegionObserver implements RegionObserver {
}
@Override
+ public void prePrepareTimeStampForDeleteVersion(
+ final ObserverContext<RegionCoprocessorEnvironment> e, final Mutation delete,
+ final Cell cell, final byte[] byteNow, final Get get) throws IOException {
+ }
+
+ @Override
public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> e,
final Delete delete, final WALEdit edit, final Durability durability)
throws IOException {
@@ -340,6 +346,15 @@ public abstract class BaseRegionObserver implements RegionObserver {
}
@Override
+ public boolean preCheckAndPutAfterRowLock(
+ final ObserverContext<RegionCoprocessorEnvironment> e,
+ final byte[] row, final byte[] family, final byte[] qualifier, final CompareOp compareOp,
+ final ByteArrayComparable comparator, final Put put,
+ final boolean result) throws IOException {
+ return result;
+ }
+
+ @Override
public boolean postCheckAndPut(final ObserverContext<RegionCoprocessorEnvironment> e,
final byte [] row, final byte [] family, final byte [] qualifier,
final CompareOp compareOp, final ByteArrayComparable comparator,
@@ -356,6 +371,15 @@ public abstract class BaseRegionObserver implements RegionObserver {
}
@Override
+ public boolean preCheckAndDeleteAfterRowLock(
+ final ObserverContext<RegionCoprocessorEnvironment> e,
+ final byte[] row, final byte[] family, final byte[] qualifier, final CompareOp compareOp,
+ final ByteArrayComparable comparator, final Delete delete,
+ final boolean result) throws IOException {
+ return result;
+ }
+
+ @Override
public boolean postCheckAndDelete(final ObserverContext<RegionCoprocessorEnvironment> e,
final byte [] row, final byte [] family, final byte [] qualifier,
final CompareOp compareOp, final ByteArrayComparable comparator,
@@ -370,6 +394,12 @@ public abstract class BaseRegionObserver implements RegionObserver {
}
@Override
+ public Result preAppendAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> e,
+ final Append append) throws IOException {
+ return null;
+ }
+
+ @Override
public Result postAppend(final ObserverContext<RegionCoprocessorEnvironment> e,
final Append append, final Result result) throws IOException {
return result;
@@ -397,6 +427,12 @@ public abstract class BaseRegionObserver implements RegionObserver {
}
@Override
+ public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> e,
+ final Increment increment) throws IOException {
+ return null;
+ }
+
+ @Override
public Result postIncrement(final ObserverContext<RegionCoprocessorEnvironment> e,
final Increment increment, final Result result) throws IOException {
return result;
http://git-wip-us.apache.org/repos/asf/hbase/blob/6a2467bb/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
index 96cc3bd..3425a12 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
@@ -587,6 +587,24 @@ public interface RegionObserver extends Coprocessor {
void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
final Delete delete, final WALEdit edit, final Durability durability)
throws IOException;
+/**
+ * Called before the server updates the timestamp for version delete with latest timestamp.
+ * <p>
+ * Call CoprocessorEnvironment#bypass to skip default actions
+ * <p>
+ * Call CoprocessorEnvironment#complete to skip any subsequent chained
+ * coprocessors
+ * @param c the environment provided by the region server
+ * @param mutation - the parent mutation associated with this delete cell
+ * @param cell - The deleteColumn with latest version cell
+ * @param byteNow - timestamp bytes
+ * @param get - the get formed using the current cell's row.
+ * Note that the get does not specify the family and qualifier
+ * @throws IOException
+ */
+ void prePrepareTimeStampForDeleteVersion(final ObserverContext<RegionCoprocessorEnvironment> c,
+ final Mutation mutation, final Cell cell, final byte[] byteNow,
+ final Get get) throws IOException;
/**
* Called after the client deletes a value.
@@ -657,7 +675,7 @@ public interface RegionObserver extends Coprocessor {
MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success) throws IOException;
/**
- * Called before checkAndPut
+ * Called before checkAndPut.
* <p>
* Call CoprocessorEnvironment#bypass to skip default actions
* <p>
@@ -682,6 +700,34 @@ public interface RegionObserver extends Coprocessor {
throws IOException;
/**
+ * Called before checkAndPut but after acquiring rowlock.
+ * <p>
+ * <b>Note:</b> Avoid long-running operations in this hook, because the row lock is held
+ * while the hook runs. Trying to acquire a lock on another row from within this hook
+ * can lead to a deadlock.
+ * <p>
+ * Call CoprocessorEnvironment#bypass to skip default actions
+ * <p>
+ * Call CoprocessorEnvironment#complete to skip any subsequent chained
+ * coprocessors
+ * @param c the environment provided by the region server
+ * @param row row to check
+ * @param family column family
+ * @param qualifier column qualifier
+ * @param compareOp the comparison operation
+ * @param comparator the comparator
+ * @param put data to put if check succeeds
+ * @param result
+ * @return the return value to return to client if bypassing default
+ * processing
+ * @throws IOException if an error occurred on the coprocessor
+ */
+ boolean preCheckAndPutAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
+ final byte[] row, final byte[] family, final byte[] qualifier, final CompareOp compareOp,
+ final ByteArrayComparable comparator, final Put put,
+ final boolean result) throws IOException;
+
+ /**
* Called after checkAndPut
* <p>
* Call CoprocessorEnvironment#complete to skip any subsequent chained
@@ -704,7 +750,7 @@ public interface RegionObserver extends Coprocessor {
throws IOException;
/**
- * Called before checkAndDelete
+ * Called before checkAndDelete.
* <p>
* Call CoprocessorEnvironment#bypass to skip default actions
* <p>
@@ -728,6 +774,33 @@ public interface RegionObserver extends Coprocessor {
throws IOException;
/**
+ * Called before checkAndDelete but after acquiring rowlock.
+ * <p>
+ * <b>Note:</b> Avoid long-running operations in this hook, because the row lock is held
+ * while the hook runs. Trying to acquire a lock on another row from within this hook
+ * can lead to a deadlock.
+ * <p>
+ * Call CoprocessorEnvironment#bypass to skip default actions
+ * <p>
+ * Call CoprocessorEnvironment#complete to skip any subsequent chained
+ * coprocessors
+ * @param c the environment provided by the region server
+ * @param row row to check
+ * @param family column family
+ * @param qualifier column qualifier
+ * @param compareOp the comparison operation
+ * @param comparator the comparator
+ * @param delete delete to commit if check succeeds
+ * @param result
+ * @return the value to return to client if bypassing default processing
+ * @throws IOException if an error occurred on the coprocessor
+ */
+ boolean preCheckAndDeleteAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
+ final byte[] row, final byte[] family, final byte[] qualifier, final CompareOp compareOp,
+ final ByteArrayComparable comparator, final Delete delete,
+ final boolean result) throws IOException;
+
+ /**
* Called after checkAndDelete
* <p>
* Call CoprocessorEnvironment#complete to skip any subsequent chained
@@ -795,7 +868,7 @@ public interface RegionObserver extends Coprocessor {
throws IOException;
/**
- * Called before Append
+ * Called before Append.
* <p>
* Call CoprocessorEnvironment#bypass to skip default actions
* <p>
@@ -811,6 +884,25 @@ public interface RegionObserver extends Coprocessor {
throws IOException;
/**
+ * Called before Append but after acquiring rowlock.
+ * <p>
+ * <b>Note:</b> Avoid long-running operations in this hook, because the row lock is held
+ * while the hook runs. Trying to acquire a lock on another row from within this hook
+ * can lead to a deadlock.
+ * <p>
+ * Call CoprocessorEnvironment#bypass to skip default actions
+ * <p>
+ * Call CoprocessorEnvironment#complete to skip any subsequent chained
+ * coprocessors
+ * @param c the environment provided by the region server
+ * @param append Append object
+ * @return result to return to the client if bypassing default processing
+ * @throws IOException if an error occurred on the coprocessor
+ */
+ Result preAppendAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
+ final Append append) throws IOException;
+
+ /**
* Called after Append
* <p>
* Call CoprocessorEnvironment#complete to skip any subsequent chained
@@ -826,7 +918,7 @@ public interface RegionObserver extends Coprocessor {
throws IOException;
/**
- * Called before Increment
+ * Called before Increment.
* <p>
* Call CoprocessorEnvironment#bypass to skip default actions
* <p>
@@ -842,6 +934,28 @@ public interface RegionObserver extends Coprocessor {
throws IOException;
/**
+ * Called before Increment but after acquiring rowlock.
+ * <p>
+ * <b>Note:</b> Avoid long-running operations in this hook, because the row lock is held
+ * while the hook runs. Trying to acquire a lock on another row from within this hook
+ * can lead to a deadlock.
+ * <p>
+ * Call CoprocessorEnvironment#bypass to skip default actions
+ * <p>
+ * Call CoprocessorEnvironment#complete to skip any subsequent chained coprocessors
+ *
+ * @param c
+ * the environment provided by the region server
+ * @param increment
+ * increment object
+ * @return result to return to the client if bypassing default processing
+ * @throws IOException
+ * if an error occurred on the coprocessor
+ */
+ Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
+ final Increment increment) throws IOException;
+
+ /**
* Called after increment
* <p>
* Call CoprocessorEnvironment#complete to skip any subsequent chained
http://git-wip-us.apache.org/repos/asf/hbase/blob/6a2467bb/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index dea1e81..0c4f7ee 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2030,10 +2030,13 @@ public class HRegion implements HeapSize { // , Writable{
/**
* Setup correct timestamps in the KVs in Delete object.
* Caller should have the row and region locks.
+ * @param mutation
+ * @param familyMap
+ * @param byteNow
* @throws IOException
*/
- void prepareDeleteTimestamps(Map<byte[], List<Cell>> familyMap, byte[] byteNow)
- throws IOException {
+ void prepareDeleteTimestamps(Mutation mutation, Map<byte[], List<Cell>> familyMap,
+ byte[] byteNow) throws IOException {
for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
byte[] family = e.getKey();
@@ -2059,20 +2062,14 @@ public class HRegion implements HeapSize { // , Writable{
Get get = new Get(CellUtil.cloneRow(kv));
get.setMaxVersions(count);
get.addColumn(family, qual);
-
- List<Cell> result = get(get, false);
-
- if (result.size() < count) {
- // Nothing to delete
- kv.updateLatestStamp(byteNow);
- continue;
- }
- if (result.size() > count) {
- throw new RuntimeException("Unexpected size: " + result.size());
+ if (coprocessorHost != null) {
+ if (!coprocessorHost.prePrepareTimeStampForDeleteVersion(mutation, cell, byteNow,
+ get)) {
+ updateDeleteLatestVersionTimeStamp(kv, get, count, byteNow);
+ }
+ } else {
+ updateDeleteLatestVersionTimeStamp(kv, get, count, byteNow);
}
- KeyValue getkv = KeyValueUtil.ensureKeyValue(result.get(count - 1));
- Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(),
- getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
} else {
kv.updateLatestStamp(byteNow);
}
@@ -2080,6 +2077,23 @@ public class HRegion implements HeapSize { // , Writable{
}
}
+ void updateDeleteLatestVersionTimeStamp(KeyValue kv, Get get, int count, byte[] byteNow)
+ throws IOException {
+ List<Cell> result = get(get, false);
+
+ if (result.size() < count) {
+ // Nothing to delete
+ kv.updateLatestStamp(byteNow);
+ return;
+ }
+ if (result.size() > count) {
+ throw new RuntimeException("Unexpected size: " + result.size());
+ }
+ KeyValue getkv = KeyValueUtil.ensureKeyValue(result.get(count - 1));
+ Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(), getkv.getBuffer(),
+ getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
+ }
+
/**
* @throws IOException
*/
@@ -2452,7 +2466,9 @@ public class HRegion implements HeapSize { // , Writable{
updateKVTimestamps(familyMaps[i].values(), byteNow);
noOfPuts++;
} else {
- prepareDeleteTimestamps(familyMaps[i], byteNow);
+ if (!isInReplay) {
+ prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);
+ }
noOfDeletes++;
}
}
@@ -2712,9 +2728,21 @@ public class HRegion implements HeapSize { // , Writable{
RowLock rowLock = getRowLock(get.getRow());
// wait for all previous transactions to complete (with lock held)
mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
- List<Cell> result;
try {
- result = get(get, false);
+ if (this.getCoprocessorHost() != null) {
+ Boolean processed = null;
+ if (w instanceof Put) {
+ processed = this.getCoprocessorHost().preCheckAndPutAfterRowLock(row, family,
+ qualifier, compareOp, comparator, (Put) w);
+ } else if (w instanceof Delete) {
+ processed = this.getCoprocessorHost().preCheckAndDeleteAfterRowLock(row, family,
+ qualifier, compareOp, comparator, (Delete) w);
+ }
+ if (processed != null) {
+ return processed;
+ }
+ }
+ List<Cell> result = get(get, false);
boolean valueIsNull = comparator.getValue() == null ||
comparator.getValue().length == 0;
@@ -5033,12 +5061,18 @@ public class HRegion implements HeapSize { // , Writable{
rowLock = getRowLock(row);
try {
lock(this.updatesLock.readLock());
- // wait for all prior MVCC transactions to finish - while we hold the row lock
- // (so that we are guaranteed to see the latest state)
- mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
- // now start my own transaction
- w = mvcc.beginMemstoreInsert();
try {
+ // wait for all prior MVCC transactions to finish - while we hold the row lock
+ // (so that we are guaranteed to see the latest state)
+ mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
+ if (this.coprocessorHost != null) {
+ Result r = this.coprocessorHost.preAppendAfterRowLock(append);
+ if(r!= null) {
+ return r;
+ }
+ }
+ // now start my own transaction
+ w = mvcc.beginMemstoreInsert();
long now = EnvironmentEdgeManager.currentTimeMillis();
// Process each family
for (Map.Entry<byte[], List<Cell>> family : append.getFamilyCellMap().entrySet()) {
@@ -5221,12 +5255,18 @@ public class HRegion implements HeapSize { // , Writable{
RowLock rowLock = getRowLock(row);
try {
lock(this.updatesLock.readLock());
- // wait for all prior MVCC transactions to finish - while we hold the row lock
- // (so that we are guaranteed to see the latest state)
- mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
- // now start my own transaction
- w = mvcc.beginMemstoreInsert();
try {
+ // wait for all prior MVCC transactions to finish - while we hold the row lock
+ // (so that we are guaranteed to see the latest state)
+ mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
+ if (this.coprocessorHost != null) {
+ Result r = this.coprocessorHost.preIncrementAfterRowLock(increment);
+ if (r != null) {
+ return r;
+ }
+ }
+ // now start my own transaction
+ w = mvcc.beginMemstoreInsert();
long now = EnvironmentEdgeManager.currentTimeMillis();
// Process each family
for (Map.Entry<byte [], List<Cell>> family:
http://git-wip-us.apache.org/repos/asf/hbase/blob/6a2467bb/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java
index 847e6cb..9cfa326 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java
@@ -27,9 +27,9 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationProcessorRequest;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationProcessorResponse;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -80,7 +80,7 @@ MultiRowMutationProcessorResponse> {
} else if (m instanceof Delete) {
Delete d = (Delete) m;
region.prepareDelete(d);
- region.prepareDeleteTimestamps(d.getFamilyCellMap(), byteNow);
+ region.prepareDeleteTimestamps(d, d.getFamilyCellMap(), byteNow);
} else {
throw new DoNotRetryIOException(
"Action must be Put or Delete. But was: "
http://git-wip-us.apache.org/repos/asf/hbase/blob/6a2467bb/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
index 177f153..e476ea2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -1135,6 +1135,44 @@ public class RegionCoprocessorHost
}
/**
+ * @param mutation - the current mutation
+ * @param kv - the current cell
+ * @param byteNow - current timestamp in bytes
+ * @param get - the get that could be used
+ * Note that the get does not specify the family and qualifier that should be used
+ * @return true if default processing should be bypassed
+ * @exception IOException
+ * Exception
+ */
+ public boolean prePrepareTimeStampForDeleteVersion(Mutation mutation,
+ Cell kv, byte[] byteNow, Get get) throws IOException {
+ boolean bypass = false;
+ ObserverContext<RegionCoprocessorEnvironment> ctx = null;
+ for (RegionEnvironment env : coprocessors) {
+ if (env.getInstance() instanceof RegionObserver) {
+ ctx = ObserverContext.createAndPrepare(env, ctx);
+ Thread currentThread = Thread.currentThread();
+ ClassLoader cl = currentThread.getContextClassLoader();
+ try {
+ currentThread.setContextClassLoader(env.getClassLoader());
+ ((RegionObserver) env.getInstance())
+ .prePrepareTimeStampForDeleteVersion(ctx, mutation, kv,
+ byteNow, get);
+ } catch (Throwable e) {
+ handleCoprocessorThrowable(env, e);
+ } finally {
+ currentThread.setContextClassLoader(cl);
+ }
+ bypass |= ctx.shouldBypass();
+ if (ctx.shouldComplete()) {
+ break;
+ }
+ }
+ }
+ return bypass;
+ }
+
+ /**
* @param put The Put object
* @param edit The WALEdit object.
* @param durability The durability used
@@ -1356,6 +1394,46 @@ public class RegionCoprocessorHost
* @param compareOp the comparison operation
* @param comparator the comparator
* @param put data to put if check succeeds
+ * @return true or false to return to client if default processing should
+ * be bypassed, or null otherwise
+ * @throws IOException e
+ */
+ public Boolean preCheckAndPutAfterRowLock(final byte[] row, final byte[] family,
+ final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator,
+ final Put put) throws IOException {
+ boolean bypass = false;
+ boolean result = false;
+ ObserverContext<RegionCoprocessorEnvironment> ctx = null;
+ for (RegionEnvironment env : coprocessors) {
+ if (env.getInstance() instanceof RegionObserver) {
+ ctx = ObserverContext.createAndPrepare(env, ctx);
+ Thread currentThread = Thread.currentThread();
+ ClassLoader cl = currentThread.getContextClassLoader();
+ try {
+ currentThread.setContextClassLoader(env.getClassLoader());
+ result = ((RegionObserver) env.getInstance()).preCheckAndPutAfterRowLock(ctx, row,
+ family, qualifier, compareOp, comparator, put, result);
+ } catch (Throwable e) {
+ handleCoprocessorThrowable(env, e);
+ } finally {
+ currentThread.setContextClassLoader(cl);
+ }
+ bypass |= ctx.shouldBypass();
+ if (ctx.shouldComplete()) {
+ break;
+ }
+ }
+ }
+ return bypass ? result : null;
+ }
+
+ /**
+ * @param row row to check
+ * @param family column family
+ * @param qualifier column qualifier
+ * @param compareOp the comparison operation
+ * @param comparator the comparator
+ * @param put data to put if check succeeds
* @throws IOException e
*/
public boolean postCheckAndPut(final byte [] row, final byte [] family,
@@ -1434,6 +1512,46 @@ public class RegionCoprocessorHost
* @param compareOp the comparison operation
* @param comparator the comparator
* @param delete delete to commit if check succeeds
+ * @return true or false to return to client if default processing should
+ * be bypassed, or null otherwise
+ * @throws IOException e
+ */
+ public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final byte[] family,
+ final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator,
+ final Delete delete) throws IOException {
+ boolean bypass = false;
+ boolean result = false;
+ ObserverContext<RegionCoprocessorEnvironment> ctx = null;
+ for (RegionEnvironment env : coprocessors) {
+ if (env.getInstance() instanceof RegionObserver) {
+ ctx = ObserverContext.createAndPrepare(env, ctx);
+ Thread currentThread = Thread.currentThread();
+ ClassLoader cl = currentThread.getContextClassLoader();
+ try {
+ currentThread.setContextClassLoader(env.getClassLoader());
+ result = ((RegionObserver) env.getInstance()).preCheckAndDeleteAfterRowLock(ctx, row,
+ family, qualifier, compareOp, comparator, delete, result);
+ } catch (Throwable e) {
+ handleCoprocessorThrowable(env, e);
+ } finally {
+ currentThread.setContextClassLoader(cl);
+ }
+ bypass |= ctx.shouldBypass();
+ if (ctx.shouldComplete()) {
+ break;
+ }
+ }
+ }
+ return bypass ? result : null;
+ }
+
+ /**
+ * @param row row to check
+ * @param family column family
+ * @param qualifier column qualifier
+ * @param compareOp the comparison operation
+ * @param comparator the comparator
+ * @param delete delete to commit if check succeeds
* @throws IOException e
*/
public boolean postCheckAndDelete(final byte [] row, final byte [] family,
@@ -1496,6 +1614,38 @@ public class RegionCoprocessorHost
}
/**
+ * @param append append object
+ * @return result to return to client if default operation should be
+ * bypassed, null otherwise
+ * @throws IOException if an error occurred on the coprocessor
+ */
+ public Result preAppendAfterRowLock(final Append append) throws IOException {
+ boolean bypass = false;
+ Result result = null;
+ ObserverContext<RegionCoprocessorEnvironment> ctx = null;
+ for (RegionEnvironment env : coprocessors) {
+ if (env.getInstance() instanceof RegionObserver) {
+ ctx = ObserverContext.createAndPrepare(env, ctx);
+ Thread currentThread = Thread.currentThread();
+ ClassLoader cl = currentThread.getContextClassLoader();
+ try {
+ currentThread.setContextClassLoader(env.getClassLoader());
+ result = ((RegionObserver) env.getInstance()).preAppendAfterRowLock(ctx, append);
+ } catch (Throwable e) {
+ handleCoprocessorThrowable(env, e);
+ } finally {
+ currentThread.setContextClassLoader(cl);
+ }
+ bypass |= ctx.shouldBypass();
+ if (ctx.shouldComplete()) {
+ break;
+ }
+ }
+ }
+ return bypass ? result : null;
+ }
+
+ /**
* @param increment increment object
* @return result to return to client if default operation should be
* bypassed, null otherwise
@@ -1528,6 +1678,38 @@ public class RegionCoprocessorHost
}
/**
+ * @param increment increment object
+ * @return result to return to client if default operation should be
+ * bypassed, null otherwise
+ * @throws IOException if an error occurred on the coprocessor
+ */
+ public Result preIncrementAfterRowLock(final Increment increment) throws IOException {
+ boolean bypass = false;
+ Result result = null;
+ ObserverContext<RegionCoprocessorEnvironment> ctx = null;
+ for (RegionEnvironment env : coprocessors) {
+ if (env.getInstance() instanceof RegionObserver) {
+ ctx = ObserverContext.createAndPrepare(env, ctx);
+ Thread currentThread = Thread.currentThread();
+ ClassLoader cl = currentThread.getContextClassLoader();
+ try {
+ currentThread.setContextClassLoader(env.getClassLoader());
+ result = ((RegionObserver) env.getInstance()).preIncrementAfterRowLock(ctx, increment);
+ } catch (Throwable e) {
+ handleCoprocessorThrowable(env, e);
+ } finally {
+ currentThread.setContextClassLoader(cl);
+ }
+ bypass |= ctx.shouldBypass();
+ if (ctx.shouldComplete()) {
+ break;
+ }
+ }
+ }
+ return bypass ? result : null;
+ }
+
+ /**
* @param append Append object
* @param result the result returned by the append
* @throws IOException if an error occurred on the coprocessor
http://git-wip-us.apache.org/repos/asf/hbase/blob/6a2467bb/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
index 45ecf43..bf53518 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
@@ -48,6 +49,8 @@ import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -96,11 +99,22 @@ public class SimpleRegionObserver extends BaseRegionObserver {
final AtomicInteger ctPrePut = new AtomicInteger(0);
final AtomicInteger ctPostPut = new AtomicInteger(0);
final AtomicInteger ctPreDeleted = new AtomicInteger(0);
+ final AtomicInteger ctPrePrepareDeleteTS = new AtomicInteger(0);
final AtomicInteger ctPostDeleted = new AtomicInteger(0);
final AtomicInteger ctPreGetClosestRowBefore = new AtomicInteger(0);
final AtomicInteger ctPostGetClosestRowBefore = new AtomicInteger(0);
final AtomicInteger ctPreIncrement = new AtomicInteger(0);
+ final AtomicInteger ctPreIncrementAfterRowLock = new AtomicInteger(0);
+ final AtomicInteger ctPreAppend = new AtomicInteger(0);
+ final AtomicInteger ctPreAppendAfterRowLock = new AtomicInteger(0);
final AtomicInteger ctPostIncrement = new AtomicInteger(0);
+ final AtomicInteger ctPostAppend = new AtomicInteger(0);
+ final AtomicInteger ctPreCheckAndPut = new AtomicInteger(0);
+ final AtomicInteger ctPreCheckAndPutAfterRowLock = new AtomicInteger(0);
+ final AtomicInteger ctPostCheckAndPut = new AtomicInteger(0);
+ final AtomicInteger ctPreCheckAndDelete = new AtomicInteger(0);
+ final AtomicInteger ctPreCheckAndDeleteAfterRowLock = new AtomicInteger(0);
+ final AtomicInteger ctPostCheckAndDelete = new AtomicInteger(0);
final AtomicInteger ctPreWALRestored = new AtomicInteger(0);
final AtomicInteger ctPostWALRestored = new AtomicInteger(0);
final AtomicInteger ctPreScannerNext = new AtomicInteger(0);
@@ -435,6 +449,12 @@ public class SimpleRegionObserver extends BaseRegionObserver {
}
@Override
+ public void prePrepareTimeStampForDeleteVersion(ObserverContext<RegionCoprocessorEnvironment> e,
+ Mutation delete, Cell cell, byte[] byteNow, Get get) throws IOException {
+ ctPrePrepareDeleteTS.incrementAndGet();
+ }
+
+ @Override
public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
final Delete delete, final WALEdit edit,
final Durability durability) throws IOException {
@@ -521,6 +541,13 @@ public class SimpleRegionObserver extends BaseRegionObserver {
}
@Override
+ public Result preIncrementAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> e,
+ Increment increment) throws IOException {
+ ctPreIncrementAfterRowLock.incrementAndGet();
+ return null;
+ }
+
+ @Override
public Result postIncrement(final ObserverContext<RegionCoprocessorEnvironment> c,
final Increment increment, final Result result) throws IOException {
ctPostIncrement.incrementAndGet();
@@ -528,6 +555,75 @@ public class SimpleRegionObserver extends BaseRegionObserver {
}
@Override // Test hook: counts the invocation and returns a constant true.
+ public boolean preCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> e, byte[] row,
+ byte[] family, byte[] qualifier, CompareOp compareOp, ByteArrayComparable comparator,
+ Put put, boolean result) throws IOException {
+ ctPreCheckAndPut.incrementAndGet(); // read back by hadPreCheckAndPut()
+ return true; // constant; the incoming 'result' argument is not consulted
+ }
+
+ @Override // Test hook: counts the invocation and returns a constant true.
+ public boolean preCheckAndPutAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> e,
+ byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp,
+ ByteArrayComparable comparator, Put put, boolean result) throws IOException {
+ ctPreCheckAndPutAfterRowLock.incrementAndGet(); // read back by hadPreCheckAndPutAfterRowLock()
+ return true; // constant; the incoming 'result' argument is not consulted
+ }
+
+ @Override // Test hook: counts the invocation and returns a constant true.
+ public boolean postCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> e, byte[] row,
+ byte[] family, byte[] qualifier, CompareOp compareOp, ByteArrayComparable comparator,
+ Put put, boolean result) throws IOException {
+ ctPostCheckAndPut.incrementAndGet(); // read back by hadPostCheckAndPut()
+ return true; // NOTE(review): ignores the incoming 'result' and always returns true - confirm this is intended
+ }
+
+ @Override // Test hook: counts the invocation and returns a constant true.
+ public boolean preCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> e, byte[] row,
+ byte[] family, byte[] qualifier, CompareOp compareOp, ByteArrayComparable comparator,
+ Delete delete, boolean result) throws IOException {
+ ctPreCheckAndDelete.incrementAndGet(); // read back by hadPreCheckAndDelete()
+ return true; // constant; the incoming 'result' argument is not consulted
+ }
+
+ @Override // Test hook: counts the invocation and returns a constant true.
+ public boolean preCheckAndDeleteAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> e,
+ byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp,
+ ByteArrayComparable comparator, Delete delete, boolean result) throws IOException {
+ ctPreCheckAndDeleteAfterRowLock.incrementAndGet(); // read back by hadPreCheckAndDeleteAfterRowLock()
+ return true; // constant; the incoming 'result' argument is not consulted
+ }
+
+ @Override // Test hook: counts the invocation and returns a constant true.
+ public boolean postCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> e, byte[] row,
+ byte[] family, byte[] qualifier, CompareOp compareOp, ByteArrayComparable comparator,
+ Delete delete, boolean result) throws IOException {
+ ctPostCheckAndDelete.incrementAndGet(); // read back by hadPostCheckAndDelete()
+ return true; // NOTE(review): ignores the incoming 'result' and always returns true - confirm this is intended
+ }
+
+ @Override // Test hook: counts the invocation and always returns null.
+ public Result preAppendAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> e,
+ Append append) throws IOException {
+ ctPreAppendAfterRowLock.incrementAndGet(); // read back by hadPreAppendAfterRowLock()
+ return null; // no Result is supplied by this observer
+ }
+
+ @Override // Test hook: counts the invocation and always returns null.
+ public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> e, Append append)
+ throws IOException {
+ ctPreAppend.incrementAndGet(); // read back by hadPreAppend()
+ return null; // no Result is supplied by this observer
+ }
+
+ @Override // Test hook: counts the invocation and always returns null.
+ public Result postAppend(ObserverContext<RegionCoprocessorEnvironment> e, Append append,
+ Result result) throws IOException {
+ ctPostAppend.incrementAndGet(); // read back by hadPostAppend()
+ return null; // NOTE(review): the incoming 'result' is dropped and null is returned - confirm this is intended
+ }
+
+ @Override
public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
List<Pair<byte[], String>> familyPaths) throws IOException {
RegionCoprocessorEnvironment e = ctx.getEnvironment();
@@ -646,14 +742,58 @@ public class SimpleRegionObserver extends BaseRegionObserver {
return ctPostCloseRegionOperation.get();
}
+ public boolean hadPreCheckAndPut() { // true when ctPreCheckAndPut (bumped in preCheckAndPut) is above zero
+ return ctPreCheckAndPut.get() > 0;
+ }
+
+ public boolean hadPreCheckAndPutAfterRowLock() { // true when ctPreCheckAndPutAfterRowLock is above zero
+ return ctPreCheckAndPutAfterRowLock.get() > 0;
+ }
+
+ public boolean hadPostCheckAndPut() { // true when ctPostCheckAndPut (bumped in postCheckAndPut) is above zero
+ return ctPostCheckAndPut.get() > 0;
+ }
+
+ public boolean hadPreCheckAndDelete() { // true when ctPreCheckAndDelete (bumped in preCheckAndDelete) is above zero
+ return ctPreCheckAndDelete.get() > 0;
+ }
+
+ public boolean hadPreCheckAndDeleteAfterRowLock() { // true when ctPreCheckAndDeleteAfterRowLock is above zero
+ return ctPreCheckAndDeleteAfterRowLock.get() > 0;
+ }
+
+ public boolean hadPostCheckAndDelete() { // true when ctPostCheckAndDelete (bumped in postCheckAndDelete) is above zero
+ return ctPostCheckAndDelete.get() > 0;
+ }
+
public boolean hadPreIncrement() { // true when the ctPreIncrement counter is above zero
return ctPreIncrement.get() > 0;
}
+
+ public boolean hadPreIncrementAfterRowLock() { // true when ctPreIncrementAfterRowLock is above zero
+ return ctPreIncrementAfterRowLock.get() > 0;
+ }
public boolean hadPostIncrement() { // true when ctPostIncrement (bumped in postIncrement) is above zero
return ctPostIncrement.get() > 0;
}
+ public boolean hadPreAppend() { // true when ctPreAppend (bumped in preAppend) is above zero
+ return ctPreAppend.get() > 0;
+ }
+
+ public boolean hadPreAppendAfterRowLock() { // true when ctPreAppendAfterRowLock is above zero
+ return ctPreAppendAfterRowLock.get() > 0;
+ }
+
+ public boolean hadPostAppend() { // true when ctPostAppend (bumped in postAppend) is above zero
+ return ctPostAppend.get() > 0;
+ }
+
+ public boolean hadPrePreparedDeleteTS() { // true when ctPrePrepareDeleteTS (bumped in prePrepareTimeStampForDeleteVersion) is above zero
+ return ctPrePrepareDeleteTS.get() > 0;
+ }
+
public boolean hadPreWALRestored() { // true when the ctPreWALRestored counter is above zero
return ctPreWALRestored.get() > 0;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6a2467bb/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
index be56c96..7bf31cd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
@@ -145,9 +146,9 @@ public class TestRegionObserverInterface {
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
- "hadDelete"},
+ "hadDelete", "hadPrePreparedDeleteTS"},
tableName,
- new Boolean[] {true, true, true, true, false}
+ new Boolean[] {true, true, true, true, false, false}
);
Delete delete = new Delete(ROW);
@@ -158,9 +159,9 @@ public class TestRegionObserverInterface {
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
- "hadPreBatchMutate", "hadPostBatchMutate", "hadDelete"},
+ "hadPreBatchMutate", "hadPostBatchMutate", "hadDelete", "hadPrePreparedDeleteTS"},
tableName,
- new Boolean[] {true, true, true, true, true, true, true}
+ new Boolean[] {true, true, true, true, true, true, true, true}
);
} finally {
util.deleteTable(tableName);
@@ -218,17 +219,106 @@ public class TestRegionObserverInterface {
inc.addColumn(A, A, 1);
verifyMethodResult(SimpleRegionObserver.class,
- new String[] {"hadPreIncrement", "hadPostIncrement"},
+ new String[] {"hadPreIncrement", "hadPostIncrement", "hadPreIncrementAfterRowLock"},
tableName,
- new Boolean[] {false, false}
+ new Boolean[] {false, false, false}
);
table.increment(inc);
verifyMethodResult(SimpleRegionObserver.class,
- new String[] {"hadPreIncrement", "hadPostIncrement"},
+ new String[] {"hadPreIncrement", "hadPostIncrement", "hadPreIncrementAfterRowLock"},
tableName,
- new Boolean[] {true, true}
+ new Boolean[] {true, true, true}
+ );
+ } finally {
+ util.deleteTable(tableName);
+ table.close();
+ }
+ }
+
+ @Test // Expects the three checkAndPut hook flags to be false before table.checkAndPut and true after it.
+ public void testCheckAndPutHooks() throws IOException {
+ TableName tableName =
+ TableName.valueOf(TEST_TABLE.getNameAsString() + ".testCheckAndPutHooks");
+ HTable table = util.createTable(tableName, new byte[][] {A, B, C});
+ try {
+ Put p = new Put(Bytes.toBytes(0));
+ p.add(A, A, A);
+ table.put(p); // seed row 0 with A:A = A through a plain put
+ table.flushCommits();
+ p = new Put(Bytes.toBytes(0)); // fresh Put handed to checkAndPut below
+ p.add(A, A, A);
+ verifyMethodResult(SimpleRegionObserver.class,
+ new String[] {"hadPreCheckAndPut",
+ "hadPreCheckAndPutAfterRowLock", "hadPostCheckAndPut"},
+ tableName,
+ new Boolean[] {false, false, false} // expected values before checkAndPut is issued
+ );
+ table.checkAndPut(Bytes.toBytes(0), A, A, A, p); // operation under test; its boolean return value is not checked
+ verifyMethodResult(SimpleRegionObserver.class,
+ new String[] {"hadPreCheckAndPut",
+ "hadPreCheckAndPutAfterRowLock", "hadPostCheckAndPut"},
+ tableName,
+ new Boolean[] {true, true, true} // expected values after checkAndPut
+ );
+ } finally {
+ util.deleteTable(tableName); // NOTE(review): if this throws, table.close() below is skipped
+ table.close();
+ }
+ }
+
+ @Test // Expects the three checkAndDelete hook flags to be false before table.checkAndDelete and true after it.
+ public void testCheckAndDeleteHooks() throws IOException {
+ TableName tableName =
+ TableName.valueOf(TEST_TABLE.getNameAsString() + ".testCheckAndDeleteHooks");
+ HTable table = util.createTable(tableName, new byte[][] {A, B, C});
+ try {
+ Put p = new Put(Bytes.toBytes(0));
+ p.add(A, A, A);
+ table.put(p); // seed row 0 with A:A = A through a plain put
+ table.flushCommits();
+ Delete d = new Delete(Bytes.toBytes(0));
+ table.delete(d); // NOTE(review): row 0 is deleted before checkAndDelete runs; only the hook flags are asserted afterwards
+ verifyMethodResult(SimpleRegionObserver.class,
+ new String[] {"hadPreCheckAndDelete",
+ "hadPreCheckAndDeleteAfterRowLock", "hadPostCheckAndDelete"},
+ tableName,
+ new Boolean[] {false, false, false} // expected values before checkAndDelete is issued
+ );
+ table.checkAndDelete(Bytes.toBytes(0), A, A, A, d); // operation under test; its boolean return value is not checked
+ verifyMethodResult(SimpleRegionObserver.class,
+ new String[] {"hadPreCheckAndDelete",
+ "hadPreCheckAndDeleteAfterRowLock", "hadPostCheckAndDelete"},
+ tableName,
+ new Boolean[] {true, true, true} // expected values after checkAndDelete
+ );
+ } finally {
+ util.deleteTable(tableName); // NOTE(review): if this throws, table.close() below is skipped
+ table.close();
+ }
+ }
+
+ @Test
+ public void testAppendHook() throws IOException {
+ TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + ".testAppendHook");
+ HTable table = util.createTable(tableName, new byte[][] {A, B, C});
+ try {
+ Append app = new Append(Bytes.toBytes(0));
+ app.add(A, A, A);
+
+ verifyMethodResult(SimpleRegionObserver.class,
+ new String[] {"hadPreAppend", "hadPostAppend", "hadPreAppendAfterRowLock"},
+ tableName,
+ new Boolean[] {false, false, false}
+ );
+
+ table.append(app);
+
+ verifyMethodResult(SimpleRegionObserver.class,
+ new String[] {"hadPreAppend", "hadPostAppend", "hadPreAppendAfterRowLock"},
+ tableName,
+ new Boolean[] {true, true, true}
);
} finally {
util.deleteTable(tableName);