You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by br...@apache.org on 2020/08/10 09:57:53 UTC
[hbase] branch branch-2 updated: HBASE-24680 Refactor the
checkAndMutate code on the server side (#2184)
This is an automated email from the ASF dual-hosted git repository.
brfrn169 pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 22bf9a3 HBASE-24680 Refactor the checkAndMutate code on the server side (#2184)
22bf9a3 is described below
commit 22bf9a38c97f73bc1507c7de86af032500ead1ac
Author: Toshihiro Suzuki <br...@gmail.com>
AuthorDate: Mon Aug 10 18:57:17 2020 +0900
HBASE-24680 Refactor the checkAndMutate code on the server side (#2184)
Signed-off-by: Duo Zhang <zh...@apache.org>
Signed-off-by: Josh Elser <el...@apache.org>
---
.../apache/hadoop/hbase/client/CheckAndMutate.java | 11 +-
.../hadoop/hbase/shaded/protobuf/ProtobufUtil.java | 71 +++
.../regionserver/MetricsRegionServerSource.java | 7 +
.../MetricsRegionServerSourceImpl.java | 7 +
.../hadoop/hbase/coprocessor/RegionObserver.java | 189 ++++++++
.../apache/hadoop/hbase/regionserver/HRegion.java | 133 ++++--
.../hbase/regionserver/MetricsRegionServer.java | 4 +
.../hadoop/hbase/regionserver/RSRpcServices.java | 428 +++++++----------
.../apache/hadoop/hbase/regionserver/Region.java | 44 ++
.../hbase/regionserver/RegionCoprocessorHost.java | 311 ++-----------
.../hbase/coprocessor/SimpleRegionObserver.java | 39 ++
.../coprocessor/TestRegionObserverInterface.java | 73 ++-
.../hadoop/hbase/regionserver/TestHRegion.java | 512 ++++++++++++++++++++-
.../regionserver/TestMetricsRegionServer.java | 3 +-
14 files changed, 1213 insertions(+), 619 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java
index 5fce3e2..26eb23d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java
@@ -214,7 +214,7 @@ public final class CheckAndMutate extends Mutation {
this.op = op;
this.value = value;
this.filter = null;
- this.timeRange = timeRange;
+ this.timeRange = timeRange != null ? timeRange : TimeRange.allTime();
this.action = action;
}
@@ -225,7 +225,7 @@ public final class CheckAndMutate extends Mutation {
this.op = null;
this.value = null;
this.filter = filter;
- this.timeRange = timeRange;
+ this.timeRange = timeRange != null ? timeRange : TimeRange.allTime();
this.action = action;
}
@@ -265,6 +265,13 @@ public final class CheckAndMutate extends Mutation {
}
/**
+ * @return whether this has a filter or not
+ */
+ public boolean hasFilter() {
+ return filter != null;
+ }
+
+ /**
* @return the time range to check
*/
public TimeRange getTimeRange() {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 09db446..c82243a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.Cell.Type;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ExtendedCellBuilder;
import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
@@ -65,6 +66,7 @@ import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.ClientUtil;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
@@ -84,6 +86,7 @@ import org.apache.hadoop.hbase.client.RegionLoadStats;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.RegionStatesCount;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.SlowLogParams;
import org.apache.hadoop.hbase.client.SnapshotDescription;
@@ -3469,4 +3472,72 @@ public final class ProtobufUtil {
return clearSlowLogResponses.getIsCleaned();
}
+ public static CheckAndMutate toCheckAndMutate(ClientProtos.Condition condition,
+ MutationProto mutation, CellScanner cellScanner) throws IOException {
+ byte[] row = condition.getRow().toByteArray();
+ CheckAndMutate.Builder builder = CheckAndMutate.newBuilder(row);
+ Filter filter = condition.hasFilter() ? ProtobufUtil.toFilter(condition.getFilter()) : null;
+ if (filter != null) {
+ builder.ifMatches(filter);
+ } else {
+ builder.ifMatches(condition.getFamily().toByteArray(),
+ condition.getQualifier().toByteArray(),
+ CompareOperator.valueOf(condition.getCompareType().name()),
+ ProtobufUtil.toComparator(condition.getComparator()).getValue());
+ }
+ TimeRange timeRange = condition.hasTimeRange() ?
+ ProtobufUtil.toTimeRange(condition.getTimeRange()) : TimeRange.allTime();
+ builder.timeRange(timeRange);
+
+ try {
+ MutationType type = mutation.getMutateType();
+ switch (type) {
+ case PUT:
+ return builder.build(ProtobufUtil.toPut(mutation, cellScanner));
+ case DELETE:
+ return builder.build(ProtobufUtil.toDelete(mutation, cellScanner));
+ default:
+ throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
+ }
+ } catch (IllegalArgumentException e) {
+ throw new DoNotRetryIOException(e.getMessage());
+ }
+ }
+
+ public static CheckAndMutate toCheckAndMutate(ClientProtos.Condition condition,
+ List<Mutation> mutations) throws IOException {
+ assert mutations.size() > 0;
+ byte[] row = condition.getRow().toByteArray();
+ CheckAndMutate.Builder builder = CheckAndMutate.newBuilder(row);
+ Filter filter = condition.hasFilter() ? ProtobufUtil.toFilter(condition.getFilter()) : null;
+ if (filter != null) {
+ builder.ifMatches(filter);
+ } else {
+ builder.ifMatches(condition.getFamily().toByteArray(),
+ condition.getQualifier().toByteArray(),
+ CompareOperator.valueOf(condition.getCompareType().name()),
+ ProtobufUtil.toComparator(condition.getComparator()).getValue());
+ }
+ TimeRange timeRange = condition.hasTimeRange() ?
+ ProtobufUtil.toTimeRange(condition.getTimeRange()) : TimeRange.allTime();
+ builder.timeRange(timeRange);
+
+ try {
+ if (mutations.size() == 1) {
+ Mutation m = mutations.get(0);
+ if (m instanceof Put) {
+ return builder.build((Put) m);
+ } else if (m instanceof Delete) {
+ return builder.build((Delete) m);
+ } else {
+ throw new DoNotRetryIOException("Unsupported mutate type: " + mutations.get(0)
+ .getClass().getSimpleName().toUpperCase());
+ }
+ } else {
+ return builder.build(new RowMutations(mutations.get(0).getRow()).add(mutations));
+ }
+ } catch (IllegalArgumentException e) {
+ throw new DoNotRetryIOException(e.getMessage());
+ }
+ }
}
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
index 958495a..ce9319e 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
@@ -87,6 +87,12 @@ public interface MetricsRegionServerSource extends BaseSource, JvmPauseMonitorSo
void updateCheckAndPut(long t);
/**
+ * Update checkAndMutate histogram
+ * @param t time it took
+ */
+ void updateCheckAndMutate(long t);
+
+ /**
* Update the Get time histogram .
*
* @param t time it took
@@ -393,6 +399,7 @@ public interface MetricsRegionServerSource extends BaseSource, JvmPauseMonitorSo
String DELETE_KEY = "delete";
String CHECK_AND_DELETE_KEY = "checkAndDelete";
String CHECK_AND_PUT_KEY = "checkAndPut";
+ String CHECK_AND_MUTATE_KEY = "checkAndMutate";
String DELETE_BATCH_KEY = "deleteBatch";
String GET_SIZE_KEY = "getSize";
String GET_KEY = "get";
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
index 4af8bec..e8d7f36 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
@@ -42,6 +42,7 @@ public class MetricsRegionServerSourceImpl
private final MetricHistogram deleteBatchHisto;
private final MetricHistogram checkAndDeleteHisto;
private final MetricHistogram checkAndPutHisto;
+ private final MetricHistogram checkAndMutateHisto;
private final MetricHistogram getHisto;
private final MetricHistogram incrementHisto;
private final MetricHistogram appendHisto;
@@ -112,6 +113,7 @@ public class MetricsRegionServerSourceImpl
deleteBatchHisto = getMetricsRegistry().newTimeHistogram(DELETE_BATCH_KEY);
checkAndDeleteHisto = getMetricsRegistry().newTimeHistogram(CHECK_AND_DELETE_KEY);
checkAndPutHisto = getMetricsRegistry().newTimeHistogram(CHECK_AND_PUT_KEY);
+ checkAndMutateHisto = getMetricsRegistry().newTimeHistogram(CHECK_AND_MUTATE_KEY);
getHisto = getMetricsRegistry().newTimeHistogram(GET_KEY);
slowGet = getMetricsRegistry().newCounter(SLOW_GET_KEY, SLOW_GET_DESC, 0L);
@@ -615,6 +617,11 @@ public class MetricsRegionServerSourceImpl
}
@Override
+ public void updateCheckAndMutate(long t) {
+ checkAndMutateHisto.add(t);
+ }
+
+ @Override
public void updatePutBatch(long t) {
putBatchHisto.add(t);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
index 05071fc..0bc0631 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.CheckAndMutate;
+import org.apache.hadoop.hbase.client.CheckAndMutateResult;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
@@ -39,6 +41,7 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
@@ -514,7 +517,11 @@ public interface RegionObserver {
* @param put data to put if check succeeds
* @param result the default value of the result
* @return the return value to return to client if bypassing default processing
+ *
+ * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
+ * {@link #preCheckAndMutate(ObserverContext, CheckAndMutate, CheckAndMutateResult)} instead.
*/
+ @Deprecated
default boolean preCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, Put put,
boolean result) throws IOException {
@@ -535,7 +542,11 @@ public interface RegionObserver {
* @param put data to put if check succeeds
* @param result the default value of the result
* @return the return value to return to client if bypassing default processing
+ *
+ * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
+ * {@link #preCheckAndMutate(ObserverContext, CheckAndMutate, CheckAndMutateResult)} instead.
*/
+ @Deprecated
default boolean preCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
Filter filter, Put put, boolean result) throws IOException {
return result;
@@ -562,7 +573,12 @@ public interface RegionObserver {
* @param put data to put if check succeeds
* @param result the default value of the result
* @return the return value to return to client if bypassing default processing
+ *
+ * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
+ * {@link #preCheckAndMutateAfterRowLock(ObserverContext, CheckAndMutate,CheckAndMutateResult)}
+ * instead.
*/
+ @Deprecated
default boolean preCheckAndPutAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
ByteArrayComparable comparator, Put put, boolean result) throws IOException {
@@ -587,7 +603,12 @@ public interface RegionObserver {
* @param put data to put if check succeeds
* @param result the default value of the result
* @return the return value to return to client if bypassing default processing
+ *
+ * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
+ * {@link #preCheckAndMutateAfterRowLock(ObserverContext, CheckAndMutate,CheckAndMutateResult)}
+ * instead.
*/
+ @Deprecated
default boolean preCheckAndPutAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
byte[] row, Filter filter, Put put, boolean result) throws IOException {
return result;
@@ -607,7 +628,11 @@ public interface RegionObserver {
* @param put data to put if check succeeds
* @param result from the checkAndPut
* @return the possibly transformed return value to return to client
+ *
+ * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
+ * {@link #postCheckAndMutate(ObserverContext, CheckAndMutate, CheckAndMutateResult)} instead.
*/
+ @Deprecated
default boolean postCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, Put put,
boolean result) throws IOException {
@@ -625,7 +650,11 @@ public interface RegionObserver {
* @param put data to put if check succeeds
* @param result from the checkAndPut
* @return the possibly transformed return value to return to client
+ *
+ * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
+ * {@link #postCheckAndMutate(ObserverContext, CheckAndMutate, CheckAndMutateResult)} instead.
*/
+ @Deprecated
default boolean postCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
Filter filter, Put put, boolean result) throws IOException {
return result;
@@ -648,7 +677,11 @@ public interface RegionObserver {
* @param delete delete to commit if check succeeds
* @param result the default value of the result
* @return the value to return to client if bypassing default processing
+ *
+ * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
+ * {@link #preCheckAndMutate(ObserverContext, CheckAndMutate, CheckAndMutateResult)} instead.
*/
+ @Deprecated
default boolean preCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator,
Delete delete, boolean result) throws IOException {
@@ -669,7 +702,11 @@ public interface RegionObserver {
* @param delete delete to commit if check succeeds
* @param result the default value of the result
* @return the value to return to client if bypassing default processing
+ *
+ * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
+ * {@link #preCheckAndMutate(ObserverContext, CheckAndMutate, CheckAndMutateResult)} instead.
*/
+ @Deprecated
default boolean preCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
Filter filter, Delete delete, boolean result) throws IOException {
return result;
@@ -696,7 +733,12 @@ public interface RegionObserver {
* @param delete delete to commit if check succeeds
* @param result the default value of the result
* @return the value to return to client if bypassing default processing
+ *
+ * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
+ * {@link #preCheckAndMutateAfterRowLock(ObserverContext, CheckAndMutate,CheckAndMutateResult)}
+ * instead.
*/
+ @Deprecated
default boolean preCheckAndDeleteAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
ByteArrayComparable comparator, Delete delete, boolean result) throws IOException {
@@ -721,7 +763,12 @@ public interface RegionObserver {
* @param delete delete to commit if check succeeds
* @param result the default value of the result
* @return the value to return to client if bypassing default processing
+ *
+ * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
+ * {@link #preCheckAndMutateAfterRowLock(ObserverContext, CheckAndMutate,CheckAndMutateResult)}
+ * instead.
*/
+ @Deprecated
default boolean preCheckAndDeleteAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
byte[] row, Filter filter, Delete delete, boolean result) throws IOException {
return result;
@@ -741,7 +788,11 @@ public interface RegionObserver {
* @param delete delete to commit if check succeeds
* @param result from the CheckAndDelete
* @return the possibly transformed returned value to return to client
+ *
+ * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
+ * {@link #postCheckAndMutate(ObserverContext, CheckAndMutate, CheckAndMutateResult)} instead.
*/
+ @Deprecated
default boolean postCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator,
Delete delete, boolean result) throws IOException {
@@ -759,13 +810,151 @@ public interface RegionObserver {
* @param delete delete to commit if check succeeds
* @param result from the CheckAndDelete
* @return the possibly transformed returned value to return to client
+ *
+ * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
+ * {@link #postCheckAndMutate(ObserverContext, CheckAndMutate, CheckAndMutateResult)} instead.
*/
+ @Deprecated
default boolean postCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
Filter filter, Delete delete, boolean result) throws IOException {
return result;
}
/**
+ * Called before checkAndMutate
+ * <p>
+ * Call CoprocessorEnvironment#bypass to skip default actions.
+ * If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
+ * <p>
+ * Note: Do not retain references to any Cells in actions beyond the life of this invocation.
+ * If you need a Cell reference for later use, copy the cell and use that.
+ * @param c the environment provided by the region server
+ * @param checkAndMutate the CheckAndMutate object
+ * @param result the default value of the result
+ * @return the return value to return to client if bypassing default processing
+ * @throws IOException if an error occurred on the coprocessor
+ */
+ default CheckAndMutateResult preCheckAndMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+ CheckAndMutate checkAndMutate, CheckAndMutateResult result) throws IOException {
+ if (checkAndMutate.getAction() instanceof Put) {
+ boolean success;
+ if (checkAndMutate.hasFilter()) {
+ success = preCheckAndPut(c, checkAndMutate.getRow(), checkAndMutate.getFilter(),
+ (Put) checkAndMutate.getAction(), result.isSuccess());
+ } else {
+ success = preCheckAndPut(c, checkAndMutate.getRow(), checkAndMutate.getFamily(),
+ checkAndMutate.getQualifier(), checkAndMutate.getCompareOp(),
+ new BinaryComparator(checkAndMutate.getValue()), (Put) checkAndMutate.getAction(),
+ result.isSuccess());
+ }
+ return new CheckAndMutateResult(success, null);
+ } else if (checkAndMutate.getAction() instanceof Delete) {
+ boolean success;
+ if (checkAndMutate.hasFilter()) {
+ success = preCheckAndDelete(c, checkAndMutate.getRow(), checkAndMutate.getFilter(),
+ (Delete) checkAndMutate.getAction(), result.isSuccess());
+ } else {
+ success = preCheckAndDelete(c, checkAndMutate.getRow(), checkAndMutate.getFamily(),
+ checkAndMutate.getQualifier(), checkAndMutate.getCompareOp(),
+ new BinaryComparator(checkAndMutate.getValue()), (Delete) checkAndMutate.getAction(),
+ result.isSuccess());
+ }
+ return new CheckAndMutateResult(success, null);
+ }
+ return result;
+ }
+
+ /**
+ * Called before checkAndMutate but after acquiring rowlock.
+ * <p>
+ * <b>Note:</b> Avoid long-running operations in this hook: the row remains locked for as long
+ * as the hook runs, and trying to acquire a lock on another row from within it can lead to a
+ * potential deadlock.
+ * <p>
+ * Call CoprocessorEnvironment#bypass to skip default actions.
+ * If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
+ * <p>
+ * Note: Do not retain references to any Cells in actions beyond the life of this invocation.
+ * If you need a Cell reference for later use, copy the cell and use that.
+ * @param c the environment provided by the region server
+ * @param checkAndMutate the CheckAndMutate object
+ * @param result the default value of the result
+ * @return the value to return to client if bypassing default processing
+ * @throws IOException if an error occurred on the coprocessor
+ */
+ default CheckAndMutateResult preCheckAndMutateAfterRowLock(
+ ObserverContext<RegionCoprocessorEnvironment> c, CheckAndMutate checkAndMutate,
+ CheckAndMutateResult result) throws IOException {
+ if (checkAndMutate.getAction() instanceof Put) {
+ boolean success;
+ if (checkAndMutate.hasFilter()) {
+ success = preCheckAndPutAfterRowLock(c, checkAndMutate.getRow(),
+ checkAndMutate.getFilter(), (Put) checkAndMutate.getAction(), result.isSuccess());
+ } else {
+ success = preCheckAndPutAfterRowLock(c, checkAndMutate.getRow(),
+ checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
+ checkAndMutate.getCompareOp(), new BinaryComparator(checkAndMutate.getValue()),
+ (Put) checkAndMutate.getAction(), result.isSuccess());
+ }
+ return new CheckAndMutateResult(success, null);
+ } else if (checkAndMutate.getAction() instanceof Delete) {
+ boolean success;
+ if (checkAndMutate.hasFilter()) {
+ success = preCheckAndDeleteAfterRowLock(c, checkAndMutate.getRow(),
+ checkAndMutate.getFilter(), (Delete) checkAndMutate.getAction(), result.isSuccess());
+ } else {
+ success = preCheckAndDeleteAfterRowLock(c, checkAndMutate.getRow(),
+ checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
+ checkAndMutate.getCompareOp(), new BinaryComparator(checkAndMutate.getValue()),
+ (Delete) checkAndMutate.getAction(), result.isSuccess());
+ }
+ return new CheckAndMutateResult(success, null);
+ }
+ return result;
+ }
+
+ /**
+ * Called after checkAndMutate
+ * <p>
+ * Note: Do not retain references to any Cells in actions beyond the life of this invocation.
+ * If you need a Cell reference for later use, copy the cell and use that.
+ * @param c the environment provided by the region server
+ * @param checkAndMutate the CheckAndMutate object
+ * @param result from the checkAndMutate
+ * @return the possibly transformed returned value to return to client
+ * @throws IOException if an error occurred on the coprocessor
+ */
+ default CheckAndMutateResult postCheckAndMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+ CheckAndMutate checkAndMutate, CheckAndMutateResult result) throws IOException {
+ if (checkAndMutate.getAction() instanceof Put) {
+ boolean success;
+ if (checkAndMutate.hasFilter()) {
+ success = postCheckAndPut(c, checkAndMutate.getRow(),
+ checkAndMutate.getFilter(), (Put) checkAndMutate.getAction(), result.isSuccess());
+ } else {
+ success = postCheckAndPut(c, checkAndMutate.getRow(),
+ checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
+ checkAndMutate.getCompareOp(), new BinaryComparator(checkAndMutate.getValue()),
+ (Put) checkAndMutate.getAction(), result.isSuccess());
+ }
+ return new CheckAndMutateResult(success, null);
+ } else if (checkAndMutate.getAction() instanceof Delete) {
+ boolean success;
+ if (checkAndMutate.hasFilter()) {
+ success = postCheckAndDelete(c, checkAndMutate.getRow(),
+ checkAndMutate.getFilter(), (Delete) checkAndMutate.getAction(), result.isSuccess());
+ } else {
+ success = postCheckAndDelete(c, checkAndMutate.getRow(),
+ checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
+ checkAndMutate.getCompareOp(), new BinaryComparator(checkAndMutate.getValue()),
+ (Delete) checkAndMutate.getAction(), result.isSuccess());
+ }
+ return new CheckAndMutateResult(success, null);
+ }
+ return result;
+ }
+
+ /**
* Called before Append.
* <p>
* Call CoprocessorEnvironment#bypass to skip default actions.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index f62da4c..2cffa32 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -103,6 +103,8 @@ import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagUtil;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.CheckAndMutate;
+import org.apache.hadoop.hbase.client.CheckAndMutateResult;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.CompactionState;
import org.apache.hadoop.hbase.client.Delete;
@@ -129,6 +131,7 @@ import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterWrapper;
@@ -4249,43 +4252,103 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
@Override
+ @Deprecated
public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
ByteArrayComparable comparator, TimeRange timeRange, Mutation mutation) throws IOException {
- return doCheckAndRowMutate(row, family, qualifier, op, comparator, null, timeRange, null,
- mutation);
+ CheckAndMutate checkAndMutate;
+ try {
+ CheckAndMutate.Builder builder = CheckAndMutate.newBuilder(row)
+ .ifMatches(family, qualifier, op, comparator.getValue()).timeRange(timeRange);
+ if (mutation instanceof Put) {
+ checkAndMutate = builder.build((Put) mutation);
+ } else if (mutation instanceof Delete) {
+ checkAndMutate = builder.build((Delete) mutation);
+ } else {
+ throw new DoNotRetryIOException("Unsupported mutate type: " + mutation.getClass()
+ .getSimpleName().toUpperCase());
+ }
+ } catch (IllegalArgumentException e) {
+ throw new DoNotRetryIOException(e.getMessage());
+ }
+ return checkAndMutate(checkAndMutate).isSuccess();
}
@Override
+ @Deprecated
public boolean checkAndMutate(byte[] row, Filter filter, TimeRange timeRange, Mutation mutation)
throws IOException {
- return doCheckAndRowMutate(row, null, null, null, null, filter, timeRange, null, mutation);
+ CheckAndMutate checkAndMutate;
+ try {
+ CheckAndMutate.Builder builder = CheckAndMutate.newBuilder(row).ifMatches(filter)
+ .timeRange(timeRange);
+ if (mutation instanceof Put) {
+ checkAndMutate = builder.build((Put) mutation);
+ } else if (mutation instanceof Delete) {
+ checkAndMutate = builder.build((Delete) mutation);
+ } else {
+ throw new DoNotRetryIOException("Unsupported mutate type: " + mutation.getClass()
+ .getSimpleName().toUpperCase());
+ }
+ } catch (IllegalArgumentException e) {
+ throw new DoNotRetryIOException(e.getMessage());
+ }
+ return checkAndMutate(checkAndMutate).isSuccess();
}
@Override
+ @Deprecated
public boolean checkAndRowMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
ByteArrayComparable comparator, TimeRange timeRange, RowMutations rm) throws IOException {
- return doCheckAndRowMutate(row, family, qualifier, op, comparator, null, timeRange, rm, null);
+ CheckAndMutate checkAndMutate;
+ try {
+ checkAndMutate = CheckAndMutate.newBuilder(row)
+ .ifMatches(family, qualifier, op, comparator.getValue()).timeRange(timeRange).build(rm);
+ } catch (IllegalArgumentException e) {
+ throw new DoNotRetryIOException(e.getMessage());
+ }
+ return checkAndMutate(checkAndMutate).isSuccess();
}
@Override
+ @Deprecated
public boolean checkAndRowMutate(byte[] row, Filter filter, TimeRange timeRange, RowMutations rm)
throws IOException {
- return doCheckAndRowMutate(row, null, null, null, null, filter, timeRange, rm, null);
+ CheckAndMutate checkAndMutate;
+ try {
+ checkAndMutate = CheckAndMutate.newBuilder(row).ifMatches(filter).timeRange(timeRange)
+ .build(rm);
+ } catch (IllegalArgumentException e) {
+ throw new DoNotRetryIOException(e.getMessage());
+ }
+ return checkAndMutate(checkAndMutate).isSuccess();
}
- /**
- * checkAndMutate and checkAndRowMutate are 90% the same. Rather than copy/paste, below has
- * switches in the few places where there is deviation.
- */
- private boolean doCheckAndRowMutate(byte[] row, byte[] family, byte[] qualifier,
- CompareOperator op, ByteArrayComparable comparator, Filter filter, TimeRange timeRange,
- RowMutations rowMutations, Mutation mutation)
- throws IOException {
- // Could do the below checks but seems wacky with two callers only. Just comment out for now.
- // One caller passes a Mutation, the other passes RowMutation. Presume all good so we don't
- // need these commented out checks.
- // if (rowMutations == null && mutation == null) throw new DoNotRetryIOException("Both null");
- // if (rowMutations != null && mutation != null) throw new DoNotRetryIOException("Both set");
+ @Override
+ public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws IOException {
+ byte[] row = checkAndMutate.getRow();
+ Filter filter = null;
+ byte[] family = null;
+ byte[] qualifier = null;
+ CompareOperator op = null;
+ ByteArrayComparable comparator = null;
+ if (checkAndMutate.hasFilter()) {
+ filter = checkAndMutate.getFilter();
+ } else {
+ family = checkAndMutate.getFamily();
+ qualifier = checkAndMutate.getQualifier();
+ op = checkAndMutate.getCompareOp();
+ comparator = new BinaryComparator(checkAndMutate.getValue());
+ }
+ TimeRange timeRange = checkAndMutate.getTimeRange();
+
+ Mutation mutation = null;
+ RowMutations rowMutations = null;
+ if (checkAndMutate.getAction() instanceof Mutation) {
+ mutation = (Mutation) checkAndMutate.getAction();
+ } else {
+ rowMutations = (RowMutations) checkAndMutate.getAction();
+ }
+
if (mutation != null) {
checkMutationType(mutation);
checkRow(mutation, row);
@@ -4312,32 +4375,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
checkRow(row, "doCheckAndRowMutate");
RowLock rowLock = getRowLockInternal(get.getRow(), false, null);
try {
- if (mutation != null && this.getCoprocessorHost() != null) {
- // Call coprocessor.
- Boolean processed = null;
- if (mutation instanceof Put) {
- if (filter != null) {
- processed = this.getCoprocessorHost()
- .preCheckAndPutAfterRowLock(row, filter, (Put) mutation);
- } else {
- processed = this.getCoprocessorHost()
- .preCheckAndPutAfterRowLock(row, family, qualifier, op, comparator,
- (Put) mutation);
- }
- } else if (mutation instanceof Delete) {
- if (filter != null) {
- processed = this.getCoprocessorHost()
- .preCheckAndDeleteAfterRowLock(row, filter, (Delete) mutation);
- } else {
- processed = this.getCoprocessorHost()
- .preCheckAndDeleteAfterRowLock(row, family, qualifier, op, comparator,
- (Delete) mutation);
- }
- }
- if (processed != null) {
- return processed;
+ if (this.getCoprocessorHost() != null) {
+ CheckAndMutateResult result =
+ getCoprocessorHost().preCheckAndMutateAfterRowLock(checkAndMutate);
+ if (result != null) {
+ return result;
}
}
+
// NOTE: We used to wait here until mvcc caught up: mvcc.await();
// The supposition is that, now that all changes are done under row locks, when we go to read
// we'll get the latest on this row.
@@ -4395,10 +4440,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
mutateRow(rowMutations);
}
this.checkAndMutateChecksPassed.increment();
- return true;
+ return new CheckAndMutateResult(true, null);
}
this.checkAndMutateChecksFailed.increment();
- return false;
+ return new CheckAndMutateResult(false, null);
} finally {
rowLock.release();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
index a66ae00..afb249d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
@@ -153,6 +153,10 @@ public class MetricsRegionServer {
serverSource.updateCheckAndPut(t);
}
+ public void updateCheckAndMutate(long t) {
+ serverSource.updateCheckAndMutate(t);
+ }
+
public void updateGet(TableName tn, long t) {
if (tableMetrics != null && tn != null) {
tableMetrics.updateGet(tn, t);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index bc00ab2..7fe7f19 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -53,7 +53,6 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HBaseIOException;
@@ -67,6 +66,8 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.CheckAndMutate;
+import org.apache.hadoop.hbase.client.CheckAndMutateResult;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
@@ -78,7 +79,6 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Row;
-import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.VersionInfoUtil;
@@ -87,10 +87,7 @@ import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
-import org.apache.hadoop.hbase.filter.ByteArrayComparable;
-import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
-import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
@@ -620,62 +617,55 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
- /**
- * Mutate a list of rows atomically.
- * @param cellScanner if non-null, the mutation data -- the Cell content.
- */
- private boolean checkAndRowMutate(final HRegion region, final List<ClientProtos.Action> actions,
- final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier,
- CompareOperator op, ByteArrayComparable comparator, Filter filter, TimeRange timeRange,
- RegionActionResult.Builder builder, ActivePolicyEnforcement spaceQuotaEnforcement)
+ private CheckAndMutateResult checkAndMutate(HRegion region, List<ClientProtos.Action> actions,
+ CellScanner cellScanner, Condition condition,ActivePolicyEnforcement spaceQuotaEnforcement)
throws IOException {
int countOfCompleteMutation = 0;
try {
if (!region.getRegionInfo().isMetaRegion()) {
regionServer.getMemStoreFlusher().reclaimMemStoreMemory();
}
- RowMutations rm = null;
- int i = 0;
- ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
- ClientProtos.ResultOrException.newBuilder();
+ List<Mutation> mutations = new ArrayList<>();
for (ClientProtos.Action action: actions) {
if (action.hasGet()) {
throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
action.getGet());
}
MutationType type = action.getMutation().getMutateType();
- if (rm == null) {
- rm = new RowMutations(action.getMutation().getRow().toByteArray(), actions.size());
- }
switch (type) {
case PUT:
Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner);
++countOfCompleteMutation;
checkCellSizeLimit(region, put);
spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
- rm.add(put);
+ mutations.add(put);
break;
case DELETE:
Delete del = ProtobufUtil.toDelete(action.getMutation(), cellScanner);
++countOfCompleteMutation;
spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
- rm.add(del);
+ mutations.add(del);
break;
default:
throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
}
- // To unify the response format with doNonAtomicRegionMutation and read through client's
- // AsyncProcess we have to add an empty result instance per operation
- resultOrExceptionOrBuilder.clear();
- resultOrExceptionOrBuilder.setIndex(i++);
- builder.addResultOrException(
- resultOrExceptionOrBuilder.build());
}
- if (filter != null) {
- return region.checkAndRowMutate(row, filter, timeRange, rm);
+ if (mutations.size() == 0) {
+ return new CheckAndMutateResult(true, null);
} else {
- return region.checkAndRowMutate(row, family, qualifier, op, comparator, timeRange, rm);
+ CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutations);
+ CheckAndMutateResult result = null;
+ if (region.getCoprocessorHost() != null) {
+ result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
+ }
+ if (result == null) {
+ result = region.checkAndMutate(checkAndMutate);
+ if (region.getCoprocessorHost() != null) {
+ result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
+ }
+ }
+ return result;
}
} finally {
// Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
@@ -2804,26 +2794,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
try {
- Condition condition = request.getCondition();
- byte[] row = condition.getRow().toByteArray();
- byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null;
- byte[] qualifier =
- condition.hasQualifier() ? condition.getQualifier().toByteArray() : null;
- CompareOperator op = condition.hasCompareType() ?
- CompareOperator.valueOf(condition.getCompareType().name()) :
- null;
- ByteArrayComparable comparator = condition.hasComparator() ?
- ProtobufUtil.toComparator(condition.getComparator()) : null;
- Filter filter =
- condition.hasFilter() ? ProtobufUtil.toFilter(condition.getFilter()) : null;
- TimeRange timeRange = condition.hasTimeRange() ?
- ProtobufUtil.toTimeRange(condition.getTimeRange()) :
- TimeRange.allTime();
- boolean processed =
- checkAndRowMutate(region, regionAction.getActionList(), cellScanner, row, family,
- qualifier, op, comparator, filter, timeRange, regionActionResultBuilder,
- spaceQuotaEnforcement);
- responseBuilder.setProcessed(processed);
+ CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
+ cellScanner, request.getCondition(), spaceQuotaEnforcement);
+ responseBuilder.setProcessed(result.isSuccess());
+ ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
+ ClientProtos.ResultOrException.newBuilder();
+ for (int i = 0; i < regionAction.getActionCount(); i++) {
+ // To unify the response format with doNonAtomicRegionMutation and read through
+ // client's AsyncProcess we have to add an empty result instance per operation
+ resultOrExceptionOrBuilder.clear();
+ resultOrExceptionOrBuilder.setIndex(i);
+ regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
+ }
} catch (IOException e) {
rpcServer.getMetrics().exception(e);
// As it's an atomic operation with a condition, we may expect it's a global failure.
@@ -2865,80 +2847,24 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
try {
if (regionAction.hasCondition()) {
try {
- Condition condition = regionAction.getCondition();
- byte[] row = condition.getRow().toByteArray();
- byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null;
- byte[] qualifier = condition.hasQualifier() ?
- condition.getQualifier().toByteArray() : null;
- CompareOperator op = condition.hasCompareType() ?
- CompareOperator.valueOf(condition.getCompareType().name()) : null;
- ByteArrayComparable comparator = condition.hasComparator() ?
- ProtobufUtil.toComparator(condition.getComparator()) : null;
- Filter filter = condition.hasFilter() ?
- ProtobufUtil.toFilter(condition.getFilter()) : null;
- TimeRange timeRange = condition.hasTimeRange() ?
- ProtobufUtil.toTimeRange(condition.getTimeRange()) : TimeRange.allTime();
-
- boolean processed;
- if (regionAction.hasAtomic() && regionAction.getAtomic()) {
- // RowMutations
- processed =
- checkAndRowMutate(region, regionAction.getActionList(), cellScanner, row, family,
- qualifier, op, comparator, filter, timeRange, regionActionResultBuilder,
- spaceQuotaEnforcement);
- } else {
- if (regionAction.getActionList().isEmpty()) {
- // If the region action list is empty, do nothing.
- regionActionResultBuilder.setProcessed(true);
+ CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
+ cellScanner, regionAction.getCondition(), spaceQuotaEnforcement);
+ regionActionResultBuilder.setProcessed(result.isSuccess());
+ ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
+ ClientProtos.ResultOrException.newBuilder();
+ for (int i = 0; i < regionAction.getActionCount(); i++) {
+ if (i == 0 && result.getResult() != null) {
+ resultOrExceptionOrBuilder.setIndex(i);
+ regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder
+ .setResult(ProtobufUtil.toResult(result.getResult())).build());
continue;
}
- Action action = regionAction.getAction(0);
- if (action.hasGet()) {
- throw new DoNotRetryIOException("CheckAndMutate doesn't support GET="
- + action.getGet());
- }
- MutationProto mutation = action.getMutation();
- switch (mutation.getMutateType()) {
- case PUT:
- Put put = ProtobufUtil.toPut(mutation, cellScanner);
- checkCellSizeLimit(region, put);
- // Throws an exception when violated
- spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
- quota.addMutation(put);
-
- if (filter != null) {
- processed = region.checkAndMutate(row, filter, timeRange, put);
- } else {
- processed = region.checkAndMutate(row, family, qualifier, op, comparator,
- timeRange, put);
- }
- break;
-
- case DELETE:
- Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
- checkCellSizeLimit(region, delete);
- spaceQuotaEnforcement.getPolicyEnforcement(region).check(delete);
- quota.addMutation(delete);
-
- if (filter != null) {
- processed = region.checkAndMutate(row, filter, timeRange, delete);
- } else {
- processed = region.checkAndMutate(row, family, qualifier, op, comparator,
- timeRange, delete);
- }
- break;
-
- default:
- throw new DoNotRetryIOException("CheckAndMutate doesn't support "
- + mutation.getMutateType());
- }
-
// To unify the response format with doNonAtomicRegionMutation and read through
// client's AsyncProcess we have to add an empty result instance per operation
- regionActionResultBuilder.addResultOrException(
- ClientProtos.ResultOrException.newBuilder().setIndex(0).build());
+ resultOrExceptionOrBuilder.clear();
+ resultOrExceptionOrBuilder.setIndex(i);
+ regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
}
- regionActionResultBuilder.setProcessed(processed);
} catch (IOException e) {
rpcServer.getMetrics().exception(e);
// As it's an atomic operation with a condition, we may expect it's a global failure.
@@ -3039,10 +2965,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
OperationQuota quota = null;
RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
- ActivePolicyEnforcement spaceQuotaEnforcement = null;
- MutationType type = null;
- HRegion region = null;
- long before = EnvironmentEdgeManager.currentTime();
// Clear scanner so we are not holding on to reference across call.
if (controller != null) {
controller.setCellScanner(null);
@@ -3051,143 +2973,58 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
checkOpen();
requestCount.increment();
rpcMutateRequestCount.increment();
- region = getRegion(request.getRegion());
+ HRegion region = getRegion(request.getRegion());
MutateResponse.Builder builder = MutateResponse.newBuilder();
MutationProto mutation = request.getMutation();
if (!region.getRegionInfo().isMetaRegion()) {
regionServer.getMemStoreFlusher().reclaimMemStoreMemory();
}
long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
- Result r = null;
- Boolean processed = null;
- type = mutation.getMutateType();
-
quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
- spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();
-
- switch (type) {
- case APPEND:
- // TODO: this doesn't actually check anything.
- r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
- break;
- case INCREMENT:
- // TODO: this doesn't actually check anything.
- r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
- break;
- case PUT:
- Put put = ProtobufUtil.toPut(mutation, cellScanner);
- checkCellSizeLimit(region, put);
- // Throws an exception when violated
- spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
- quota.addMutation(put);
- if (request.hasCondition()) {
- Condition condition = request.getCondition();
- byte[] row = condition.getRow().toByteArray();
- byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null;
- byte[] qualifier = condition.hasQualifier() ?
- condition.getQualifier().toByteArray() : null;
- CompareOperator op = condition.hasCompareType() ?
- CompareOperator.valueOf(condition.getCompareType().name()) : null;
- ByteArrayComparable comparator = condition.hasComparator() ?
- ProtobufUtil.toComparator(condition.getComparator()) : null;
- Filter filter = condition.hasFilter() ?
- ProtobufUtil.toFilter(condition.getFilter()) : null;
- TimeRange timeRange = condition.hasTimeRange() ?
- ProtobufUtil.toTimeRange(condition.getTimeRange()) :
- TimeRange.allTime();
- if (region.getCoprocessorHost() != null) {
- if (filter != null) {
- processed = region.getCoprocessorHost().preCheckAndPut(row, filter, put);
- } else {
- processed = region.getCoprocessorHost()
- .preCheckAndPut(row, family, qualifier, op, comparator, put);
- }
- }
- if (processed == null) {
- boolean result;
- if (filter != null) {
- result = region.checkAndMutate(row, filter, timeRange, put);
- } else {
- result = region.checkAndMutate(row, family, qualifier, op, comparator, timeRange,
- put);
- }
- if (region.getCoprocessorHost() != null) {
- if (filter != null) {
- result = region.getCoprocessorHost().postCheckAndPut(row, filter, put, result);
- } else {
- result = region.getCoprocessorHost()
- .postCheckAndPut(row, family, qualifier, op, comparator, put, result);
- }
- }
- processed = result;
- }
- } else {
- region.put(put);
+ ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager()
+ .getActiveEnforcements();
+
+ if (request.hasCondition()) {
+ CheckAndMutateResult result = checkAndMutate(region, quota, mutation, cellScanner,
+ request.getCondition(), spaceQuotaEnforcement);
+ builder.setProcessed(result.isSuccess());
+ boolean clientCellBlockSupported = isClientCellBlockSupport(context);
+ addResult(builder, result.getResult(), controller, clientCellBlockSupported);
+ if (clientCellBlockSupported) {
+ addSize(context, result.getResult(), null);
+ }
+ } else {
+ Result r = null;
+ Boolean processed = null;
+ MutationType type = mutation.getMutateType();
+ switch (type) {
+ case APPEND:
+ // TODO: this doesn't actually check anything.
+ r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
+ break;
+ case INCREMENT:
+ // TODO: this doesn't actually check anything.
+ r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
+ break;
+ case PUT:
+ put(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
processed = Boolean.TRUE;
- }
- break;
- case DELETE:
- Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
- checkCellSizeLimit(region, delete);
- spaceQuotaEnforcement.getPolicyEnforcement(region).check(delete);
- quota.addMutation(delete);
- if (request.hasCondition()) {
- Condition condition = request.getCondition();
- byte[] row = condition.getRow().toByteArray();
- byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null;
- byte[] qualifier = condition.hasQualifier() ?
- condition.getQualifier().toByteArray() : null;
- CompareOperator op = condition.hasCompareType() ?
- CompareOperator.valueOf(condition.getCompareType().name()) : null;
- ByteArrayComparable comparator = condition.hasComparator() ?
- ProtobufUtil.toComparator(condition.getComparator()) : null;
- Filter filter = condition.hasFilter() ?
- ProtobufUtil.toFilter(condition.getFilter()) : null;
- TimeRange timeRange = condition.hasTimeRange() ?
- ProtobufUtil.toTimeRange(condition.getTimeRange()) :
- TimeRange.allTime();
- if (region.getCoprocessorHost() != null) {
- if (filter != null) {
- processed = region.getCoprocessorHost().preCheckAndDelete(row, filter, delete);
- } else {
- processed = region.getCoprocessorHost()
- .preCheckAndDelete(row, family, qualifier, op, comparator, delete);
- }
- }
- if (processed == null) {
- boolean result;
- if (filter != null) {
- result = region.checkAndMutate(row, filter, timeRange, delete);
- } else {
- result = region.checkAndMutate(row, family, qualifier, op, comparator, timeRange,
- delete);
- }
- if (region.getCoprocessorHost() != null) {
- if (filter != null) {
- result = region.getCoprocessorHost().postCheckAndDelete(row, filter, delete,
- result);
- } else {
- result = region.getCoprocessorHost()
- .postCheckAndDelete(row, family, qualifier, op, comparator, delete, result);
- }
- }
- processed = result;
- }
- } else {
- region.delete(delete);
+ break;
+ case DELETE:
+ delete(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
processed = Boolean.TRUE;
- }
- break;
- default:
- throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
- }
- if (processed != null) {
- builder.setProcessed(processed.booleanValue());
- }
- boolean clientCellBlockSupported = isClientCellBlockSupport(context);
- addResult(builder, r, controller, clientCellBlockSupported);
- if (clientCellBlockSupported) {
- addSize(context, r, null);
+ break;
+ default:
+ throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
+ }
+ if (processed != null) {
+ builder.setProcessed(processed);
+ }
+ boolean clientCellBlockSupported = isClientCellBlockSupport(context);
+ addResult(builder, r, controller, clientCellBlockSupported);
+ if (clientCellBlockSupported) {
+ addSize(context, r, null);
+ }
}
return builder.build();
} catch (IOException ie) {
@@ -3197,32 +3034,79 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (quota != null) {
quota.close();
}
- // Update metrics
- final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
- if (metricsRegionServer != null && type != null) {
- long after = EnvironmentEdgeManager.currentTime();
- switch (type) {
- case DELETE:
- if (request.hasCondition()) {
- metricsRegionServer.updateCheckAndDelete(after - before);
- } else {
- metricsRegionServer.updateDelete(
- region == null ? null : region.getRegionInfo().getTable(), after - before);
- }
- break;
+ }
+ }
+
+ private void put(HRegion region, OperationQuota quota, MutationProto mutation,
+ CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
+ long before = EnvironmentEdgeManager.currentTime();
+ Put put = ProtobufUtil.toPut(mutation, cellScanner);
+ checkCellSizeLimit(region, put);
+ spaceQuota.getPolicyEnforcement(region).check(put);
+ quota.addMutation(put);
+ region.put(put);
+
+ MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
+ if (metricsRegionServer != null) {
+ long after = EnvironmentEdgeManager.currentTime();
+ metricsRegionServer.updatePut(region.getRegionInfo().getTable(), after - before);
+ }
+ }
+
+ private void delete(HRegion region, OperationQuota quota, MutationProto mutation,
+ CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
+ long before = EnvironmentEdgeManager.currentTime();
+ Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
+ checkCellSizeLimit(region, delete);
+ spaceQuota.getPolicyEnforcement(region).check(delete);
+ quota.addMutation(delete);
+ region.delete(delete);
+
+ MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
+ if (metricsRegionServer != null) {
+ long after = EnvironmentEdgeManager.currentTime();
+ metricsRegionServer.updateDelete(region.getRegionInfo().getTable(), after - before);
+ }
+ }
+
+ private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota,
+ MutationProto mutation, CellScanner cellScanner, Condition condition,
+ ActivePolicyEnforcement spaceQuota) throws IOException {
+ long before = EnvironmentEdgeManager.currentTime();
+ CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutation,
+ cellScanner);
+ checkCellSizeLimit(region, (Mutation) checkAndMutate.getAction());
+ spaceQuota.getPolicyEnforcement(region).check((Mutation) checkAndMutate.getAction());
+ quota.addMutation((Mutation) checkAndMutate.getAction());
+
+ CheckAndMutateResult result = null;
+ if (region.getCoprocessorHost() != null) {
+ result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
+ }
+ if (result == null) {
+ result = region.checkAndMutate(checkAndMutate);
+ if (region.getCoprocessorHost() != null) {
+ result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
+ }
+ }
+ MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
+ if (metricsRegionServer != null) {
+ long after = EnvironmentEdgeManager.currentTime();
+ metricsRegionServer.updateCheckAndMutate(after - before);
+
+ MutationType type = mutation.getMutateType();
+ switch (type) {
case PUT:
- if (request.hasCondition()) {
- metricsRegionServer.updateCheckAndPut(after - before);
- } else {
- metricsRegionServer.updatePut(
- region == null ? null : region.getRegionInfo().getTable(),after - before);
- }
+ metricsRegionServer.updateCheckAndPut(after - before);
+ break;
+ case DELETE:
+ metricsRegionServer.updateCheckAndDelete(after - before);
break;
default:
break;
- }
}
}
+ return result;
}
// This is used to keep compatible with the old client implementation. Consider remove it if we
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index 1790468..d03d19f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.CheckAndMutate;
+import org.apache.hadoop.hbase.client.CheckAndMutateResult;
import org.apache.hadoop.hbase.client.CompactionState;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
@@ -308,7 +310,11 @@ public interface Region extends ConfigurationObserver {
* @param comparator the expected value
* @param mutation data to put if check succeeds
* @return true if mutation was applied, false otherwise
+ *
+ * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
+ * {@link #checkAndMutate(CheckAndMutate)} instead.
*/
+ @Deprecated
default boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier, CompareOperator op,
ByteArrayComparable comparator, Mutation mutation) throws IOException {
return checkAndMutate(row, family, qualifier, op, comparator, TimeRange.allTime(), mutation);
@@ -327,7 +333,11 @@ public interface Region extends ConfigurationObserver {
* @param mutation data to put if check succeeds
* @param timeRange time range to check
* @return true if mutation was applied, false otherwise
+ *
+ * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
+ * {@link #checkAndMutate(CheckAndMutate)} instead.
*/
+ @Deprecated
boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier, CompareOperator op,
ByteArrayComparable comparator, TimeRange timeRange, Mutation mutation) throws IOException;
@@ -338,7 +348,11 @@ public interface Region extends ConfigurationObserver {
* @param filter the filter
* @param mutation data to put if check succeeds
* @return true if mutation was applied, false otherwise
+ *
+ * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
+ * {@link #checkAndMutate(CheckAndMutate)} instead.
*/
+ @Deprecated
default boolean checkAndMutate(byte [] row, Filter filter, Mutation mutation)
throws IOException {
return checkAndMutate(row, filter, TimeRange.allTime(), mutation);
@@ -352,7 +366,11 @@ public interface Region extends ConfigurationObserver {
* @param mutation data to put if check succeeds
* @param timeRange time range to check
* @return true if mutation was applied, false otherwise
+ *
+ * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
+ * {@link #checkAndMutate(CheckAndMutate)} instead.
*/
+ @Deprecated
boolean checkAndMutate(byte [] row, Filter filter, TimeRange timeRange, Mutation mutation)
throws IOException;
@@ -368,7 +386,11 @@ public interface Region extends ConfigurationObserver {
* @param comparator the expected value
* @param mutations data to put if check succeeds
* @return true if mutations were applied, false otherwise
+ *
+ * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
+ * {@link #checkAndMutate(CheckAndMutate)} instead.
*/
+ @Deprecated
default boolean checkAndRowMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
ByteArrayComparable comparator, RowMutations mutations) throws IOException {
return checkAndRowMutate(row, family, qualifier, op, comparator, TimeRange.allTime(),
@@ -388,7 +410,11 @@ public interface Region extends ConfigurationObserver {
* @param mutations data to put if check succeeds
* @param timeRange time range to check
* @return true if mutations were applied, false otherwise
+ *
+ * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
+ * {@link #checkAndMutate(CheckAndMutate)} instead.
*/
+ @Deprecated
boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier, CompareOperator op,
ByteArrayComparable comparator, TimeRange timeRange, RowMutations mutations)
throws IOException;
@@ -401,7 +427,11 @@ public interface Region extends ConfigurationObserver {
* @param filter the filter
* @param mutations data to put if check succeeds
* @return true if mutations were applied, false otherwise
+ *
+ * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
+ * {@link #checkAndMutate(CheckAndMutate)} instead.
*/
+ @Deprecated
default boolean checkAndRowMutate(byte[] row, Filter filter, RowMutations mutations)
throws IOException {
return checkAndRowMutate(row, filter, TimeRange.allTime(), mutations);
@@ -416,11 +446,25 @@ public interface Region extends ConfigurationObserver {
* @param mutations data to put if check succeeds
* @param timeRange time range to check
* @return true if mutations were applied, false otherwise
+ *
+ * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
+ * {@link #checkAndMutate(CheckAndMutate)} instead.
*/
+ @Deprecated
boolean checkAndRowMutate(byte [] row, Filter filter, TimeRange timeRange,
RowMutations mutations) throws IOException;
/**
+ * Atomically checks if a row matches the conditions and if it does, it performs the actions.
+ * Pass a RowMutations action to do many mutations on a single row, or a single Mutation
+ * action to do one mutation.
+ * @param checkAndMutate the CheckAndMutate object
+ * @return a CheckAndMutateResult whose isSuccess() is true if the mutations were applied,
+ * false otherwise
+ * @throws IOException if an error occurred in this method
+ */
+ CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws IOException;
+
+ /**
* Deletes the specified cells/row.
* @param delete
* @throws IOException
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
index 93c314d..56664db 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.RawCellBuilder;
@@ -42,6 +41,8 @@ import org.apache.hadoop.hbase.RawCellBuilderFactory;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SharedConnection;
import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.CheckAndMutate;
+import org.apache.hadoop.hbase.client.CheckAndMutateResult;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
@@ -67,8 +68,6 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
-import org.apache.hadoop.hbase.filter.ByteArrayComparable;
-import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -1051,322 +1050,70 @@ public class RegionCoprocessorHost
/**
* Supports Coprocessor 'bypass'.
- * @param row row to check
- * @param family column family
- * @param qualifier column qualifier
- * @param op the comparison operation
- * @param comparator the comparator
- * @param put data to put if check succeeds
+ * @param checkAndMutate the CheckAndMutate object
* @return true or false to return to client if default processing should be bypassed, or null
- * otherwise
- */
- public Boolean preCheckAndPut(final byte [] row, final byte [] family,
- final byte [] qualifier, final CompareOperator op,
- final ByteArrayComparable comparator, final Put put)
- throws IOException {
- boolean bypassable = true;
- boolean defaultResult = false;
- if (coprocEnvironments.isEmpty()) {
- return null;
- }
- return execOperationWithResult(
- new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
- defaultResult, bypassable) {
- @Override
- public Boolean call(RegionObserver observer) throws IOException {
- return observer.preCheckAndPut(this, row, family, qualifier,
- op, comparator, put, getResult());
- }
- });
- }
-
- /**
- * Supports Coprocessor 'bypass'.
- * @param row row to check
- * @param filter filter
- * @param put data to put if check succeeds
- * @return true or false to return to client if default processing should be bypassed, or null
- * otherwise
+ * otherwise
+ * @throws IOException if an error occurred on the coprocessor
*/
- public Boolean preCheckAndPut(final byte [] row, final Filter filter, final Put put)
+ public CheckAndMutateResult preCheckAndMutate(CheckAndMutate checkAndMutate)
throws IOException {
boolean bypassable = true;
- boolean defaultResult = false;
+ CheckAndMutateResult defaultResult = new CheckAndMutateResult(false, null);
if (coprocEnvironments.isEmpty()) {
return null;
}
return execOperationWithResult(
- new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
- defaultResult, bypassable) {
+ new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(
+ regionObserverGetter, defaultResult, bypassable) {
@Override
- public Boolean call(RegionObserver observer) throws IOException {
- return observer.preCheckAndPut(this, row, filter, put, getResult());
+ public CheckAndMutateResult call(RegionObserver observer) throws IOException {
+ return observer.preCheckAndMutate(this, checkAndMutate, getResult());
}
});
}
/**
* Supports Coprocessor 'bypass'.
- * @param row row to check
- * @param family column family
- * @param qualifier column qualifier
- * @param op the comparison operation
- * @param comparator the comparator
- * @param put data to put if check succeeds
- * @return true or false to return to client if default processing should be bypassed, or null
- * otherwise
- */
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL",
- justification="Null is legit")
- public Boolean preCheckAndPutAfterRowLock(
- final byte[] row, final byte[] family, final byte[] qualifier, final CompareOperator op,
- final ByteArrayComparable comparator, final Put put) throws IOException {
- boolean bypassable = true;
- boolean defaultResult = false;
- if (coprocEnvironments.isEmpty()) {
- return null;
- }
- return execOperationWithResult(
- new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
- defaultResult, bypassable) {
- @Override
- public Boolean call(RegionObserver observer) throws IOException {
- return observer.preCheckAndPutAfterRowLock(this, row, family, qualifier,
- op, comparator, put, getResult());
- }
- });
- }
-
- /**
- * Supports Coprocessor 'bypass'.
- * @param row row to check
- * @param filter filter
- * @param put data to put if check succeeds
+ * @param checkAndMutate the CheckAndMutate object
* @return true or false to return to client if default processing should be bypassed, or null
* otherwise
+ * @throws IOException if an error occurred on the coprocessor
*/
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL",
- justification="Null is legit")
- public Boolean preCheckAndPutAfterRowLock(
- final byte[] row, final Filter filter, final Put put) throws IOException {
+ public CheckAndMutateResult preCheckAndMutateAfterRowLock(CheckAndMutate checkAndMutate)
+ throws IOException {
boolean bypassable = true;
- boolean defaultResult = false;
+ CheckAndMutateResult defaultResult = new CheckAndMutateResult(false, null);
if (coprocEnvironments.isEmpty()) {
return null;
}
return execOperationWithResult(
- new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
- defaultResult, bypassable) {
+ new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(
+ regionObserverGetter, defaultResult, bypassable) {
@Override
- public Boolean call(RegionObserver observer) throws IOException {
- return observer.preCheckAndPutAfterRowLock(this, row, filter, put, getResult());
+ public CheckAndMutateResult call(RegionObserver observer) throws IOException {
+ return observer.preCheckAndMutateAfterRowLock(this, checkAndMutate, getResult());
}
});
}
/**
- * @param row row to check
- * @param family column family
- * @param qualifier column qualifier
- * @param op the comparison operation
- * @param comparator the comparator
- * @param put data to put if check succeeds
- * @throws IOException e
- */
- public boolean postCheckAndPut(final byte [] row, final byte [] family,
- final byte [] qualifier, final CompareOperator op,
- final ByteArrayComparable comparator, final Put put,
- boolean result) throws IOException {
- if (this.coprocEnvironments.isEmpty()) {
- return result;
- }
- return execOperationWithResult(
- new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) {
- @Override
- public Boolean call(RegionObserver observer) throws IOException {
- return observer.postCheckAndPut(this, row, family, qualifier,
- op, comparator, put, getResult());
- }
- });
- }
-
- /**
- * @param row row to check
- * @param filter filter
- * @param put data to put if check succeeds
- * @throws IOException e
- */
- public boolean postCheckAndPut(final byte [] row, final Filter filter, final Put put,
- boolean result) throws IOException {
- if (this.coprocEnvironments.isEmpty()) {
- return result;
- }
- return execOperationWithResult(
- new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) {
- @Override
- public Boolean call(RegionObserver observer) throws IOException {
- return observer.postCheckAndPut(this, row, filter, put, getResult());
- }
- });
- }
-
- /**
- * Supports Coprocessor 'bypass'.
- * @param row row to check
- * @param family column family
- * @param qualifier column qualifier
- * @param op the comparison operation
- * @param comparator the comparator
- * @param delete delete to commit if check succeeds
- * @return true or false to return to client if default processing should be bypassed, or null
- * otherwise
- */
- public Boolean preCheckAndDelete(final byte [] row, final byte [] family,
- final byte [] qualifier, final CompareOperator op,
- final ByteArrayComparable comparator, final Delete delete)
- throws IOException {
- boolean bypassable = true;
- boolean defaultResult = false;
- if (coprocEnvironments.isEmpty()) {
- return null;
- }
- return execOperationWithResult(
- new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
- defaultResult, bypassable) {
- @Override
- public Boolean call(RegionObserver observer) throws IOException {
- return observer.preCheckAndDelete(this, row, family,
- qualifier, op, comparator, delete, getResult());
- }
- });
- }
-
- /**
- * Supports Coprocessor 'bypass'.
- * @param row row to check
- * @param filter filter
- * @param delete delete to commit if check succeeds
+ * @param checkAndMutate the CheckAndMutate object
+ * @param result the result returned by the checkAndMutate
* @return true or false to return to client if default processing should be bypassed, or null
* otherwise
+ * @throws IOException if an error occurred on the coprocessor
*/
- public Boolean preCheckAndDelete(final byte [] row, final Filter filter, final Delete delete)
- throws IOException {
- boolean bypassable = true;
- boolean defaultResult = false;
- if (coprocEnvironments.isEmpty()) {
- return null;
- }
- return execOperationWithResult(
- new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
- defaultResult, bypassable) {
- @Override
- public Boolean call(RegionObserver observer) throws IOException {
- return observer.preCheckAndDelete(this, row, filter, delete, getResult());
- }
- });
- }
-
- /**
- * Supports Coprocessor 'bypass'.
- * @param row row to check
- * @param family column family
- * @param qualifier column qualifier
- * @param op the comparison operation
- * @param comparator the comparator
- * @param delete delete to commit if check succeeds
- * @return true or false to return to client if default processing should be bypassed,
- * or null otherwise
- */
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL",
- justification="Null is legit")
- public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final byte[] family,
- final byte[] qualifier, final CompareOperator op, final ByteArrayComparable comparator,
- final Delete delete) throws IOException {
- boolean bypassable = true;
- boolean defaultResult = false;
- if (coprocEnvironments.isEmpty()) {
- return null;
- }
- return execOperationWithResult(
- new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
- defaultResult, bypassable) {
- @Override
- public Boolean call(RegionObserver observer) throws IOException {
- return observer.preCheckAndDeleteAfterRowLock(this, row,
- family, qualifier, op, comparator, delete, getResult());
- }
- });
- }
-
- /**
- * Supports Coprocessor 'bypass'.
- * @param row row to check
- * @param filter filter
- * @param delete delete to commit if check succeeds
- * @return true or false to return to client if default processing should be bypassed,
- * or null otherwise
- */
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL",
- justification="Null is legit")
- public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final Filter filter,
- final Delete delete) throws IOException {
- boolean bypassable = true;
- boolean defaultResult = false;
- if (coprocEnvironments.isEmpty()) {
- return null;
- }
- return execOperationWithResult(
- new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
- defaultResult, bypassable) {
- @Override
- public Boolean call(RegionObserver observer) throws IOException {
- return observer.preCheckAndDeleteAfterRowLock(this, row, filter, delete, getResult());
- }
- });
- }
-
- /**
- * @param row row to check
- * @param family column family
- * @param qualifier column qualifier
- * @param op the comparison operation
- * @param comparator the comparator
- * @param delete delete to commit if check succeeds
- * @throws IOException e
- */
- public boolean postCheckAndDelete(final byte [] row, final byte [] family,
- final byte [] qualifier, final CompareOperator op,
- final ByteArrayComparable comparator, final Delete delete,
- boolean result) throws IOException {
- if (this.coprocEnvironments.isEmpty()) {
- return result;
- }
- return execOperationWithResult(
- new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) {
- @Override
- public Boolean call(RegionObserver observer) throws IOException {
- return observer.postCheckAndDelete(this, row, family,
- qualifier, op, comparator, delete, getResult());
- }
- });
- }
-
- /**
- * @param row row to check
- * @param filter filter
- * @param delete delete to commit if check succeeds
- * @throws IOException e
- */
- public boolean postCheckAndDelete(final byte [] row, final Filter filter, final Delete delete,
- boolean result) throws IOException {
+ public CheckAndMutateResult postCheckAndMutate(CheckAndMutate checkAndMutate,
+ CheckAndMutateResult result) throws IOException {
if (this.coprocEnvironments.isEmpty()) {
return result;
}
return execOperationWithResult(
- new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) {
+ new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(
+ regionObserverGetter, result) {
@Override
- public Boolean call(RegionObserver observer) throws IOException {
- return observer.postCheckAndDelete(this, row, filter, delete, getResult());
+ public CheckAndMutateResult call(RegionObserver observer) throws IOException {
+ return observer.postCheckAndMutate(this, checkAndMutate, getResult());
}
});
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
index ef62901..0727385 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
@@ -40,6 +40,8 @@ import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.KeepDeletedCells;
import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.CheckAndMutate;
+import org.apache.hadoop.hbase.client.CheckAndMutateResult;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
@@ -113,6 +115,9 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
final AtomicInteger ctPreCheckAndDeleteWithFilterAfterRowLock = new AtomicInteger(0);
final AtomicInteger ctPostCheckAndDelete = new AtomicInteger(0);
final AtomicInteger ctPostCheckAndDeleteWithFilter = new AtomicInteger(0);
+ final AtomicInteger ctPreCheckAndMutate = new AtomicInteger(0);
+ final AtomicInteger ctPreCheckAndMutateAfterRowLock = new AtomicInteger(0);
+ final AtomicInteger ctPostCheckAndMutate = new AtomicInteger(0);
final AtomicInteger ctPreScannerNext = new AtomicInteger(0);
final AtomicInteger ctPostScannerNext = new AtomicInteger(0);
final AtomicInteger ctPostScannerFilterRow = new AtomicInteger(0);
@@ -584,6 +589,28 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
}
@Override
+ public CheckAndMutateResult preCheckAndMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+ CheckAndMutate checkAndMutate, CheckAndMutateResult result) throws IOException {
+ ctPreCheckAndMutate.incrementAndGet();
+ return RegionObserver.super.preCheckAndMutate(c, checkAndMutate, result);
+ }
+
+ @Override
+ public CheckAndMutateResult preCheckAndMutateAfterRowLock(
+ ObserverContext<RegionCoprocessorEnvironment> c, CheckAndMutate checkAndMutate,
+ CheckAndMutateResult result) throws IOException {
+ ctPreCheckAndMutateAfterRowLock.incrementAndGet();
+ return RegionObserver.super.preCheckAndMutateAfterRowLock(c, checkAndMutate, result);
+ }
+
+ @Override
+ public CheckAndMutateResult postCheckAndMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+ CheckAndMutate checkAndMutate, CheckAndMutateResult result) throws IOException {
+ ctPostCheckAndMutate.incrementAndGet();
+ return RegionObserver.super.postCheckAndMutate(c, checkAndMutate, result);
+ }
+
+ @Override
public Result preAppendAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> e,
Append append) throws IOException {
ctPreAppendAfterRowLock.incrementAndGet();
@@ -826,6 +853,18 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
return ctPostCheckAndDeleteWithFilter.get();
}
+ public int getPreCheckAndMutate() {
+ return ctPreCheckAndMutate.get();
+ }
+
+ public int getPreCheckAndMutateAfterRowLock() {
+ return ctPreCheckAndMutateAfterRowLock.get();
+ }
+
+ public int getPostCheckAndMutate() {
+ return ctPostCheckAndMutate.get();
+ }
+
public boolean hadPreIncrement() {
return ctPreIncrement.get() > 0;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
index 5b77027..a3502bf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
@@ -45,11 +45,13 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
@@ -267,15 +269,17 @@ public class TestRegionObserverInterface {
verifyMethodResult(SimpleRegionObserver.class,
new String[] { "getPreCheckAndPut", "getPreCheckAndPutAfterRowLock", "getPostCheckAndPut",
"getPreCheckAndPutWithFilter", "getPreCheckAndPutWithFilterAfterRowLock",
- "getPostCheckAndPutWithFilter" },
- tableName, new Integer[] { 0, 0, 0, 0, 0, 0 });
+ "getPostCheckAndPutWithFilter", "getPreCheckAndMutate",
+ "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
+ tableName, new Integer[] { 0, 0, 0, 0, 0, 0, 0, 0, 0 });
table.checkAndMutate(Bytes.toBytes(0), A).qualifier(A).ifEquals(A).thenPut(p);
verifyMethodResult(SimpleRegionObserver.class,
new String[] { "getPreCheckAndPut", "getPreCheckAndPutAfterRowLock", "getPostCheckAndPut",
"getPreCheckAndPutWithFilter", "getPreCheckAndPutWithFilterAfterRowLock",
- "getPostCheckAndPutWithFilter" },
- tableName, new Integer[] { 1, 1, 1, 0, 0, 0 });
+ "getPostCheckAndPutWithFilter", "getPreCheckAndMutate",
+ "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
+ tableName, new Integer[] { 1, 1, 1, 0, 0, 0, 1, 1, 1 });
table.checkAndMutate(Bytes.toBytes(0),
new SingleColumnValueFilter(A, A, CompareOperator.EQUAL, A))
@@ -283,8 +287,9 @@ public class TestRegionObserverInterface {
verifyMethodResult(SimpleRegionObserver.class,
new String[] { "getPreCheckAndPut", "getPreCheckAndPutAfterRowLock", "getPostCheckAndPut",
"getPreCheckAndPutWithFilter", "getPreCheckAndPutWithFilterAfterRowLock",
- "getPostCheckAndPutWithFilter" },
- tableName, new Integer[] { 1, 1, 1, 1, 1, 1 });
+ "getPostCheckAndPutWithFilter", "getPreCheckAndMutate",
+ "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
+ tableName, new Integer[] { 1, 1, 1, 1, 1, 1, 2, 2, 2 });
} finally {
util.deleteTable(tableName);
}
@@ -304,16 +309,18 @@ public class TestRegionObserverInterface {
SimpleRegionObserver.class, new String[] { "getPreCheckAndDelete",
"getPreCheckAndDeleteAfterRowLock", "getPostCheckAndDelete",
"getPreCheckAndDeleteWithFilter", "getPreCheckAndDeleteWithFilterAfterRowLock",
- "getPostCheckAndDeleteWithFilter" },
- tableName, new Integer[] { 0, 0, 0, 0, 0, 0 });
+ "getPostCheckAndDeleteWithFilter", "getPreCheckAndMutate",
+ "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
+ tableName, new Integer[] { 0, 0, 0, 0, 0, 0, 0, 0, 0 });
table.checkAndMutate(Bytes.toBytes(0), A).qualifier(A).ifEquals(A).thenDelete(d);
verifyMethodResult(
SimpleRegionObserver.class, new String[] { "getPreCheckAndDelete",
"getPreCheckAndDeleteAfterRowLock", "getPostCheckAndDelete",
"getPreCheckAndDeleteWithFilter", "getPreCheckAndDeleteWithFilterAfterRowLock",
- "getPostCheckAndDeleteWithFilter" },
- tableName, new Integer[] { 1, 1, 1, 0, 0, 0 });
+ "getPostCheckAndDeleteWithFilter", "getPreCheckAndMutate",
+ "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
+ tableName, new Integer[] { 1, 1, 1, 0, 0, 0, 1, 1, 1 });
table.checkAndMutate(Bytes.toBytes(0),
new SingleColumnValueFilter(A, A, CompareOperator.EQUAL, A))
@@ -322,8 +329,50 @@ public class TestRegionObserverInterface {
SimpleRegionObserver.class, new String[] { "getPreCheckAndDelete",
"getPreCheckAndDeleteAfterRowLock", "getPostCheckAndDelete",
"getPreCheckAndDeleteWithFilter", "getPreCheckAndDeleteWithFilterAfterRowLock",
- "getPostCheckAndDeleteWithFilter" },
- tableName, new Integer[] { 1, 1, 1, 1, 1, 1 });
+ "getPostCheckAndDeleteWithFilter", "getPreCheckAndMutate",
+ "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
+ tableName, new Integer[] { 1, 1, 1, 1, 1, 1, 2, 2, 2 });
+ } finally {
+ util.deleteTable(tableName);
+ table.close();
+ }
+ }
+
+ @Test
+ public void testCheckAndMutateWithRowMutationsHooks() throws Exception {
+ final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + "." +
+ name.getMethodName());
+ Table table = util.createTable(tableName, new byte[][] { A, B, C });
+ try {
+ byte[] row = Bytes.toBytes(0);
+
+ Put p = new Put(row).addColumn(A, A, A);
+ table.put(p);
+ verifyMethodResult(
+ SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate",
+ "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
+ tableName, new Integer[] { 0, 0, 0 });
+
+ table.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifEquals(A, A, A)
+ .build(new RowMutations(row)
+ .add((Mutation) new Put(row).addColumn(B, B, B))
+ .add((Mutation) new Delete(row))));
+ verifyMethodResult(
+ SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate",
+ "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
+ tableName, new Integer[] { 1, 1, 1 });
+
+ Object[] result = new Object[2];
+ table.batch(Arrays.asList(p, CheckAndMutate.newBuilder(row)
+ .ifEquals(A, A, A)
+ .build(new RowMutations(row)
+ .add((Mutation) new Put(row).addColumn(B, B, B))
+ .add((Mutation) new Delete(row)))), result);
+ verifyMethodResult(
+ SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate",
+ "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
+ tableName, new Integer[] { 2, 2, 2 });
} finally {
util.deleteTable(tableName);
table.close();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 48b1c0a..803a33b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -99,6 +99,8 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.CheckAndMutate;
+import org.apache.hadoop.hbase.client.CheckAndMutateResult;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
@@ -1775,6 +1777,7 @@ public class TestHRegion {
// checkAndMutate tests
// ////////////////////////////////////////////////////////////////////////////
@Test
+ @Deprecated
public void testCheckAndMutate_WithEmptyRowValue() throws IOException {
byte[] row1 = Bytes.toBytes("row1");
byte[] fam1 = Bytes.toBytes("fam1");
@@ -1844,6 +1847,7 @@ public class TestHRegion {
}
@Test
+ @Deprecated
public void testCheckAndMutate_WithWrongValue() throws IOException {
byte[] row1 = Bytes.toBytes("row1");
byte[] fam1 = Bytes.toBytes("fam1");
@@ -1893,6 +1897,7 @@ public class TestHRegion {
}
@Test
+ @Deprecated
public void testCheckAndMutate_WithCorrectValue() throws IOException {
byte[] row1 = Bytes.toBytes("row1");
byte[] fam1 = Bytes.toBytes("fam1");
@@ -1941,6 +1946,7 @@ public class TestHRegion {
}
@Test
+ @Deprecated
public void testCheckAndMutate_WithNonEqualCompareOp() throws IOException {
byte[] row1 = Bytes.toBytes("row1");
byte[] fam1 = Bytes.toBytes("fam1");
@@ -2030,6 +2036,7 @@ public class TestHRegion {
}
@Test
+ @Deprecated
public void testCheckAndPut_ThatPutWasWritten() throws IOException {
byte[] row1 = Bytes.toBytes("row1");
byte[] fam1 = Bytes.toBytes("fam1");
@@ -2071,6 +2078,7 @@ public class TestHRegion {
}
@Test
+ @Deprecated
public void testCheckAndPut_wrongRowInPut() throws IOException {
this.region = initHRegion(tableName, method, CONF, COLUMNS);
Put put = new Put(row2);
@@ -2085,6 +2093,7 @@ public class TestHRegion {
}
@Test
+ @Deprecated
public void testCheckAndDelete_ThatDeleteWasWritten() throws IOException {
byte[] row1 = Bytes.toBytes("row1");
byte[] fam1 = Bytes.toBytes("fam1");
@@ -2158,6 +2167,7 @@ public class TestHRegion {
}
@Test
+ @Deprecated
public void testCheckAndMutate_WithFilters() throws Throwable {
final byte[] FAMILY = Bytes.toBytes("fam");
@@ -2232,6 +2242,7 @@ public class TestHRegion {
}
@Test
+ @Deprecated
public void testCheckAndMutate_WithFiltersAndTimeRange() throws Throwable {
final byte[] FAMILY = Bytes.toBytes("fam");
@@ -2280,6 +2291,7 @@ public class TestHRegion {
}
@Test
+ @Deprecated
public void testCheckAndMutate_wrongMutationType() throws Throwable {
// Setting up region
this.region = initHRegion(tableName, method, CONF, fam1);
@@ -2289,7 +2301,7 @@ public class TestHRegion {
new Increment(row).addColumn(fam1, qual1, 1));
fail("should throw DoNotRetryIOException");
} catch (DoNotRetryIOException e) {
- assertEquals("Action must be Put or Delete", e.getMessage());
+ assertEquals("Unsupported mutate type: INCREMENT", e.getMessage());
}
try {
@@ -2298,11 +2310,12 @@ public class TestHRegion {
new Increment(row).addColumn(fam1, qual1, 1));
fail("should throw DoNotRetryIOException");
} catch (DoNotRetryIOException e) {
- assertEquals("Action must be Put or Delete", e.getMessage());
+ assertEquals("Unsupported mutate type: INCREMENT", e.getMessage());
}
}
@Test
+ @Deprecated
public void testCheckAndMutate_wrongRow() throws Throwable {
final byte[] wrongRow = Bytes.toBytes("wrongRow");
@@ -2314,7 +2327,8 @@ public class TestHRegion {
new Put(wrongRow).addColumn(fam1, qual1, value1));
fail("should throw DoNotRetryIOException");
} catch (DoNotRetryIOException e) {
- assertEquals("Action's getRow must match", e.getMessage());
+ assertEquals("The row of the action (Put/Delete/RowMutations) <wrongRow> doesn't "
+ + "match the original one <rowA>", e.getMessage());
}
try {
@@ -2323,7 +2337,8 @@ public class TestHRegion {
new Put(wrongRow).addColumn(fam1, qual1, value1));
fail("should throw DoNotRetryIOException");
} catch (DoNotRetryIOException e) {
- assertEquals("Action's getRow must match", e.getMessage());
+ assertEquals("The row of the action (Put/Delete/RowMutations) <wrongRow> doesn't "
+ + "match the original one <rowA>", e.getMessage());
}
try {
@@ -2335,7 +2350,8 @@ public class TestHRegion {
.add((Mutation) new Delete(wrongRow).addColumns(fam1, qual2)));
fail("should throw DoNotRetryIOException");
} catch (DoNotRetryIOException e) {
- assertEquals("Action's getRow must match", e.getMessage());
+ assertEquals("The row of the action (Put/Delete/RowMutations) <wrongRow> doesn't "
+ + "match the original one <rowA>", e.getMessage());
}
try {
@@ -2347,10 +2363,494 @@ public class TestHRegion {
.add((Mutation) new Delete(wrongRow).addColumns(fam1, qual2)));
fail("should throw DoNotRetryIOException");
} catch (DoNotRetryIOException e) {
- assertEquals("Action's getRow must match", e.getMessage());
+ assertEquals("The row of the action (Put/Delete/RowMutations) <wrongRow> doesn't "
+ + "match the original one <rowA>", e.getMessage());
}
}
+ @Test
+ public void testCheckAndMutateWithEmptyRowValue() throws IOException {
+ byte[] row1 = Bytes.toBytes("row1");
+ byte[] fam1 = Bytes.toBytes("fam1");
+ byte[] qf1 = Bytes.toBytes("qualifier");
+ byte[] emptyVal = new byte[] {};
+ byte[] val1 = Bytes.toBytes("value1");
+ byte[] val2 = Bytes.toBytes("value2");
+
+ // Setting up region
+ this.region = initHRegion(tableName, method, CONF, fam1);
+ // Build a put that writes an empty value (it is only applied through checkAndMutate below)
+ Put put = new Put(row1);
+ put.addColumn(fam1, qf1, emptyVal);
+
+ // checkAndPut expecting an empty value while nothing has been written to the column yet
+ CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+ .ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(put));
+ assertTrue(res.isSuccess());
+
+ // Build a put that writes val1
+ put = new Put(row1);
+ put.addColumn(fam1, qf1, val1);
+
+ // checkAndPut expecting the empty value written by the previous step
+ res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+ .ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(put));
+ assertTrue(res.isSuccess());
+
+ // the column now holds val1, so expecting an empty value must fail
+ res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+ .ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(put));
+ assertFalse(res.isSuccess());
+
+ Delete delete = new Delete(row1);
+ delete.addColumn(fam1, qf1);
+ res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+ .ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(delete));
+ assertFalse(res.isSuccess());
+
+ put = new Put(row1);
+ put.addColumn(fam1, qf1, val2);
+ // checkAndPut expecting val1, the current value; writes val2 on success
+ res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+ .ifMatches(fam1, qf1, CompareOperator.EQUAL, val1).build(put));
+ assertTrue(res.isSuccess());
+
+ // checkAndDelete expecting val2, the current value
+ delete = new Delete(row1);
+ delete.addColumn(fam1, qf1);
+ delete.addColumn(fam1, qf1); // NOTE(review): same column added twice; presumably intentional (one marker per version) - confirm
+ res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+ .ifMatches(fam1, qf1, CompareOperator.EQUAL, val2).build(delete));
+ assertTrue(res.isSuccess());
+
+ delete = new Delete(row1);
+ res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+ .ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(delete));
+ assertTrue(res.isSuccess());
+
+ // checkAndPut that requires the column to be absent (ifNotExists)
+ put = new Put(row1);
+ put.addColumn(fam1, qf1, val1);
+
+ res = region.checkAndMutate(CheckAndMutate.newBuilder(row1).ifNotExists(fam1, qf1)
+ .build(put));
+ assertTrue(res.isSuccess());
+ }
+
+ @Test
+ public void testCheckAndMutateWithWrongValue() throws IOException {
+ byte[] row1 = Bytes.toBytes("row1");
+ byte[] fam1 = Bytes.toBytes("fam1");
+ byte[] qf1 = Bytes.toBytes("qualifier");
+ byte[] val1 = Bytes.toBytes("value1");
+ byte[] val2 = Bytes.toBytes("value2");
+ BigDecimal bd1 = new BigDecimal(Double.MAX_VALUE);
+ BigDecimal bd2 = new BigDecimal(Double.MIN_VALUE);
+
+ // Setting up region
+ this.region = initHRegion(tableName, method, CONF, fam1);
+ // Store val1 in the column that the checks below inspect
+ Put put = new Put(row1);
+ put.addColumn(fam1, qf1, val1);
+ region.put(put);
+
+ // checkAndPut with wrong value: the column holds val1 but val2 is expected
+ CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+ .ifMatches(fam1, qf1, CompareOperator.EQUAL, val2).build(put));
+ assertFalse(res.isSuccess());
+
+ // checkAndDelete with wrong value: pass the Delete (not the Put) so the delete path is tested
+ Delete delete = new Delete(row1);
+ delete.addFamily(fam1);
+ res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+ .ifMatches(fam1, qf1, CompareOperator.EQUAL, val2).build(delete));
+ assertFalse(res.isSuccess());
+
+ // Store a BigDecimal-encoded value (bd1) in the same column
+ put = new Put(row1);
+ put.addColumn(fam1, qf1, Bytes.toBytes(bd1));
+ region.put(put);
+
+ // checkAndPut with wrong value: the column holds bd1 but bd2 is expected
+ res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+ .ifMatches(fam1, qf1, CompareOperator.EQUAL, Bytes.toBytes(bd2)).build(put));
+ assertFalse(res.isSuccess());
+
+ // checkAndDelete with wrong value: the column holds bd1 but bd2 is expected
+ delete = new Delete(row1);
+ delete.addFamily(fam1);
+ res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+ .ifMatches(fam1, qf1, CompareOperator.EQUAL, Bytes.toBytes(bd2)).build(delete));
+ assertFalse(res.isSuccess());
+ }
+
+ @Test
+ public void testCheckAndMutateWithCorrectValue() throws IOException {
+ byte[] row1 = Bytes.toBytes("row1");
+ byte[] fam1 = Bytes.toBytes("fam1");
+ byte[] qf1 = Bytes.toBytes("qualifier");
+ byte[] val1 = Bytes.toBytes("value1");
+ BigDecimal bd1 = new BigDecimal(Double.MIN_VALUE);
+
+ // Setting up region
+ this.region = initHRegion(tableName, method, CONF, fam1);
+ // Store val1 at an explicit timestamp (now); later mutations use now + 1 .. now + 3
+ long now = System.currentTimeMillis();
+ Put put = new Put(row1);
+ put.addColumn(fam1, qf1, now, val1);
+ region.put(put);
+
+ // checkAndPut expecting val1, the stored value (re-applies the same put)
+ CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+ .ifMatches(fam1, qf1, CompareOperator.EQUAL, val1).build(put));
+ assertTrue("First", res.isSuccess());
+
+ // checkAndDelete expecting val1; the Delete is created with timestamp now + 1
+ Delete delete = new Delete(row1, now + 1);
+ delete.addColumn(fam1, qf1);
+ res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+ .ifMatches(fam1, qf1, CompareOperator.EQUAL, val1).build(delete));
+ assertTrue("Delete", res.isSuccess());
+
+ // Store a BigDecimal-encoded value (bd1) at timestamp now + 2
+ put = new Put(row1);
+ put.addColumn(fam1, qf1, now + 2, Bytes.toBytes(bd1));
+ region.put(put);
+
+ // checkAndPut expecting bd1, the stored value (re-applies the same put)
+ res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+ .ifMatches(fam1, qf1, CompareOperator.EQUAL, Bytes.toBytes(bd1)).build(put));
+ assertTrue("Second put", res.isSuccess());
+
+ // checkAndDelete expecting bd1; the Delete is created with timestamp now + 3
+ delete = new Delete(row1, now + 3);
+ delete.addColumn(fam1, qf1);
+ res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+ .ifMatches(fam1, qf1, CompareOperator.EQUAL, Bytes.toBytes(bd1)).build(delete));
+ assertTrue("Second delete", res.isSuccess());
+ }
+
+ @Test
+ public void testCheckAndMutateWithNonEqualCompareOp() throws IOException {
+ byte[] row1 = Bytes.toBytes("row1");
+ byte[] fam1 = Bytes.toBytes("fam1");
+ byte[] qf1 = Bytes.toBytes("qualifier");
+ byte[] val1 = Bytes.toBytes("value1");
+ byte[] val2 = Bytes.toBytes("value2");
+ byte[] val3 = Bytes.toBytes("value3");
+ byte[] val4 = Bytes.toBytes("value4");
+
+ // Setting up region
+ this.region = initHRegion(tableName, method, CONF, fam1);
+ // Putting val3 in key
+ Put put = new Put(row1);
+ put.addColumn(fam1, qf1, val3);
+ region.put(put);
+
+ // Test CompareOperator.LESS: original = val3, compare with val3, fail
+ CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+ .ifMatches(fam1, qf1, CompareOperator.LESS, val3).build(put));
+ assertFalse(res.isSuccess());
+
+ // Test CompareOperator.LESS: original = val3, compare with val4, fail
+ res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+ .ifMatches(fam1, qf1, CompareOperator.LESS, val4).build(put));
+ assertFalse(res.isSuccess());
+
+ // Test CompareOperator.LESS: original = val3, compare with val2,
+ // succeed (now value = val2)
+ put = new Put(row1);
+ put.addColumn(fam1, qf1, val2);
+ res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+ .ifMatches(fam1, qf1, CompareOperator.LESS, val2).build(put));
+ assertTrue(res.isSuccess());
+
+ // Test CompareOperator.LESS_OR_EQUAL: original = val2, compare with val3, fail
+ res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+ .ifMatches(fam1, qf1, CompareOperator.LESS_OR_EQUAL, val3).build(put));
+ assertFalse(res.isSuccess());
+
+ // Test CompareOperator.LESS_OR_EQUAL: original = val2, compare with val2,
+ // succeed (value still = val2)
+ res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+ .ifMatches(fam1, qf1, CompareOperator.LESS_OR_EQUAL, val2).build(put));
+ assertTrue(res.isSuccess());
+
+ // Test CompareOperator.LESS_OR_EQUAL: original = val2, compare with val1,
+ // succeed (now value = val3)
+ put = new Put(row1);
+ put.addColumn(fam1, qf1, val3);
+ res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+ .ifMatches(fam1, qf1, CompareOperator.LESS_OR_EQUAL, val1).build(put));
+ assertTrue(res.isSuccess());
+
+ // Test CompareOperator.GREATER: original = val3, compare with val3, fail
+ res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+ .ifMatches(fam1, qf1, CompareOperator.GREATER, val3).build(put));
+ assertFalse(res.isSuccess());
+
+ // Test CompareOperator.GREATER: original = val3, compare with val2, fail
+ res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+ .ifMatches(fam1, qf1, CompareOperator.GREATER, val2).build(put));
+ assertFalse(res.isSuccess());
+
+ // Test CompareOperator.GREATER: original = val3, compare with val4,
+ // succeed (now value = val2)
+ put = new Put(row1);
+ put.addColumn(fam1, qf1, val2);
+ res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+ .ifMatches(fam1, qf1, CompareOperator.GREATER, val4).build(put));
+ assertTrue(res.isSuccess());
+
+ // Test CompareOperator.GREATER_OR_EQUAL: original = val2, compare with val1, fail
+ res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+ .ifMatches(fam1, qf1, CompareOperator.GREATER_OR_EQUAL, val1).build(put));
+ assertFalse(res.isSuccess());
+
+ // Test CompareOperator.GREATER_OR_EQUAL: original = val2, compare with val2,
+ // succeed (value still = val2)
+ res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+ .ifMatches(fam1, qf1, CompareOperator.GREATER_OR_EQUAL, val2).build(put));
+ assertTrue(res.isSuccess());
+
+ // Test CompareOperator.GREATER_OR_EQUAL: original = val2, compare with val3, succeed
+ res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+ .ifMatches(fam1, qf1, CompareOperator.GREATER_OR_EQUAL, val3).build(put));
+ assertTrue(res.isSuccess());
+ }
+
+ @Test
+ public void testCheckAndPutThatPutWasWritten() throws IOException {
+ byte[] row1 = Bytes.toBytes("row1");
+ byte[] fam1 = Bytes.toBytes("fam1");
+ byte[] fam2 = Bytes.toBytes("fam2");
+ byte[] qf1 = Bytes.toBytes("qualifier");
+ byte[] val1 = Bytes.toBytes("value1");
+ byte[] val2 = Bytes.toBytes("value2");
+
+ byte[][] families = { fam1, fam2 };
+
+ // Setting up region
+ this.region = initHRegion(tableName, method, CONF, families);
+ // Putting data in the key to check
+ Put put = new Put(row1);
+ put.addColumn(fam1, qf1, val1);
+ region.put(put);
+
+ // Creating put to add (it targets fam2, while the check below reads fam1)
+ long ts = System.currentTimeMillis();
+ KeyValue kv = new KeyValue(row1, fam2, qf1, ts, KeyValue.Type.Put, val2);
+ put = new Put(row1);
+ put.add(kv);
+
+ // checkAndPut with correct value (fam1:qf1 holds val1), so the put is expected to be applied
+ CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+ .ifMatches(fam1, qf1, CompareOperator.EQUAL, val1).build(put));
+ assertTrue(res.isSuccess());
+
+ Get get = new Get(row1);
+ get.addColumn(fam2, qf1);
+ Cell[] actual = region.get(get).rawCells();
+
+ Cell[] expected = { kv };
+
+ assertEquals(expected.length, actual.length);
+ for (int i = 0; i < actual.length; i++) {
+ assertEquals(expected[i], actual[i]);
+ }
+ }
+
+ @Test
+ public void testCheckAndDeleteThatDeleteWasWritten() throws IOException {
+ byte[] row1 = Bytes.toBytes("row1");
+ byte[] fam1 = Bytes.toBytes("fam1");
+ byte[] fam2 = Bytes.toBytes("fam2");
+ byte[] qf1 = Bytes.toBytes("qualifier1");
+ byte[] qf2 = Bytes.toBytes("qualifier2");
+ byte[] qf3 = Bytes.toBytes("qualifier3");
+ byte[] val1 = Bytes.toBytes("value1");
+ byte[] val2 = Bytes.toBytes("value2");
+ byte[] val3 = Bytes.toBytes("value3");
+ byte[] emptyVal = new byte[] {};
+
+ byte[][] families = { fam1, fam2 };
+
+ // Setting up region
+ this.region = initHRegion(tableName, method, CONF, families);
+ // Put content: first version of fam1:qf1 (val1)
+ Put put = new Put(row1);
+ put.addColumn(fam1, qf1, val1);
+ region.put(put);
+ Threads.sleep(2); // presumably so the second put gets a later timestamp - confirm
+
+ put = new Put(row1);
+ put.addColumn(fam1, qf1, val2);
+ put.addColumn(fam2, qf1, val3);
+ put.addColumn(fam2, qf2, val2);
+ put.addColumn(fam2, qf3, val1);
+ put.addColumn(fam1, qf3, val1);
+ region.put(put);
+
+ LOG.info("get={}", region.get(new Get(row1).addColumn(fam1, qf1)).toString());
+
+ // Multi-column delete, guarded by fam1:qf1 == val2 (the value written by the second put)
+ Delete delete = new Delete(row1);
+ delete.addColumn(fam1, qf1);
+ delete.addColumn(fam2, qf1);
+ delete.addColumn(fam1, qf3);
+ CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+ .ifMatches(fam1, qf1, CompareOperator.EQUAL, val2).build(delete));
+ assertTrue(res.isSuccess());
+
+ Get get = new Get(row1);
+ get.addColumn(fam1, qf1);
+ get.addColumn(fam1, qf3);
+ get.addColumn(fam2, qf2);
+ Result r = region.get(get);
+ assertEquals(2, r.size()); // fam1:qf1 and fam2:qf2 are returned; fam1:qf3 is gone
+ assertArrayEquals(val1, r.getValue(fam1, qf1)); // the value from the first put is visible again
+ assertArrayEquals(val2, r.getValue(fam2, qf2));
+
+ // Family delete, guarded by fam2:qf1 matching an empty value (that column was deleted above)
+ delete = new Delete(row1);
+ delete.addFamily(fam2);
+ res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+ .ifMatches(fam2, qf1, CompareOperator.EQUAL, emptyVal).build(delete));
+ assertTrue(res.isSuccess());
+
+ get = new Get(row1);
+ r = region.get(get);
+ assertEquals(1, r.size());
+ assertArrayEquals(val1, r.getValue(fam1, qf1));
+
+ // Row delete, guarded by fam1:qf1 == val1
+ delete = new Delete(row1);
+ res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+ .ifMatches(fam1, qf1, CompareOperator.EQUAL, val1).build(delete));
+ assertTrue(res.isSuccess());
+ get = new Get(row1);
+ r = region.get(get);
+ assertEquals(0, r.size());
+ }
+
+ @Test
+ public void testCheckAndMutateWithFilters() throws Throwable {
+ final byte[] FAMILY = Bytes.toBytes("fam");
+
+ // Setting up region
+ this.region = initHRegion(tableName, method, CONF, FAMILY);
+
+ // Put one row with columns A=a, B=b and C=c
+ Put put = new Put(row);
+ put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
+ put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
+ put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
+ region.put(put);
+
+ // Put with success: both filters match the stored row (A == a, B == b); writes D=d
+ CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifMatches(new FilterList(
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")),
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+ Bytes.toBytes("b"))))
+ .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))));
+ assertTrue(res.isSuccess());
+
+ Result result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D")));
+ assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+
+ // Put with failure: the second filter expects B == c but B holds b, so E must not be written
+ res = region.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifMatches(new FilterList(
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")),
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+ Bytes.toBytes("c"))))
+ .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))));
+ assertFalse(res.isSuccess());
+
+ assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).isEmpty());
+
+ // Delete with success: both filters match again; removes column D
+ res = region.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifMatches(new FilterList(
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")),
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+ Bytes.toBytes("b"))))
+ .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D"))));
+ assertTrue(res.isSuccess());
+
+ assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).isEmpty());
+
+ // Mutate with success: a RowMutations that puts E=e and deletes A in one call
+ res = region.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifMatches(new FilterList(
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")),
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+ Bytes.toBytes("b"))))
+ .build(new RowMutations(row)
+ .add((Mutation) new Put(row)
+ .addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))
+ .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A")))));
+ assertTrue(res.isSuccess());
+
+ result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("E")));
+ assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("E"))));
+
+ assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).isEmpty());
+ }
+
+ @Test
+ public void testCheckAndMutateWithFiltersAndTimeRange() throws Throwable {
+ final byte[] FAMILY = Bytes.toBytes("fam");
+
+ // Setting up region
+ this.region = initHRegion(tableName, method, CONF, FAMILY);
+
+ // Put column A=a with an explicit timestamp of 100
+ region.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")));
+
+ // Put with success: the time range between(0, 101) is expected to cover timestamp 100
+ CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")))
+ .timeRange(TimeRange.between(0, 101))
+ .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))));
+ assertTrue(res.isSuccess());
+
+ Result result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B")));
+ assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+ // Put with failure: between(0, 100) is expected not to cover timestamp 100, so C is not written
+ res = region.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")))
+ .timeRange(TimeRange.between(0, 100))
+ .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))));
+ assertFalse(res.isSuccess());
+
+ assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).isEmpty());
+
+ // Mutate with success: between(0, 101) again; a RowMutations that puts D=d and deletes A
+ res = region.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")))
+ .timeRange(TimeRange.between(0, 101))
+ .build(new RowMutations(row)
+ .add((Mutation) new Put(row)
+ .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
+ .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A")))));
+ assertTrue(res.isSuccess());
+
+ result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D")));
+ assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+
+ assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).isEmpty());
+ }
+
// ////////////////////////////////////////////////////////////////////////////
// Delete tests
// ////////////////////////////////////////////////////////////////////////////
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java
index 4bf71c8..d046b36 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java
@@ -151,6 +151,7 @@ public class TestMetricsRegionServer {
rsm.updateDelete(null, 17);
rsm.updateCheckAndDelete(17);
rsm.updateCheckAndPut(17);
+ rsm.updateCheckAndMutate(17);
}
HELPER.assertCounter("appendNumOps", 24, serverSource);
@@ -162,7 +163,7 @@ public class TestMetricsRegionServer {
HELPER.assertCounter("deleteNumOps", 17, serverSource);
HELPER.assertCounter("checkAndDeleteNumOps", 17, serverSource);
HELPER.assertCounter("checkAndPutNumOps", 17, serverSource);
-
+ HELPER.assertCounter("checkAndMutateNumOps", 17, serverSource);
HELPER.assertCounter("slowAppendCount", 12, serverSource);
HELPER.assertCounter("slowDeleteCount", 13, serverSource);